// FlatRender V2 Node Agent // // Runs on every Windows render node. Registers itself with the V2 render // orchestrator, sends heartbeats, claims render jobs, executes them via // After Effects (or mock), and reports progress / completion back. // // Required environment variables: // NODE_ID — UUID of this node (pre-created in render.render_nodes) // // Optional environment variables (all have sensible defaults): // ORCHESTRATOR_URL — gateway base URL (default: http://localhost:8088) // NODE_HMAC_SECRET — shared secret (default: node-secret-change-me) // NODE_REGION — region label, e.g. "iran-tehran-1" // AE_PATH — path to aerender.exe; empty = mock render // WORK_DIR — scratch directory (default: system temp) // AGENT_VERSION — semver string (default: 0.1.0) // AE_VERSION — AE version string reported to orchestrator (default: 2024) // HEARTBEAT_INTERVAL_SEC — seconds between heartbeats (default: 5) // POLL_INTERVAL_SEC — seconds between job-claim attempts when idle (default: 3) // LISTEN_PORT — port for the health endpoint (default: 7777) package main import ( "context" "encoding/json" "fmt" "log" "net/http" "os" "os/signal" "path/filepath" "runtime" "sync" "sync/atomic" "syscall" "time" "github.com/flatrender/node-agent/internal/client" "github.com/flatrender/node-agent/internal/config" "github.com/flatrender/node-agent/internal/metrics" "github.com/flatrender/node-agent/internal/runner" ) // ── Agent state ─────────────────────────────────────────────────────────────── type Agent struct { cfg *config.Config orch *client.Client mu sync.Mutex currentJob *client.ClaimedJob scanning bool // true while running an AE scan job (shares the AE app) status string // "Ready" | "Busy" } func newAgent(cfg *config.Config) *Agent { return &Agent{ cfg: cfg, orch: client.New(cfg.OrchestratorURL, cfg.NodeHMACSecret), status: "Ready", } } func (a *Agent) setJob(job *client.ClaimedJob) { a.mu.Lock() defer a.mu.Unlock() a.currentJob = job if job != nil { a.status = "Busy" } else { a.status = "Ready" } } func (a *Agent) getStatus() (string, *string) { a.mu.Lock() defer a.mu.Unlock() if a.currentJob != nil { jobID := a.currentJob.JobID return a.status, &jobID } if a.scanning { return "Busy", nil } return a.status, nil } func (a *Agent) setScanning(v bool) { a.mu.Lock() a.scanning = v a.mu.Unlock() } // isBusy reports whether the AE app is in use (rendering or scanning). func (a *Agent) isBusy() bool { a.mu.Lock() defer a.mu.Unlock() return a.currentJob != nil || a.scanning } // ── Main ────────────────────────────────────────────────────────────────────── func main() { log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds) log.Printf("FlatRender Node Agent v%s starting (OS: %s, Arch: %s)", "0.1.0", runtime.GOOS, runtime.GOARCH) cfg, err := config.Load() if err != nil { log.Fatalf("config: %v", err) } log.Printf("Node ID: %s | Region: %q | Orchestrator: %s", cfg.NodeID, cfg.Region, cfg.OrchestratorURL) if cfg.AEPath == "" { log.Printf("AE_PATH not set — using mock renderer (development mode)") } else { log.Printf("AE binary: %s", cfg.AEPath) } agent := newAgent(cfg) ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Graceful shutdown on SIGTERM / SIGINT sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT) go func() { s := <-sigs log.Printf("received %s, shutting down…", s) cancel() }() // Register as online if err := agent.registerOnline(ctx); err != nil { log.Fatalf("failed to register with orchestrator: %v", err) } // Start health endpoint go agent.serveHealth(ctx) // Main loops var wg sync.WaitGroup wg.Add(5) go func() { defer wg.Done(); agent.heartbeatLoop(ctx) }() go func() { defer wg.Done(); agent.pollLoop(ctx) }() go func() { defer wg.Done(); agent.fontSyncLoop(ctx) }() go func() { defer wg.Done(); agent.scanLoop(ctx) }() go func() { defer wg.Done(); agent.snapshotLoop(ctx) }() wg.Wait() log.Printf("shutdown complete") } // ── Registration ────────────────────────────────────────────────────────────── func (a *Agent) registerOnline(ctx context.Context) error { req := client.OnlineRequest{ NodeAgentVersion: a.cfg.AgentVersion, CurrentAEVersion: a.cfg.AEVersion, AvailableAEVersions: []string{a.cfg.AEVersion}, CachedTemplateMD5s: []string{}, } if err := a.orch.Online(ctx, a.cfg.NodeID, req); err != nil { return fmt.Errorf("online: %w", err) } log.Printf("registered as online with orchestrator") return nil } // ── Font sync loop ──────────────────────────────────────────────────────────── // Periodically installs any fonts the orchestrator wants on this node and reports // per-font status, so the admin can verify installation. func (a *Agent) fontSyncLoop(ctx context.Context) { ticker := time.NewTicker(60 * time.Second) defer ticker.Stop() a.syncFonts(ctx) // run once on startup for { select { case <-ctx.Done(): return case <-ticker.C: a.syncFonts(ctx) } } } func (a *Agent) syncFonts(ctx context.Context) { fonts, err := a.orch.PendingFonts(ctx, a.cfg.NodeID) if err != nil { return // transient; try again next tick } for _, f := range fonts { name := f.SystemName if name == "" { name = f.Name } if err := runner.InstallFont(ctx, f.FileURL, name); err != nil { log.Printf("font install failed (%s): %v", f.Name, err) _ = a.orch.ReportFont(ctx, a.cfg.NodeID, f.ID, "Failed", err.Error()) continue } log.Printf("font installed: %s", f.Name) _ = a.orch.ReportFont(ctx, a.cfg.NodeID, f.ID, "Installed", "") } } // ── Scan loop ────────────────────────────────────────────────────────────────── // Claims AE scan jobs and runs the template scanner (scan.jsx) via afterfx.exe, // posting the resulting ScanResult JSON back to the orchestrator. Requires the AE // app — skipped entirely in mock/dev (no AE_PATH). func (a *Agent) scanLoop(ctx context.Context) { interval := time.Duration(a.cfg.PollIntervalSec) * time.Second if interval < 5*time.Second { interval = 5 * time.Second } ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: a.pollScanOnce(ctx) } } } func (a *Agent) pollScanOnce(ctx context.Context) { if a.cfg.AfterFxPath == "" || a.cfg.AEPath == "" { return // scanning needs the real AE app } if a.isBusy() { return // don't contend with a render (or another scan) for the AE app } claim, err := a.orch.ClaimScan(ctx, a.cfg.NodeID, a.cfg.Region) if err != nil { log.Printf("scan claim error: %v", err) return } if claim == nil { return // nothing queued } a.setScanning(true) defer a.setScanning(false) log.Printf("[scan %s] claimed (project %s)", claim.ScanJobID, claim.ProjectID) // Reuse the template prepare/cache pipeline (download + extract bundle by md5). prepCtx, cancel := context.WithTimeout(ctx, 15*time.Minute) aepPath, perr := runner.PrepareTemplate(prepCtx, claim.AEPDownloadURL, claim.IsBundle, claim.BundleMD5, a.cfg.WorkDir, "scan-"+claim.ScanJobID) cancel() if perr != nil { a.failScan(claim.ScanJobID, "prepare template: "+perr.Error()) return } outPath := filepath.Join(a.cfg.WorkDir, "scans", claim.ScanJobID, "scan.json") scanCtx, cancel2 := context.WithTimeout(ctx, 10*time.Minute) defer cancel2() // Watchdog: if the user cancels the scan server-side, cancel the context → // exec.CommandContext kills the AfterFX process. var cancelled atomic.Bool go func() { t := time.NewTicker(3 * time.Second) defer t.Stop() for { select { case <-scanCtx.Done(): return case <-t.C: st, e := a.orch.ScanStatus(scanCtx, claim.ScanJobID) if e == nil && (st == "cancelled" || st == "error") { log.Printf("[scan %s] cancelled server-side — killing AE", claim.ScanJobID) cancelled.Store(true) cancel2() return } } } }() result, serr := runner.RunScan(scanCtx, a.cfg.AfterFxPath, aepPath, a.cfg.WorkDir, outPath, claim.Mode) if cancelled.Load() { log.Printf("[scan %s] aborted (cancelled)", claim.ScanJobID) return // already cancelled in DB; don't report fail } if serr != nil { a.failScan(claim.ScanJobID, serr.Error()) return } rptCtx, cancel3 := context.WithTimeout(context.Background(), 30*time.Second) defer cancel3() if err := a.orch.ReportScanResult(rptCtx, claim.ScanJobID, result); err != nil { log.Printf("[scan %s] report result error: %v", claim.ScanJobID, err) return } log.Printf("[scan %s] done (%d bytes)", claim.ScanJobID, len(result)) } func (a *Agent) failScan(id, reason string) { log.Printf("[scan %s] failed: %s", id, reason) ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() _ = a.orch.ReportScanFail(ctx, id, reason) } // ── Snapshot loop ───────────────────────────────────────────────────────────── // Claims per-scene snapshot jobs, renders a single frame with AE, extracts a PNG // still and uploads it. Requires the AE app — skipped without AE_PATH. func (a *Agent) snapshotLoop(ctx context.Context) { interval := time.Duration(a.cfg.PollIntervalSec) * time.Second if interval < 5*time.Second { interval = 5 * time.Second } ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: a.pollSnapshotOnce(ctx) } } } func (a *Agent) pollSnapshotOnce(ctx context.Context) { if a.cfg.AEPath == "" { return // snapshots need the real AE app } if a.isBusy() { return // don't contend with a render/scan for AE } claim, err := a.orch.ClaimSnapshot(ctx, a.cfg.NodeID, a.cfg.Region) if err != nil { log.Printf("snapshot claim error: %v", err) return } if claim == nil { return // nothing queued } a.setScanning(true) // reuse the scan-busy flag (same AE-exclusive lock) defer a.setScanning(false) log.Printf("[snapshot %s] claimed (scene %s)", claim.SnapshotJobID, claim.SceneKey) prepCtx, cancel := context.WithTimeout(ctx, 15*time.Minute) aepPath, perr := runner.PrepareTemplate(prepCtx, claim.AEPDownloadURL, claim.IsBundle, claim.BundleMD5, a.cfg.WorkDir, "snap-"+claim.SnapshotJobID) cancel() if perr != nil { a.failSnapshot(claim.SnapshotJobID, "prepare template: "+perr.Error()) return } workDir := filepath.Join(a.cfg.WorkDir, "snapshots", claim.SnapshotJobID) renderCtx, cancel2 := context.WithTimeout(ctx, 10*time.Minute) defer cancel2() pngPath, rerr := runner.RunSnapshot(renderCtx, a.cfg.AEPath, aepPath, claim.CompName, claim.Frame, workDir) if rerr != nil { a.failSnapshot(claim.SnapshotJobID, rerr.Error()) return } // Upload the still to the public bucket, then report its permanent URL. upCtx, cancel3 := context.WithTimeout(context.Background(), 2*time.Minute) defer cancel3() up, uerr := a.orch.GetSnapshotUploadURL(upCtx, claim.SnapshotJobID) if uerr != nil { a.failSnapshot(claim.SnapshotJobID, "upload-url: "+uerr.Error()) return } if _, uperr := runner.UploadFile(upCtx, up.UploadURL, pngPath); uperr != nil { a.failSnapshot(claim.SnapshotJobID, "upload: "+uperr.Error()) return } if rperr := a.orch.ReportSnapshotResult(upCtx, claim.SnapshotJobID, up.PublicURL); rperr != nil { log.Printf("[snapshot %s] report result error: %v", claim.SnapshotJobID, rperr) return } log.Printf("[snapshot %s] done → %s", claim.SnapshotJobID, up.PublicURL) } func (a *Agent) failSnapshot(id, reason string) { log.Printf("[snapshot %s] failed: %s", id, reason) ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() _ = a.orch.ReportSnapshotFail(ctx, id, reason) } // ── Heartbeat loop ──────────────────────────────────────────────────────────── func (a *Agent) heartbeatLoop(ctx context.Context) { ticker := time.NewTicker(time.Duration(a.cfg.HeartbeatIntervalSec) * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: a.sendHeartbeat(ctx) } } } func (a *Agent) sendHeartbeat(ctx context.Context) { status, jobID := a.getStatus() // Live host metrics (Windows kernel32; stub elsewhere). CPU samples ~300ms. cpu := metrics.CPUPercent(300 * time.Millisecond) _, ramAvail := metrics.Memory() diskPct, diskTotal := metrics.Disk(a.cfg.WorkDir) aeRunning := a.cfg.AEPath != "" && a.isBusy() req := client.HeartbeatRequest{ NodeID: a.cfg.NodeID, Status: status, CurrentJobID: jobID, CPUPct: &cpu, RAMAvailableMB: &ramAvail, DiskUsedPct: &diskPct, DiskTotalGB: &diskTotal, AERunning: &aeRunning, } hbCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() resp, err := a.orch.Heartbeat(hbCtx, a.cfg.NodeID, req) if err != nil { log.Printf("heartbeat error: %v", err) return } if len(resp.PendingCommands) > 0 { log.Printf("orchestrator commands: %v", resp.PendingCommands) } } // ── Job poll loop ───────────────────────────────────────────────────────────── func (a *Agent) pollLoop(ctx context.Context) { ticker := time.NewTicker(time.Duration(a.cfg.PollIntervalSec) * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: status, _ := a.getStatus() if status == "Busy" { continue // already rendering } a.tryClaimAndRun(ctx) } } } func (a *Agent) tryClaimAndRun(ctx context.Context) { claimCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() job, err := a.orch.ClaimJob(claimCtx, a.cfg.NodeID, a.cfg.Region) if err != nil { log.Printf("claim error: %v", err) return } if job == nil { return // queue empty } log.Printf("claimed job %s (project %s, %s %s %dfps)", job.JobID, job.SavedProjectID, job.Quality, job.Resolution, job.FrameRate) a.setJob(job) go func() { defer a.setJob(nil) a.runJob(ctx, job) }() } // ── Render execution ────────────────────────────────────────────────────────── func (a *Agent) runJob(ctx context.Context, job *client.ClaimedJob) { log.Printf("[job %s] starting render", job.JobID) // ── Step 1: Fetch + prepare the .aep template ──────────────────────────── // PrepareTemplate downloads the project file; if it's a .zip bundle it is // extracted and the .aep inside located. Prepared templates are cached by md5 // so repeated renders of the same template skip the download + extraction. aepPath := "" if job.AEPDownloadURL != "" && a.cfg.AEPath != "" { dlCtx, dlCancel := context.WithTimeout(ctx, 15*time.Minute) p, prepErr := runner.PrepareTemplate(dlCtx, job.AEPDownloadURL, job.IsBundle, job.BundleMD5, a.cfg.WorkDir, job.JobID) dlCancel() if prepErr != nil { log.Printf("[job %s] template prepare failed (%v) — falling back to mock", job.JobID, prepErr) } else { kind := "aep" if job.IsBundle { kind = "bundle" } log.Printf("[job %s] template ready (%s) → %s", job.JobID, kind, p) aepPath = p } } binds := make([]runner.Binding, 0, len(job.Bindings)) for _, b := range job.Bindings { binds = append(binds, runner.Binding{Key: b.Key, Type: b.Type, Value: b.Value}) } rJob := &runner.Job{ JobID: job.JobID, SavedProjectID: job.SavedProjectID, Quality: job.Quality, Resolution: job.Resolution, FrameRate: job.FrameRate, HasMusic: job.HasMusic, HasVoiceover: job.HasVoiceover, AEPFilePath: aepPath, CompName: job.CompName, AfterFxPath: a.cfg.AfterFxPath, Bindings: binds, } onProgress := func(ctx context.Context, pct int, msg string) error { log.Printf("[job %s] %d%% %s", job.JobID, pct, msg) // Report to the orchestrator so the UI shows a moving bar + ETA (best-effort, // short timeout — AE pegs CPU so the DB may be slow on a single-box dev setup). pCtx, cancel := context.WithTimeout(ctx, 8*time.Second) defer cancel() if err := a.orch.UpdateProgress(pCtx, job.JobID, pct); err != nil { log.Printf("[job %s] progress push error: %v", job.JobID, err) } return nil } onPreview := func(ctx context.Context, imageB64 string) error { pvCtx, cancel := context.WithTimeout(ctx, 8*time.Second) defer cancel() if err := a.orch.UpdatePreview(pvCtx, job.JobID, imageB64); err != nil { log.Printf("[job %s] preview push error: %v", job.JobID, err) } return nil } // ── Step 2: Render ─────────────────────────────────────────────────────── outputPath, err := runner.Run(ctx, a.cfg.AEPath, a.cfg.WorkDir, rJob, onProgress, onPreview) if err != nil { if ctx.Err() != nil { log.Printf("[job %s] render cancelled", job.JobID) return } log.Printf("[job %s] render failed: %v", job.JobID, err) failCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if ferr := a.orch.Fail(failCtx, job.JobID, err.Error(), "Rendering"); ferr != nil { log.Printf("[job %s] fail report error: %v", job.JobID, ferr) } return } log.Printf("[job %s] render done → %s", job.JobID, outputPath) // ── Step 3: Get presigned upload URL + upload output to MinIO ───────────── var exportID *string uploadCtx, uploadCancel := context.WithTimeout(context.Background(), 5*time.Minute) defer uploadCancel() // Retry the upload-URL call: right after a CPU-heavy render the orchestrator/DB // can be briefly slow, and dropping a finished render's output is the worst outcome. var uploadInfo *client.OutputUploadURLResponse var urlErr error for attempt := 1; attempt <= 4; attempt++ { uploadInfo, urlErr = a.orch.GetOutputUploadURL(uploadCtx, job.JobID) if urlErr == nil { break } log.Printf("[job %s] get upload URL attempt %d failed: %v", job.JobID, attempt, urlErr) select { case <-uploadCtx.Done(): case <-time.After(time.Duration(attempt*3) * time.Second): } } if urlErr != nil { log.Printf("[job %s] get upload URL failed after retries: %v — completing without export", job.JobID, urlErr) } else { log.Printf("[job %s] uploading output to %s", job.JobID, uploadInfo.ObjectKey) if _, upErr := runner.UploadFile(uploadCtx, uploadInfo.UploadURL, outputPath); upErr != nil { log.Printf("[job %s] upload failed: %v — completing without export", job.JobID, upErr) } else { log.Printf("[job %s] upload complete (export %s)", job.JobID, uploadInfo.ExportID) exportID = &uploadInfo.ExportID } } // ── Step 4: Report complete ─────────────────────────────────────────────── completeCtx, completeCancel := context.WithTimeout(context.Background(), 10*time.Second) defer completeCancel() if err := a.orch.Complete(completeCtx, job.JobID, exportID); err != nil { log.Printf("[job %s] complete report error: %v", job.JobID, err) } else { log.Printf("[job %s] reported as completed (export=%v)", job.JobID, exportID) } } // ── Health endpoint ─────────────────────────────────────────────────────────── func (a *Agent) serveHealth(ctx context.Context) { mux := http.NewServeMux() mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { status, jobID := a.getStatus() w.Header().Set("Content-Type", "application/json") resp := map[string]any{ "ok": true, "node_id": a.cfg.NodeID, "status": status, "current_job": jobID, "version": a.cfg.AgentVersion, } _ = json.NewEncoder(w).Encode(resp) }) srv := &http.Server{ Addr: fmt.Sprintf(":%d", a.cfg.ListenPort), Handler: mux, } go func() { <-ctx.Done() _ = srv.Shutdown(context.Background()) }() log.Printf("health endpoint listening on :%d", a.cfg.ListenPort) if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Printf("health server error: %v", err) } }