// Source: edubox/agent/websocket.go (474 lines, 15 KiB, Go)

package main
import (
"fmt"
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
// WSMessage is the JSON envelope exchanged with the server over the agent's
// WebSocket connection. Action selects the message kind; every other field is
// optional and is left out of the encoded JSON when it holds its zero value.
type WSMessage struct {
	// Action names the message kind, e.g. "register", "heartbeat", "activated",
	// "start", "stop", "instance_started".
	Action string `json:"action"`
	// NodeID identifies this agent; it is set on register, heartbeat and VPN reports.
	NodeID string `json:"nodeId,omitempty"`
	// Code is copied into the stored activation record when "activated" arrives.
	Code string `json:"code,omitempty"`
	// InstanceID identifies the application instance an action refers to.
	InstanceID string `json:"instanceId,omitempty"`
	// Type is handed to handleStartInstance as the instance type (stored as TemplateName).
	Type string `json:"type,omitempty"`
	// Port is the port associated with the instance.
	Port int `json:"port,omitempty"`
	// ComposeConfig is handed to writeCompose when an instance is started or reset.
	ComposeConfig string `json:"composeConfig,omitempty"`
	// InitScript, when non-empty, is handed to writeInitScript before the instance starts.
	InitScript string `json:"initScript,omitempty"`
	// StudentId and StudentName describe the student the node was activated for.
	StudentId string `json:"studentId,omitempty"`
	StudentName string `json:"studentName,omitempty"`
	// Error carries an error description (vpn_error, instance_error, activation_failed).
	Error string `json:"error,omitempty"`
	// TailscaleIP is the address reported to the server in "tailscale_ip" messages.
	TailscaleIP string `json:"tailscaleIp,omitempty"`
	// HeadscaleURL and HeadscaleAuthKey are the Headscale credentials received
	// with "activated".
	HeadscaleURL string `json:"headscaleUrl,omitempty"`
	HeadscaleAuthKey string `json:"headscaleAuthKey,omitempty"`
	// Token is the node token the server sends with "set_token" or "activated".
	Token string `json:"token,omitempty"`
}
// mainConn is the connection to the server that is currently in use, or nil
// while the agent is disconnected. Read and write it only with mainConnMu held.
var mainConn *websocket.Conn

// mainConnMu guards mainConn; sendMessage keeps it locked for the whole write.
var mainConnMu sync.Mutex
// In-memory Headscale settings. They start out with the values given to
// startWebSocket and are replaced when the server sends new ones during
// activation, which can happen after the agent has started.

// currentHeadscaleURL is the Headscale server URL currently in effect.
var currentHeadscaleURL string

// currentHeadscaleAuthKey is the Headscale pre-auth key currently in effect.
var currentHeadscaleAuthKey string

// headscaleConfigMu guards currentHeadscaleURL and currentHeadscaleAuthKey.
var headscaleConfigMu sync.Mutex
// setHeadscaleConfig replaces the in-memory Headscale URL and auth key in one
// step while holding headscaleConfigMu.
func setHeadscaleConfig(url, authKey string) {
	headscaleConfigMu.Lock()
	defer headscaleConfigMu.Unlock()

	currentHeadscaleURL, currentHeadscaleAuthKey = url, authKey
}
// getHeadscaleConfig returns the Headscale URL and auth key currently held in
// memory, in that order. Both are read under headscaleConfigMu so the pair is
// always consistent.
func getHeadscaleConfig() (string, string) {
	headscaleConfigMu.Lock()
	hsURL, hsAuthKey := currentHeadscaleURL, currentHeadscaleAuthKey
	headscaleConfigMu.Unlock()

	return hsURL, hsAuthKey
}
// sendMessage writes msg as JSON on the active server connection.
//
// mainConnMu stays locked for the whole write, so calls from different
// goroutines are serialized. It returns an error when there is no active
// connection or when the write itself fails.
func sendMessage(msg WSMessage) error {
	mainConnMu.Lock()
	defer mainConnMu.Unlock()

	active := mainConn
	if active == nil {
		return fmt.Errorf("not connected to server")
	}

	// Every message except the periodic heartbeat is logged.
	if isHeartbeat := msg.Action == "heartbeat"; !isHeartbeat {
		log.Printf("sendMessage: sending %+v", msg)
	}
	return active.WriteJSON(msg)
}
// UI notifier registry: lets the agent broadcast events (such as activation
// results) to every connected UI client.

// uiNotifier is a callback that receives one broadcast message.
type uiNotifier func(msg map[string]interface{})

// uiNotifiers holds the registered callbacks keyed by registration ID.
var uiNotifiers = map[int]uiNotifier{}

// uiNotifiersMu guards uiNotifiers and uiNotifierID.
var uiNotifiersMu sync.Mutex

// uiNotifierID is the ID the next registration will receive.
var uiNotifierID int
// registerUINotifier adds fn to the set of UI callbacks and returns the ID
// assigned to it; pass that ID to unregisterUINotifier to remove it again.
func registerUINotifier(fn uiNotifier) int {
	uiNotifiersMu.Lock()
	defer uiNotifiersMu.Unlock()

	assigned := uiNotifierID
	uiNotifiers[assigned] = fn
	uiNotifierID = assigned + 1

	log.Printf("registerUINotifier: registered ID %d (total: %d)", assigned, len(uiNotifiers))
	return assigned
}
// unregisterUINotifier removes the callback registered under id. Removing an
// ID that is not registered is a no-op apart from the log line.
func unregisterUINotifier(id int) {
	uiNotifiersMu.Lock()
	defer uiNotifiersMu.Unlock()

	delete(uiNotifiers, id)
	remaining := len(uiNotifiers)
	log.Printf("unregisterUINotifier: removed ID %d (total: %d)", id, remaining)
}
func notifyUI(msg map[string]interface{}) {
uiNotifiersMu.Lock()
notifiers := make([]uiNotifier, 0, len(uiNotifiers))
for _, fn := range uiNotifiers {
notifiers = append(notifiers, fn)
}
uiNotifiersMu.Unlock()
log.Printf("notifyUI: broadcasting to %d UI clients", len(notifiers))
for _, fn := range notifiers {
go func(notify uiNotifier) {
defer func() {
if r := recover(); r != nil {
log.Printf("PANIC in notifyUI goroutine: %v", r)
}
}()
notify(msg)
}(fn)
}
}
// startWebSocket keeps the agent connected to the server and never returns.
//
// Each iteration dials serverAddr (sending the stored node token as a Bearer
// header when there is one), registers nodeID, starts a heartbeat goroutine and
// then dispatches incoming messages to handleMessage until the connection
// fails. After any failure it waits a few seconds and connects again.
//
// headscaleURL and headscaleAuthKey seed the in-memory Headscale settings; the
// server may replace them later through an "activated" message.
func startWebSocket(serverAddr, nodeID, dataDir, headscaleURL, headscaleAuthKey string) {
	// Pause applied after every failed or lost connection so the agent never
	// reconnects in a tight loop.
	const reconnectDelay = 5 * time.Second

	setHeadscaleConfig(headscaleURL, headscaleAuthKey)
	for {
		// The token is reloaded on every attempt because the server may have
		// issued one during the previous session. A load error is deliberately
		// treated like "no token" (NOTE(review): presumably the token file
		// simply does not exist before the first activation — confirm).
		token, _ := loadNodeToken(dataDir)
		headers := http.Header{}
		if token != "" {
			headers.Set("Authorization", "Bearer "+token)
		}
		conn, _, err := websocket.DefaultDialer.Dial(serverAddr, headers)
		if err != nil {
			log.Printf("WS connect error: %v, retrying in 5s...", err)
			time.Sleep(reconnectDelay)
			continue
		}
		log.Printf("WS connected to %s (token=%v)", serverAddr, token != "")

		// Register while this goroutine is still the only holder of conn.
		// Publishing the connection first would let sendMessage callers write
		// concurrently with this unlocked WriteJSON, and a websocket
		// connection supports only one writer at a time.
		if err := conn.WriteJSON(WSMessage{Action: "register", NodeID: nodeID}); err != nil {
			log.Printf("WS register error: %v, retrying in 5s...", err)
			conn.Close()
			time.Sleep(reconnectDelay)
			continue
		}
		mainConnMu.Lock()
		mainConn = conn
		mainConnMu.Unlock()

		// Activation flow
		act, err := loadActivation(dataDir)
		if err != nil || !act.Activated {
			log.Println("Waiting for activation...")
		} else {
			log.Printf("Already activated as %s", act.StudentName)
			// If already activated, ensure VPN is up. The pre-auth key is
			// one-time only, so on restart we rely on the persisted tailscaled
			// state; tailscale up without an authkey reuses existing state.
			hsURL, hsKey := getHeadscaleConfig()
			if hsURL != "" {
				go startTailscaleAndReport(dataDir, nodeID, hsURL, hsKey)
			}
		}

		// Heartbeat goroutine: stops when done is closed or a heartbeat fails.
		done := make(chan struct{})
		go func() {
			ticker := time.NewTicker(10 * time.Second)
			defer ticker.Stop()
			for {
				select {
				case <-ticker.C:
					if err := sendMessage(WSMessage{Action: "heartbeat", NodeID: nodeID}); err != nil {
						log.Printf("WS heartbeat error: %v", err)
						// A failed write means the connection is unusable.
						// Closing it makes the blocked ReadJSON below return,
						// so the outer loop reconnects instead of waiting on a
						// half-dead socket.
						conn.Close()
						return
					}
				case <-done:
					return
				}
			}
		}()

		// Read loop: runs until the connection reports an error.
		for {
			var msg WSMessage
			if err := conn.ReadJSON(&msg); err != nil {
				log.Printf("WS read error: %v", err)
				break
			}
			log.Printf("WS received from server: action=%s", msg.Action)
			handleMessage(conn, msg, dataDir, nodeID)
		}

		// Tear down this session before dialing again.
		close(done)
		conn.Close()
		mainConnMu.Lock()
		mainConn = nil
		mainConnMu.Unlock()
		log.Println("WS disconnected, reconnecting in 5s...")
		time.Sleep(reconnectDelay)
	}
}
// handleMessage carries out one message received from the server.
//
// It runs on the WebSocket read goroutine. Long-running work (VPN start,
// instance start/reset) is moved to its own goroutine, while stop and delete
// run inline. Replies to the server go through sendMessage. A panic in the
// handler or in any goroutine it starts here is recovered and logged.
//
// conn is accepted for the caller's convenience but is not used.
func handleMessage(conn *websocket.Conn, msg WSMessage, dataDir, nodeID string) {
	defer func() {
		if cause := recover(); cause != nil {
			log.Printf("PANIC in handleMessage (action=%s): %v", msg.Action, cause)
		}
	}()

	// pushStatus sends a freshly built status snapshot to the local UI.
	pushStatus := func() {
		broadcastUI(map[string]interface{}{
			"action": "status",
			"status": buildUIStatus(dataDir),
		})
	}
	// instancesChanged tells UI clients that the instance list was modified.
	instancesChanged := func() {
		notifyUI(map[string]interface{}{"action": "instances_updated"})
	}
	instanceID := msg.InstanceID

	switch msg.Action {
	case "registered":
		// Server acknowledged our register message; nothing to do.

	case "set_token":
		if msg.Token == "" {
			return
		}
		if err := saveNodeToken(dataDir, msg.Token); err != nil {
			log.Printf("saveNodeToken error: %v", err)
			return
		}
		log.Printf("Node token saved")

	case "activated":
		log.Printf("handleMessage: activated received, student=%s", msg.StudentName)
		if msg.Token != "" {
			if err := saveNodeToken(dataDir, msg.Token); err == nil {
				log.Printf("Node token saved on activation")
			} else {
				log.Printf("saveNodeToken error: %v", err)
			}
		}
		if msg.StudentName != "" {
			record := &Activation{
				Activated:   true,
				StudentId:   msg.StudentId,
				StudentName: msg.StudentName,
				Code:        msg.Code,
			}
			if err := saveActivation(dataDir, record); err == nil {
				log.Printf("Activated as %s", record.StudentName)
			} else {
				log.Printf("saveActivation error: %v", err)
			}
		}
		// The server sends Headscale credentials on activation. The pre-auth
		// key must be used right away and is deliberately kept out of the
		// config file; only the URL is persisted.
		if endpoint, authKey := msg.HeadscaleURL, msg.HeadscaleAuthKey; endpoint != "" && authKey != "" {
			setHeadscaleConfig(endpoint, authKey)
			if cfg, _, err := loadOrCreateConfig(dataDir); err != nil {
				log.Printf("loadOrCreateConfig error: %v", err)
			} else {
				cfg.HeadscaleURL = endpoint
				if err := saveConfig(dataDir, cfg); err == nil {
					log.Printf("Saved Headscale URL received from server (auth key not persisted)")
				} else {
					log.Printf("saveConfig error: %v", err)
				}
			}
			go startTailscaleAndReport(dataDir, nodeID, endpoint, authKey)
		}
		notifyUI(map[string]interface{}{
			"action":      "activated",
			"studentName": msg.StudentName,
		})

	case "activation_failed":
		log.Printf("handleMessage: activation_failed received, error=%s", msg.Error)
		notifyUI(map[string]interface{}{
			"action": "activation_failed",
			"error":  msg.Error,
		})

	case "start_vpn":
		log.Printf("Server requested VPN start")
		endpoint, authKey := getHeadscaleConfig()
		if endpoint == "" || authKey == "" {
			log.Printf("Cannot start VPN: headscale config missing")
			go sendMessage(WSMessage{Action: "vpn_error", NodeID: nodeID, Error: "headscale config missing"})
			return
		}
		go func() {
			defer func() {
				if cause := recover(); cause != nil {
					log.Printf("PANIC in start_vpn goroutine: %v", cause)
				}
			}()
			ip, err := startTailscale(dataDir, nodeID, endpoint, authKey)
			if err != nil {
				log.Printf("start_vpn error: %v", err)
				sendMessage(WSMessage{Action: "vpn_error", NodeID: nodeID, Error: err.Error()})
				pushStatus()
				return
			}
			// Keep retrying once per second until the address reaches the server.
			report := WSMessage{Action: "tailscale_ip", NodeID: nodeID, TailscaleIP: ip}
			for sendMessage(report) != nil {
				log.Printf("Waiting for WebSocket to send tailscale_ip...")
				time.Sleep(1 * time.Second)
			}
			log.Printf("Sent tailscale_ip to server: %s", ip)
			pushStatus()
		}()

	case "stop_vpn":
		log.Printf("Server requested VPN stop")
		stopTailscale()
		go func() {
			defer func() {
				if cause := recover(); cause != nil {
					log.Printf("PANIC in stop_vpn goroutine: %v", cause)
				}
			}()
			sendMessage(WSMessage{Action: "vpn_stopped", NodeID: nodeID})
		}()

	case "start":
		log.Printf("Start instance %s on port %d", instanceID, msg.Port)
		go func() {
			defer func() {
				if cause := recover(); cause != nil {
					log.Printf("PANIC in start goroutine instance=%s: %v", msg.InstanceID, cause)
				}
			}()
			handleStartInstance(dataDir, nodeID, msg.InstanceID, msg.Type, msg.ComposeConfig, msg.InitScript, msg.Port)
		}()

	case "reset":
		log.Printf("Reset instance %s", instanceID)
		// Remove the existing containers first, then start again in the background.
		dockerComposeRm(dataDir, instanceID)
		go func() {
			defer func() {
				if cause := recover(); cause != nil {
					log.Printf("PANIC in reset goroutine instance=%s: %v", msg.InstanceID, cause)
				}
			}()
			handleStartInstance(dataDir, nodeID, msg.InstanceID, msg.Type, msg.ComposeConfig, msg.InitScript, msg.Port)
		}()

	case "stop":
		log.Printf("Stop instance %s", instanceID)
		if known, _ := loadInstances(dataDir); known[instanceID] != nil {
			removeTailscaleServe(known[instanceID].Port)
		}
		if err := dockerComposeStop(dataDir, instanceID); err != nil {
			log.Printf("dockerComposeStop error: %v", err)
		}
		// The instance list is loaded again after the stop before it is updated.
		if known, _ := loadInstances(dataDir); known[instanceID] != nil {
			known[instanceID].Status = "stopped"
			_ = saveInstances(dataDir, known)
		}
		go sendMessage(WSMessage{Action: "instance_stopped", InstanceID: instanceID})
		instancesChanged()

	case "delete":
		log.Printf("Delete instance %s", instanceID)
		if known, _ := loadInstances(dataDir); known[instanceID] != nil {
			removeTailscaleServe(known[instanceID].Port)
		}
		dockerComposeRm(dataDir, instanceID)
		removeInstance(dataDir, instanceID)
		go sendMessage(WSMessage{Action: "instance_deleted", InstanceID: instanceID})
		instancesChanged()

	default:
		log.Printf("Unknown action: %s", msg.Action)
	}
}
func handleStartInstance(dataDir, nodeID, instanceID, instanceType, composeConfig, initScript string, port int) {
defer func() {
if r := recover(); r != nil {
log.Printf("PANIC in handleStartInstance instance=%s: %v", instanceID, r)
_ = upsertInstance(dataDir, &InstanceInfo{ID: instanceID, TemplateName: instanceType, Port: port, Status: "error"})
sendMessage(WSMessage{Action: "instance_error", InstanceID: instanceID, Error: fmt.Sprintf("internal panic: %v", r)})
sendInstanceProgress(instanceID, "start", "0", "Erreur interne")
notifyUI(map[string]interface{}{"action": "instances_updated"})
}
}()
log.Printf("handleStartInstance begin: instance=%s type=%s port=%d dataDir=%s initScriptLen=%d", instanceID, instanceType, port, dataDir, len(initScript))
notifyInstanceProgress := func(percent, message string) {
sendInstanceProgress(instanceID, "start", percent, message)
}
_ = upsertInstance(dataDir, &InstanceInfo{
ID: instanceID,
TemplateName: instanceType,
Port: port,
Status: "starting",
})
notifyUI(map[string]interface{}{"action": "instances_updated"})
notifyInstanceProgress("10", "Préparation de l'application...")
if err := writeCompose(dataDir, instanceID, composeConfig, port); err != nil {
log.Printf("writeCompose error: %v", err)
_ = upsertInstance(dataDir, &InstanceInfo{ID: instanceID, TemplateName: instanceType, Port: port, Status: "error"})
sendMessage(WSMessage{Action: "instance_error", InstanceID: instanceID, Error: err.Error()})
notifyInstanceProgress("0", "Erreur de préparation")
notifyUI(map[string]interface{}{"action": "instances_updated"})
return
}
if initScript != "" {
if err := writeInitScript(dataDir, instanceID, initScript); err != nil {
log.Printf("writeInitScript error: %v", err)
}
}
notifyInstanceProgress("30", "Configuration de l'application...")
if err := dockerComposeUp(dataDir, instanceID); err != nil {
log.Printf("dockerComposeUp error: %v", err)
_ = upsertInstance(dataDir, &InstanceInfo{ID: instanceID, TemplateName: instanceType, Port: port, Status: "error"})
sendMessage(WSMessage{Action: "instance_error", InstanceID: instanceID, Error: err.Error()})
notifyInstanceProgress("0", "Erreur de démarrage")
notifyUI(map[string]interface{}{"action": "instances_updated"})
return
}
notifyInstanceProgress("60", "Application en cours de démarrage...")
ensureTailscale(dataDir, nodeID, port)
if err := setupTailscaleServe(port); err != nil {
log.Printf("setupTailscaleServe error: %v", err)
// Non-fatal: the instance may still work on Linux or if Windows
// userspace forwarding happens to function.
}
notifyInstanceProgress("80", "Connexion sécurisée active...")
// Repair older WordPress instances: remove hardcoded WP_HOME/WP_SITEURL
// so the studioE5 mu-plugin can compute the public URL from the Host header.
time.Sleep(2 * time.Second)
if err := stripWordPressHardcodedURLs(dataDir, instanceID); err != nil {
log.Printf("stripWordPressHardcodedURLs error: %v", err)
}
notifyInstanceProgress("90", "Finalisation de l'installation...")
status := getInstanceStatus(dataDir, instanceID)
_ = upsertInstance(dataDir, &InstanceInfo{ID: instanceID, TemplateName: instanceType, Port: port, Status: status})
sendMessage(WSMessage{Action: "instance_started", InstanceID: instanceID, Port: port})
notifyInstanceProgress("100", "Application prête")
notifyUI(map[string]interface{}{"action": "instances_updated"})
}
// ensureTailscale starts Tailscale when it is not already running, using the
// Headscale settings currently held in memory.
//
// It returns without doing anything when the Headscale URL or auth key is
// empty, or when Tailscale is already running. After a successful start it
// keeps retrying once per second until the Tailscale IP has been sent to the
// server, so it blocks while the WebSocket is down. Whether the start
// succeeded or failed, a fresh status snapshot is pushed to the UI at the end.
//
// port is only used in the log message.
//
// NOTE(review): the config check runs before the "already running" check, so
// an empty auth key produces the "config missing" log line even if Tailscale
// is in fact up — confirm this is intended.
func ensureTailscale(dataDir, nodeID string, port int) {
	endpoint, authKey := getHeadscaleConfig()
	switch {
	case endpoint == "" || authKey == "":
		log.Printf("Cannot ensure Tailscale: headscale config missing")
		return
	case isTailscaleRunning():
		return
	}

	log.Printf("Tailscale not running, starting it for instance port %d", port)
	ip, err := startTailscale(dataDir, nodeID, endpoint, authKey)
	if err != nil {
		log.Printf("ensureTailscale start error: %v", err)
	} else {
		report := WSMessage{Action: "tailscale_ip", NodeID: nodeID, TailscaleIP: ip}
		for sendMessage(report) != nil {
			log.Printf("Waiting for WebSocket to send tailscale_ip...")
			time.Sleep(1 * time.Second)
		}
		log.Printf("Sent tailscale_ip to server: %s", ip)
	}

	broadcastUI(map[string]interface{}{
		"action": "status",
		"status": buildUIStatus(dataDir),
	})
}