// FlatRender V2 Node Agent // // Runs on every Windows render node. Registers itself with the V2 render // orchestrator, sends heartbeats, claims render jobs, executes them via // After Effects (or mock), and reports progress / completion back. // // Required environment variables: // NODE_ID — UUID of this node (pre-created in render.render_nodes) // // Optional environment variables (all have sensible defaults): // ORCHESTRATOR_URL — gateway base URL (default: http://localhost:8088) // NODE_HMAC_SECRET — shared secret (default: node-secret-change-me) // NODE_REGION — region label, e.g. "iran-tehran-1" // AE_PATH — path to aerender.exe; empty = mock render // WORK_DIR — scratch directory (default: system temp) // AGENT_VERSION — semver string (default: 0.1.0) // AE_VERSION — AE version string reported to orchestrator (default: 2024) // HEARTBEAT_INTERVAL_SEC — seconds between heartbeats (default: 5) // POLL_INTERVAL_SEC — seconds between job-claim attempts when idle (default: 3) // LISTEN_PORT — port for the health endpoint (default: 7777) package main import ( "context" "encoding/json" "fmt" "log" "net/http" "os" "os/signal" "path/filepath" "runtime" "sync" "sync/atomic" "syscall" "time" "github.com/flatrender/node-agent/internal/client" "github.com/flatrender/node-agent/internal/config" "github.com/flatrender/node-agent/internal/runner" ) // ── Agent state ─────────────────────────────────────────────────────────────── type Agent struct { cfg *config.Config orch *client.Client mu sync.Mutex currentJob *client.ClaimedJob scanning bool // true while running an AE scan job (shares the AE app) status string // "Ready" | "Busy" } func newAgent(cfg *config.Config) *Agent { return &Agent{ cfg: cfg, orch: client.New(cfg.OrchestratorURL, cfg.NodeHMACSecret), status: "Ready", } } func (a *Agent) setJob(job *client.ClaimedJob) { a.mu.Lock() defer a.mu.Unlock() a.currentJob = job if job != nil { a.status = "Busy" } else { a.status = "Ready" } } func (a *Agent) getStatus() (string, *string) { a.mu.Lock() defer a.mu.Unlock() if a.currentJob != nil { jobID := a.currentJob.JobID return a.status, &jobID } if a.scanning { return "Busy", nil } return a.status, nil } func (a *Agent) setScanning(v bool) { a.mu.Lock() a.scanning = v a.mu.Unlock() } // isBusy reports whether the AE app is in use (rendering or scanning). func (a *Agent) isBusy() bool { a.mu.Lock() defer a.mu.Unlock() return a.currentJob != nil || a.scanning } // ── Main ────────────────────────────────────────────────────────────────────── func main() { log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds) log.Printf("FlatRender Node Agent v%s starting (OS: %s, Arch: %s)", "0.1.0", runtime.GOOS, runtime.GOARCH) cfg, err := config.Load() if err != nil { log.Fatalf("config: %v", err) } log.Printf("Node ID: %s | Region: %q | Orchestrator: %s", cfg.NodeID, cfg.Region, cfg.OrchestratorURL) if cfg.AEPath == "" { log.Printf("AE_PATH not set — using mock renderer (development mode)") } else { log.Printf("AE binary: %s", cfg.AEPath) } agent := newAgent(cfg) ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Graceful shutdown on SIGTERM / SIGINT sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT) go func() { s := <-sigs log.Printf("received %s, shutting down…", s) cancel() }() // Register as online if err := agent.registerOnline(ctx); err != nil { log.Fatalf("failed to register with orchestrator: %v", err) } // Start health endpoint go agent.serveHealth(ctx) // Main loops var wg sync.WaitGroup wg.Add(4) go func() { defer wg.Done(); agent.heartbeatLoop(ctx) }() go func() { defer wg.Done(); agent.pollLoop(ctx) }() go func() { defer wg.Done(); agent.fontSyncLoop(ctx) }() go func() { defer wg.Done(); agent.scanLoop(ctx) }() wg.Wait() log.Printf("shutdown complete") } // ── Registration ────────────────────────────────────────────────────────────── func (a *Agent) registerOnline(ctx context.Context) error { req := client.OnlineRequest{ NodeAgentVersion: a.cfg.AgentVersion, CurrentAEVersion: a.cfg.AEVersion, AvailableAEVersions: []string{a.cfg.AEVersion}, CachedTemplateMD5s: []string{}, } if err := a.orch.Online(ctx, a.cfg.NodeID, req); err != nil { return fmt.Errorf("online: %w", err) } log.Printf("registered as online with orchestrator") return nil } // ── Font sync loop ──────────────────────────────────────────────────────────── // Periodically installs any fonts the orchestrator wants on this node and reports // per-font status, so the admin can verify installation. func (a *Agent) fontSyncLoop(ctx context.Context) { ticker := time.NewTicker(60 * time.Second) defer ticker.Stop() a.syncFonts(ctx) // run once on startup for { select { case <-ctx.Done(): return case <-ticker.C: a.syncFonts(ctx) } } } func (a *Agent) syncFonts(ctx context.Context) { fonts, err := a.orch.PendingFonts(ctx, a.cfg.NodeID) if err != nil { return // transient; try again next tick } for _, f := range fonts { name := f.SystemName if name == "" { name = f.Name } if err := runner.InstallFont(ctx, f.FileURL, name); err != nil { log.Printf("font install failed (%s): %v", f.Name, err) _ = a.orch.ReportFont(ctx, a.cfg.NodeID, f.ID, "Failed", err.Error()) continue } log.Printf("font installed: %s", f.Name) _ = a.orch.ReportFont(ctx, a.cfg.NodeID, f.ID, "Installed", "") } } // ── Scan loop ────────────────────────────────────────────────────────────────── // Claims AE scan jobs and runs the template scanner (scan.jsx) via afterfx.exe, // posting the resulting ScanResult JSON back to the orchestrator. Requires the AE // app — skipped entirely in mock/dev (no AE_PATH). func (a *Agent) scanLoop(ctx context.Context) { interval := time.Duration(a.cfg.PollIntervalSec) * time.Second if interval < 5*time.Second { interval = 5 * time.Second } ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: a.pollScanOnce(ctx) } } } func (a *Agent) pollScanOnce(ctx context.Context) { if a.cfg.AfterFxPath == "" || a.cfg.AEPath == "" { return // scanning needs the real AE app } if a.isBusy() { return // don't contend with a render (or another scan) for the AE app } claim, err := a.orch.ClaimScan(ctx, a.cfg.NodeID, a.cfg.Region) if err != nil { log.Printf("scan claim error: %v", err) return } if claim == nil { return // nothing queued } a.setScanning(true) defer a.setScanning(false) log.Printf("[scan %s] claimed (project %s)", claim.ScanJobID, claim.ProjectID) // Reuse the template prepare/cache pipeline (download + extract bundle by md5). prepCtx, cancel := context.WithTimeout(ctx, 15*time.Minute) aepPath, perr := runner.PrepareTemplate(prepCtx, claim.AEPDownloadURL, claim.IsBundle, claim.BundleMD5, a.cfg.WorkDir, "scan-"+claim.ScanJobID) cancel() if perr != nil { a.failScan(claim.ScanJobID, "prepare template: "+perr.Error()) return } outPath := filepath.Join(a.cfg.WorkDir, "scans", claim.ScanJobID, "scan.json") scanCtx, cancel2 := context.WithTimeout(ctx, 10*time.Minute) defer cancel2() // Watchdog: if the user cancels the scan server-side, cancel the context → // exec.CommandContext kills the AfterFX process. var cancelled atomic.Bool go func() { t := time.NewTicker(3 * time.Second) defer t.Stop() for { select { case <-scanCtx.Done(): return case <-t.C: st, e := a.orch.ScanStatus(scanCtx, claim.ScanJobID) if e == nil && (st == "cancelled" || st == "error") { log.Printf("[scan %s] cancelled server-side — killing AE", claim.ScanJobID) cancelled.Store(true) cancel2() return } } } }() result, serr := runner.RunScan(scanCtx, a.cfg.AfterFxPath, aepPath, a.cfg.WorkDir, outPath, claim.Mode) if cancelled.Load() { log.Printf("[scan %s] aborted (cancelled)", claim.ScanJobID) return // already cancelled in DB; don't report fail } if serr != nil { a.failScan(claim.ScanJobID, serr.Error()) return } rptCtx, cancel3 := context.WithTimeout(context.Background(), 30*time.Second) defer cancel3() if err := a.orch.ReportScanResult(rptCtx, claim.ScanJobID, result); err != nil { log.Printf("[scan %s] report result error: %v", claim.ScanJobID, err) return } log.Printf("[scan %s] done (%d bytes)", claim.ScanJobID, len(result)) } func (a *Agent) failScan(id, reason string) { log.Printf("[scan %s] failed: %s", id, reason) ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() _ = a.orch.ReportScanFail(ctx, id, reason) } // ── Heartbeat loop ──────────────────────────────────────────────────────────── func (a *Agent) heartbeatLoop(ctx context.Context) { ticker := time.NewTicker(time.Duration(a.cfg.HeartbeatIntervalSec) * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: a.sendHeartbeat(ctx) } } } func (a *Agent) sendHeartbeat(ctx context.Context) { status, jobID := a.getStatus() req := client.HeartbeatRequest{ NodeID: a.cfg.NodeID, Status: status, CurrentJobID: jobID, } hbCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() resp, err := a.orch.Heartbeat(hbCtx, a.cfg.NodeID, req) if err != nil { log.Printf("heartbeat error: %v", err) return } if len(resp.PendingCommands) > 0 { log.Printf("orchestrator commands: %v", resp.PendingCommands) } } // ── Job poll loop ───────────────────────────────────────────────────────────── func (a *Agent) pollLoop(ctx context.Context) { ticker := time.NewTicker(time.Duration(a.cfg.PollIntervalSec) * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: status, _ := a.getStatus() if status == "Busy" { continue // already rendering } a.tryClaimAndRun(ctx) } } } func (a *Agent) tryClaimAndRun(ctx context.Context) { claimCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() job, err := a.orch.ClaimJob(claimCtx, a.cfg.NodeID, a.cfg.Region) if err != nil { log.Printf("claim error: %v", err) return } if job == nil { return // queue empty } log.Printf("claimed job %s (project %s, %s %s %dfps)", job.JobID, job.SavedProjectID, job.Quality, job.Resolution, job.FrameRate) a.setJob(job) go func() { defer a.setJob(nil) a.runJob(ctx, job) }() } // ── Render execution ────────────────────────────────────────────────────────── func (a *Agent) runJob(ctx context.Context, job *client.ClaimedJob) { log.Printf("[job %s] starting render", job.JobID) // ── Step 1: Fetch + prepare the .aep template ──────────────────────────── // PrepareTemplate downloads the project file; if it's a .zip bundle it is // extracted and the .aep inside located. Prepared templates are cached by md5 // so repeated renders of the same template skip the download + extraction. aepPath := "" if job.AEPDownloadURL != "" && a.cfg.AEPath != "" { dlCtx, dlCancel := context.WithTimeout(ctx, 15*time.Minute) p, prepErr := runner.PrepareTemplate(dlCtx, job.AEPDownloadURL, job.IsBundle, job.BundleMD5, a.cfg.WorkDir, job.JobID) dlCancel() if prepErr != nil { log.Printf("[job %s] template prepare failed (%v) — falling back to mock", job.JobID, prepErr) } else { kind := "aep" if job.IsBundle { kind = "bundle" } log.Printf("[job %s] template ready (%s) → %s", job.JobID, kind, p) aepPath = p } } rJob := &runner.Job{ JobID: job.JobID, SavedProjectID: job.SavedProjectID, Quality: job.Quality, Resolution: job.Resolution, FrameRate: job.FrameRate, HasMusic: job.HasMusic, HasVoiceover: job.HasVoiceover, AEPFilePath: aepPath, } onProgress := func(ctx context.Context, pct int, msg string) error { log.Printf("[job %s] %d%% %s", job.JobID, pct, msg) return nil } onPreview := func(ctx context.Context, imageB64 string) error { pvCtx, cancel := context.WithTimeout(ctx, 8*time.Second) defer cancel() if err := a.orch.UpdatePreview(pvCtx, job.JobID, imageB64); err != nil { log.Printf("[job %s] preview push error: %v", job.JobID, err) } return nil } // ── Step 2: Render ─────────────────────────────────────────────────────── outputPath, err := runner.Run(ctx, a.cfg.AEPath, a.cfg.WorkDir, rJob, onProgress, onPreview) if err != nil { if ctx.Err() != nil { log.Printf("[job %s] render cancelled", job.JobID) return } log.Printf("[job %s] render failed: %v", job.JobID, err) failCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if ferr := a.orch.Fail(failCtx, job.JobID, err.Error(), "Rendering"); ferr != nil { log.Printf("[job %s] fail report error: %v", job.JobID, ferr) } return } log.Printf("[job %s] render done → %s", job.JobID, outputPath) // ── Step 3: Get presigned upload URL + upload output to MinIO ───────────── var exportID *string uploadCtx, uploadCancel := context.WithTimeout(context.Background(), 5*time.Minute) defer uploadCancel() uploadInfo, urlErr := a.orch.GetOutputUploadURL(uploadCtx, job.JobID) if urlErr != nil { log.Printf("[job %s] get upload URL failed: %v — completing without export", job.JobID, urlErr) } else { log.Printf("[job %s] uploading output to %s", job.JobID, uploadInfo.ObjectKey) if _, upErr := runner.UploadFile(uploadCtx, uploadInfo.UploadURL, outputPath); upErr != nil { log.Printf("[job %s] upload failed: %v — completing without export", job.JobID, upErr) } else { log.Printf("[job %s] upload complete (export %s)", job.JobID, uploadInfo.ExportID) exportID = &uploadInfo.ExportID } } // ── Step 4: Report complete ─────────────────────────────────────────────── completeCtx, completeCancel := context.WithTimeout(context.Background(), 10*time.Second) defer completeCancel() if err := a.orch.Complete(completeCtx, job.JobID, exportID); err != nil { log.Printf("[job %s] complete report error: %v", job.JobID, err) } else { log.Printf("[job %s] reported as completed (export=%v)", job.JobID, exportID) } } // ── Health endpoint ─────────────────────────────────────────────────────────── func (a *Agent) serveHealth(ctx context.Context) { mux := http.NewServeMux() mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { status, jobID := a.getStatus() w.Header().Set("Content-Type", "application/json") resp := map[string]any{ "ok": true, "node_id": a.cfg.NodeID, "status": status, "current_job": jobID, "version": a.cfg.AgentVersion, } _ = json.NewEncoder(w).Encode(resp) }) srv := &http.Server{ Addr: fmt.Sprintf(":%d", a.cfg.ListenPort), Handler: mux, } go func() { <-ctx.Done() _ = srv.Shutdown(context.Background()) }() log.Printf("health endpoint listening on :%d", a.cfg.ListenPort) if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Printf("health server error: %v", err) } }