6cf8716d7e
Build backend images / build content-svc (push) Failing after 13s
Build backend images / build file-svc (push) Failing after 53s
Build backend images / build gateway (push) Failing after 1m22s
Build backend images / build identity-svc (push) Failing after 19s
Build backend images / build notification-svc (push) Failing after 21s
Build backend images / build render-svc (push) Failing after 20s
Build backend images / build studio-svc (push) Failing after 1m6s
C2 — real-AE scene snapshots on the node:
- node-agent: runner/snapshot.go RunSnapshot (aerender -comp <key> -s f -e f
→ findRenderedOutput → ffmpeg -frames:v 1 PNG); client ClaimSnapshot /
GetSnapshotUploadURL / ReportSnapshotResult / ReportSnapshotFail; snapshotLoop +
pollSnapshotOnce mirroring the scan loop (reuses the AE-exclusive lock).
- render-svc: GetSnapshotJobMeta + UploadURL handler presigns a PUT to the
public-read user-uploads bucket at snapshots/{project}/{scene}.png and returns a
permanent public_url (not the time-limited export presign); MINIO_UPLOAD_BUCKET +
MINIO_PUBLIC_URL config + compose env + /snapshot/:id/upload-url route.
Epic B — bind edited colours into the render:
- render-svc GetRenderBindings UNIONs studio.saved_shared_colors +
saved_scene_colors (type 'color') so the node writes them before render.
- node-agent binder.go routes type:"color" bindings into the bind-spec colors[]
array that bind.jsx already applies to the frshare colour layers.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
644 lines
21 KiB
Go
644 lines
21 KiB
Go
// FlatRender V2 Node Agent
|
|
//
|
|
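// Runs on every Windows render node. Registers itself with the V2 render
// orchestrator, sends heartbeats, claims render jobs, executes them via
// After Effects (or mock), and reports progress / completion back. It also
// installs fonts requested by the orchestrator and, when the real AE app is
// configured, runs template-scan and scene-snapshot jobs.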
|
|
//
|
|
// Required environment variables:
|
|
// NODE_ID — UUID of this node (pre-created in render.render_nodes)
|
|
//
|
|
// Optional environment variables (all have sensible defaults):
|
|
// ORCHESTRATOR_URL — gateway base URL (default: http://localhost:8088)
|
|
// NODE_HMAC_SECRET — shared secret (default: node-secret-change-me)
|
|
// NODE_REGION — region label, e.g. "iran-tehran-1"
|
|
// AE_PATH — path to aerender.exe; empty = mock render
|
|
// WORK_DIR — scratch directory (default: system temp)
|
|
// AGENT_VERSION — semver string (default: 0.1.0)
|
|
// AE_VERSION — AE version string reported to orchestrator (default: 2024)
|
|
// HEARTBEAT_INTERVAL_SEC — seconds between heartbeats (default: 5)
|
|
// POLL_INTERVAL_SEC — seconds between job-claim attempts when idle (default: 3)
|
|
// LISTEN_PORT — port for the health endpoint (default: 7777)
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"runtime"
|
|
"sync"
|
|
"sync/atomic"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/flatrender/node-agent/internal/client"
|
|
"github.com/flatrender/node-agent/internal/config"
|
|
"github.com/flatrender/node-agent/internal/metrics"
|
|
"github.com/flatrender/node-agent/internal/runner"
|
|
)
|
|
|
|
// ── Agent state ───────────────────────────────────────────────────────────────
|
|
|
|
type Agent struct {
|
|
cfg *config.Config
|
|
orch *client.Client
|
|
mu sync.Mutex
|
|
currentJob *client.ClaimedJob
|
|
scanning bool // true while running an AE scan job (shares the AE app)
|
|
status string // "Ready" | "Busy"
|
|
}
|
|
|
|
func newAgent(cfg *config.Config) *Agent {
|
|
return &Agent{
|
|
cfg: cfg,
|
|
orch: client.New(cfg.OrchestratorURL, cfg.NodeHMACSecret),
|
|
status: "Ready",
|
|
}
|
|
}
|
|
|
|
func (a *Agent) setJob(job *client.ClaimedJob) {
|
|
a.mu.Lock()
|
|
defer a.mu.Unlock()
|
|
a.currentJob = job
|
|
if job != nil {
|
|
a.status = "Busy"
|
|
} else {
|
|
a.status = "Ready"
|
|
}
|
|
}
|
|
|
|
func (a *Agent) getStatus() (string, *string) {
|
|
a.mu.Lock()
|
|
defer a.mu.Unlock()
|
|
if a.currentJob != nil {
|
|
jobID := a.currentJob.JobID
|
|
return a.status, &jobID
|
|
}
|
|
if a.scanning {
|
|
return "Busy", nil
|
|
}
|
|
return a.status, nil
|
|
}
|
|
|
|
func (a *Agent) setScanning(v bool) {
|
|
a.mu.Lock()
|
|
a.scanning = v
|
|
a.mu.Unlock()
|
|
}
|
|
|
|
// isBusy reports whether the AE app is in use (rendering or scanning).
|
|
func (a *Agent) isBusy() bool {
|
|
a.mu.Lock()
|
|
defer a.mu.Unlock()
|
|
return a.currentJob != nil || a.scanning
|
|
}
|
|
|
|
// ── Main ──────────────────────────────────────────────────────────────────────
|
|
|
|
func main() {
|
|
log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds)
|
|
log.Printf("FlatRender Node Agent v%s starting (OS: %s, Arch: %s)",
|
|
"0.1.0", runtime.GOOS, runtime.GOARCH)
|
|
|
|
cfg, err := config.Load()
|
|
if err != nil {
|
|
log.Fatalf("config: %v", err)
|
|
}
|
|
log.Printf("Node ID: %s | Region: %q | Orchestrator: %s",
|
|
cfg.NodeID, cfg.Region, cfg.OrchestratorURL)
|
|
if cfg.AEPath == "" {
|
|
log.Printf("AE_PATH not set — using mock renderer (development mode)")
|
|
} else {
|
|
log.Printf("AE binary: %s", cfg.AEPath)
|
|
}
|
|
|
|
agent := newAgent(cfg)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
// Graceful shutdown on SIGTERM / SIGINT
|
|
sigs := make(chan os.Signal, 1)
|
|
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
|
|
go func() {
|
|
s := <-sigs
|
|
log.Printf("received %s, shutting down…", s)
|
|
cancel()
|
|
}()
|
|
|
|
// Register as online
|
|
if err := agent.registerOnline(ctx); err != nil {
|
|
log.Fatalf("failed to register with orchestrator: %v", err)
|
|
}
|
|
|
|
// Start health endpoint
|
|
go agent.serveHealth(ctx)
|
|
|
|
// Main loops
|
|
var wg sync.WaitGroup
|
|
wg.Add(5)
|
|
go func() { defer wg.Done(); agent.heartbeatLoop(ctx) }()
|
|
go func() { defer wg.Done(); agent.pollLoop(ctx) }()
|
|
go func() { defer wg.Done(); agent.fontSyncLoop(ctx) }()
|
|
go func() { defer wg.Done(); agent.scanLoop(ctx) }()
|
|
go func() { defer wg.Done(); agent.snapshotLoop(ctx) }()
|
|
wg.Wait()
|
|
log.Printf("shutdown complete")
|
|
}
|
|
|
|
// ── Registration ──────────────────────────────────────────────────────────────
|
|
|
|
func (a *Agent) registerOnline(ctx context.Context) error {
|
|
req := client.OnlineRequest{
|
|
NodeAgentVersion: a.cfg.AgentVersion,
|
|
CurrentAEVersion: a.cfg.AEVersion,
|
|
AvailableAEVersions: []string{a.cfg.AEVersion},
|
|
CachedTemplateMD5s: []string{},
|
|
}
|
|
if err := a.orch.Online(ctx, a.cfg.NodeID, req); err != nil {
|
|
return fmt.Errorf("online: %w", err)
|
|
}
|
|
log.Printf("registered as online with orchestrator")
|
|
return nil
|
|
}
|
|
|
|
// ── Font sync loop ────────────────────────────────────────────────────────────
|
|
// Periodically installs any fonts the orchestrator wants on this node and reports
|
|
// per-font status, so the admin can verify installation.
|
|
|
|
func (a *Agent) fontSyncLoop(ctx context.Context) {
|
|
ticker := time.NewTicker(60 * time.Second)
|
|
defer ticker.Stop()
|
|
a.syncFonts(ctx) // run once on startup
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
a.syncFonts(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (a *Agent) syncFonts(ctx context.Context) {
|
|
fonts, err := a.orch.PendingFonts(ctx, a.cfg.NodeID)
|
|
if err != nil {
|
|
return // transient; try again next tick
|
|
}
|
|
for _, f := range fonts {
|
|
name := f.SystemName
|
|
if name == "" {
|
|
name = f.Name
|
|
}
|
|
if err := runner.InstallFont(ctx, f.FileURL, name); err != nil {
|
|
log.Printf("font install failed (%s): %v", f.Name, err)
|
|
_ = a.orch.ReportFont(ctx, a.cfg.NodeID, f.ID, "Failed", err.Error())
|
|
continue
|
|
}
|
|
log.Printf("font installed: %s", f.Name)
|
|
_ = a.orch.ReportFont(ctx, a.cfg.NodeID, f.ID, "Installed", "")
|
|
}
|
|
}
|
|
|
|
// ── Scan loop ──────────────────────────────────────────────────────────────────
|
|
// Claims AE scan jobs and runs the template scanner (scan.jsx) via afterfx.exe,
|
|
// posting the resulting ScanResult JSON back to the orchestrator. Requires the AE
|
|
// app — skipped entirely in mock/dev (no AE_PATH).
|
|
|
|
func (a *Agent) scanLoop(ctx context.Context) {
|
|
interval := time.Duration(a.cfg.PollIntervalSec) * time.Second
|
|
if interval < 5*time.Second {
|
|
interval = 5 * time.Second
|
|
}
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
a.pollScanOnce(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (a *Agent) pollScanOnce(ctx context.Context) {
|
|
if a.cfg.AfterFxPath == "" || a.cfg.AEPath == "" {
|
|
return // scanning needs the real AE app
|
|
}
|
|
if a.isBusy() {
|
|
return // don't contend with a render (or another scan) for the AE app
|
|
}
|
|
|
|
claim, err := a.orch.ClaimScan(ctx, a.cfg.NodeID, a.cfg.Region)
|
|
if err != nil {
|
|
log.Printf("scan claim error: %v", err)
|
|
return
|
|
}
|
|
if claim == nil {
|
|
return // nothing queued
|
|
}
|
|
|
|
a.setScanning(true)
|
|
defer a.setScanning(false)
|
|
log.Printf("[scan %s] claimed (project %s)", claim.ScanJobID, claim.ProjectID)
|
|
|
|
// Reuse the template prepare/cache pipeline (download + extract bundle by md5).
|
|
prepCtx, cancel := context.WithTimeout(ctx, 15*time.Minute)
|
|
aepPath, perr := runner.PrepareTemplate(prepCtx, claim.AEPDownloadURL, claim.IsBundle, claim.BundleMD5, a.cfg.WorkDir, "scan-"+claim.ScanJobID)
|
|
cancel()
|
|
if perr != nil {
|
|
a.failScan(claim.ScanJobID, "prepare template: "+perr.Error())
|
|
return
|
|
}
|
|
|
|
outPath := filepath.Join(a.cfg.WorkDir, "scans", claim.ScanJobID, "scan.json")
|
|
scanCtx, cancel2 := context.WithTimeout(ctx, 10*time.Minute)
|
|
defer cancel2()
|
|
|
|
// Watchdog: if the user cancels the scan server-side, cancel the context →
|
|
// exec.CommandContext kills the AfterFX process.
|
|
var cancelled atomic.Bool
|
|
go func() {
|
|
t := time.NewTicker(3 * time.Second)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-scanCtx.Done():
|
|
return
|
|
case <-t.C:
|
|
st, e := a.orch.ScanStatus(scanCtx, claim.ScanJobID)
|
|
if e == nil && (st == "cancelled" || st == "error") {
|
|
log.Printf("[scan %s] cancelled server-side — killing AE", claim.ScanJobID)
|
|
cancelled.Store(true)
|
|
cancel2()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
result, serr := runner.RunScan(scanCtx, a.cfg.AfterFxPath, aepPath, a.cfg.WorkDir, outPath, claim.Mode)
|
|
if cancelled.Load() {
|
|
log.Printf("[scan %s] aborted (cancelled)", claim.ScanJobID)
|
|
return // already cancelled in DB; don't report fail
|
|
}
|
|
if serr != nil {
|
|
a.failScan(claim.ScanJobID, serr.Error())
|
|
return
|
|
}
|
|
|
|
rptCtx, cancel3 := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel3()
|
|
if err := a.orch.ReportScanResult(rptCtx, claim.ScanJobID, result); err != nil {
|
|
log.Printf("[scan %s] report result error: %v", claim.ScanJobID, err)
|
|
return
|
|
}
|
|
log.Printf("[scan %s] done (%d bytes)", claim.ScanJobID, len(result))
|
|
}
|
|
|
|
func (a *Agent) failScan(id, reason string) {
|
|
log.Printf("[scan %s] failed: %s", id, reason)
|
|
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
|
defer cancel()
|
|
_ = a.orch.ReportScanFail(ctx, id, reason)
|
|
}
|
|
|
|
// ── Snapshot loop ─────────────────────────────────────────────────────────────
|
|
// Claims per-scene snapshot jobs, renders a single frame with AE, extracts a PNG
|
|
// still and uploads it. Requires the AE app — skipped without AE_PATH.
|
|
|
|
func (a *Agent) snapshotLoop(ctx context.Context) {
|
|
interval := time.Duration(a.cfg.PollIntervalSec) * time.Second
|
|
if interval < 5*time.Second {
|
|
interval = 5 * time.Second
|
|
}
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
a.pollSnapshotOnce(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (a *Agent) pollSnapshotOnce(ctx context.Context) {
|
|
if a.cfg.AEPath == "" {
|
|
return // snapshots need the real AE app
|
|
}
|
|
if a.isBusy() {
|
|
return // don't contend with a render/scan for AE
|
|
}
|
|
|
|
claim, err := a.orch.ClaimSnapshot(ctx, a.cfg.NodeID, a.cfg.Region)
|
|
if err != nil {
|
|
log.Printf("snapshot claim error: %v", err)
|
|
return
|
|
}
|
|
if claim == nil {
|
|
return // nothing queued
|
|
}
|
|
|
|
a.setScanning(true) // reuse the scan-busy flag (same AE-exclusive lock)
|
|
defer a.setScanning(false)
|
|
log.Printf("[snapshot %s] claimed (scene %s)", claim.SnapshotJobID, claim.SceneKey)
|
|
|
|
prepCtx, cancel := context.WithTimeout(ctx, 15*time.Minute)
|
|
aepPath, perr := runner.PrepareTemplate(prepCtx, claim.AEPDownloadURL, claim.IsBundle, claim.BundleMD5, a.cfg.WorkDir, "snap-"+claim.SnapshotJobID)
|
|
cancel()
|
|
if perr != nil {
|
|
a.failSnapshot(claim.SnapshotJobID, "prepare template: "+perr.Error())
|
|
return
|
|
}
|
|
|
|
workDir := filepath.Join(a.cfg.WorkDir, "snapshots", claim.SnapshotJobID)
|
|
renderCtx, cancel2 := context.WithTimeout(ctx, 10*time.Minute)
|
|
defer cancel2()
|
|
pngPath, rerr := runner.RunSnapshot(renderCtx, a.cfg.AEPath, aepPath, claim.CompName, claim.Frame, workDir)
|
|
if rerr != nil {
|
|
a.failSnapshot(claim.SnapshotJobID, rerr.Error())
|
|
return
|
|
}
|
|
|
|
// Upload the still to the public bucket, then report its permanent URL.
|
|
upCtx, cancel3 := context.WithTimeout(context.Background(), 2*time.Minute)
|
|
defer cancel3()
|
|
up, uerr := a.orch.GetSnapshotUploadURL(upCtx, claim.SnapshotJobID)
|
|
if uerr != nil {
|
|
a.failSnapshot(claim.SnapshotJobID, "upload-url: "+uerr.Error())
|
|
return
|
|
}
|
|
if _, uperr := runner.UploadFile(upCtx, up.UploadURL, pngPath); uperr != nil {
|
|
a.failSnapshot(claim.SnapshotJobID, "upload: "+uperr.Error())
|
|
return
|
|
}
|
|
if rperr := a.orch.ReportSnapshotResult(upCtx, claim.SnapshotJobID, up.PublicURL); rperr != nil {
|
|
log.Printf("[snapshot %s] report result error: %v", claim.SnapshotJobID, rperr)
|
|
return
|
|
}
|
|
log.Printf("[snapshot %s] done → %s", claim.SnapshotJobID, up.PublicURL)
|
|
}
|
|
|
|
func (a *Agent) failSnapshot(id, reason string) {
|
|
log.Printf("[snapshot %s] failed: %s", id, reason)
|
|
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
|
defer cancel()
|
|
_ = a.orch.ReportSnapshotFail(ctx, id, reason)
|
|
}
|
|
|
|
// ── Heartbeat loop ────────────────────────────────────────────────────────────
|
|
|
|
func (a *Agent) heartbeatLoop(ctx context.Context) {
|
|
ticker := time.NewTicker(time.Duration(a.cfg.HeartbeatIntervalSec) * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
a.sendHeartbeat(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (a *Agent) sendHeartbeat(ctx context.Context) {
|
|
status, jobID := a.getStatus()
|
|
|
|
// Live host metrics (Windows kernel32; stub elsewhere). CPU samples ~300ms.
|
|
cpu := metrics.CPUPercent(300 * time.Millisecond)
|
|
_, ramAvail := metrics.Memory()
|
|
diskPct, diskTotal := metrics.Disk(a.cfg.WorkDir)
|
|
aeRunning := a.cfg.AEPath != "" && a.isBusy()
|
|
|
|
req := client.HeartbeatRequest{
|
|
NodeID: a.cfg.NodeID,
|
|
Status: status,
|
|
CurrentJobID: jobID,
|
|
CPUPct: &cpu,
|
|
RAMAvailableMB: &ramAvail,
|
|
DiskUsedPct: &diskPct,
|
|
DiskTotalGB: &diskTotal,
|
|
AERunning: &aeRunning,
|
|
}
|
|
hbCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
|
defer cancel()
|
|
|
|
resp, err := a.orch.Heartbeat(hbCtx, a.cfg.NodeID, req)
|
|
if err != nil {
|
|
log.Printf("heartbeat error: %v", err)
|
|
return
|
|
}
|
|
if len(resp.PendingCommands) > 0 {
|
|
log.Printf("orchestrator commands: %v", resp.PendingCommands)
|
|
}
|
|
}
|
|
|
|
// ── Job poll loop ─────────────────────────────────────────────────────────────
|
|
|
|
func (a *Agent) pollLoop(ctx context.Context) {
|
|
ticker := time.NewTicker(time.Duration(a.cfg.PollIntervalSec) * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
status, _ := a.getStatus()
|
|
if status == "Busy" {
|
|
continue // already rendering
|
|
}
|
|
a.tryClaimAndRun(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (a *Agent) tryClaimAndRun(ctx context.Context) {
|
|
claimCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
|
defer cancel()
|
|
|
|
job, err := a.orch.ClaimJob(claimCtx, a.cfg.NodeID, a.cfg.Region)
|
|
if err != nil {
|
|
log.Printf("claim error: %v", err)
|
|
return
|
|
}
|
|
if job == nil {
|
|
return // queue empty
|
|
}
|
|
|
|
log.Printf("claimed job %s (project %s, %s %s %dfps)",
|
|
job.JobID, job.SavedProjectID, job.Quality, job.Resolution, job.FrameRate)
|
|
|
|
a.setJob(job)
|
|
go func() {
|
|
defer a.setJob(nil)
|
|
a.runJob(ctx, job)
|
|
}()
|
|
}
|
|
|
|
// ── Render execution ──────────────────────────────────────────────────────────
|
|
|
|
func (a *Agent) runJob(ctx context.Context, job *client.ClaimedJob) {
|
|
log.Printf("[job %s] starting render", job.JobID)
|
|
|
|
// ── Step 1: Fetch + prepare the .aep template ────────────────────────────
|
|
// PrepareTemplate downloads the project file; if it's a .zip bundle it is
|
|
// extracted and the .aep inside located. Prepared templates are cached by md5
|
|
// so repeated renders of the same template skip the download + extraction.
|
|
aepPath := ""
|
|
if job.AEPDownloadURL != "" && a.cfg.AEPath != "" {
|
|
dlCtx, dlCancel := context.WithTimeout(ctx, 15*time.Minute)
|
|
p, prepErr := runner.PrepareTemplate(dlCtx, job.AEPDownloadURL, job.IsBundle, job.BundleMD5, a.cfg.WorkDir, job.JobID)
|
|
dlCancel()
|
|
if prepErr != nil {
|
|
log.Printf("[job %s] template prepare failed (%v) — falling back to mock", job.JobID, prepErr)
|
|
} else {
|
|
kind := "aep"
|
|
if job.IsBundle {
|
|
kind = "bundle"
|
|
}
|
|
log.Printf("[job %s] template ready (%s) → %s", job.JobID, kind, p)
|
|
aepPath = p
|
|
}
|
|
}
|
|
|
|
binds := make([]runner.Binding, 0, len(job.Bindings))
|
|
for _, b := range job.Bindings {
|
|
binds = append(binds, runner.Binding{Key: b.Key, Type: b.Type, Value: b.Value})
|
|
}
|
|
|
|
rJob := &runner.Job{
|
|
JobID: job.JobID,
|
|
SavedProjectID: job.SavedProjectID,
|
|
Quality: job.Quality,
|
|
Resolution: job.Resolution,
|
|
FrameRate: job.FrameRate,
|
|
HasMusic: job.HasMusic,
|
|
HasVoiceover: job.HasVoiceover,
|
|
AEPFilePath: aepPath,
|
|
CompName: job.CompName,
|
|
AfterFxPath: a.cfg.AfterFxPath,
|
|
Bindings: binds,
|
|
}
|
|
|
|
onProgress := func(ctx context.Context, pct int, msg string) error {
|
|
log.Printf("[job %s] %d%% %s", job.JobID, pct, msg)
|
|
// Report to the orchestrator so the UI shows a moving bar + ETA (best-effort,
|
|
// short timeout — AE pegs CPU so the DB may be slow on a single-box dev setup).
|
|
pCtx, cancel := context.WithTimeout(ctx, 8*time.Second)
|
|
defer cancel()
|
|
if err := a.orch.UpdateProgress(pCtx, job.JobID, pct); err != nil {
|
|
log.Printf("[job %s] progress push error: %v", job.JobID, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
onPreview := func(ctx context.Context, imageB64 string) error {
|
|
pvCtx, cancel := context.WithTimeout(ctx, 8*time.Second)
|
|
defer cancel()
|
|
if err := a.orch.UpdatePreview(pvCtx, job.JobID, imageB64); err != nil {
|
|
log.Printf("[job %s] preview push error: %v", job.JobID, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ── Step 2: Render ───────────────────────────────────────────────────────
|
|
outputPath, err := runner.Run(ctx, a.cfg.AEPath, a.cfg.WorkDir, rJob, onProgress, onPreview)
|
|
if err != nil {
|
|
if ctx.Err() != nil {
|
|
log.Printf("[job %s] render cancelled", job.JobID)
|
|
return
|
|
}
|
|
log.Printf("[job %s] render failed: %v", job.JobID, err)
|
|
failCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
if ferr := a.orch.Fail(failCtx, job.JobID, err.Error(), "Rendering"); ferr != nil {
|
|
log.Printf("[job %s] fail report error: %v", job.JobID, ferr)
|
|
}
|
|
return
|
|
}
|
|
log.Printf("[job %s] render done → %s", job.JobID, outputPath)
|
|
|
|
// ── Step 3: Get presigned upload URL + upload output to MinIO ─────────────
|
|
var exportID *string
|
|
uploadCtx, uploadCancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
|
defer uploadCancel()
|
|
|
|
// Retry the upload-URL call: right after a CPU-heavy render the orchestrator/DB
|
|
// can be briefly slow, and dropping a finished render's output is the worst outcome.
|
|
var uploadInfo *client.OutputUploadURLResponse
|
|
var urlErr error
|
|
for attempt := 1; attempt <= 4; attempt++ {
|
|
uploadInfo, urlErr = a.orch.GetOutputUploadURL(uploadCtx, job.JobID)
|
|
if urlErr == nil {
|
|
break
|
|
}
|
|
log.Printf("[job %s] get upload URL attempt %d failed: %v", job.JobID, attempt, urlErr)
|
|
select {
|
|
case <-uploadCtx.Done():
|
|
case <-time.After(time.Duration(attempt*3) * time.Second):
|
|
}
|
|
}
|
|
if urlErr != nil {
|
|
log.Printf("[job %s] get upload URL failed after retries: %v — completing without export", job.JobID, urlErr)
|
|
} else {
|
|
log.Printf("[job %s] uploading output to %s", job.JobID, uploadInfo.ObjectKey)
|
|
if _, upErr := runner.UploadFile(uploadCtx, uploadInfo.UploadURL, outputPath); upErr != nil {
|
|
log.Printf("[job %s] upload failed: %v — completing without export", job.JobID, upErr)
|
|
} else {
|
|
log.Printf("[job %s] upload complete (export %s)", job.JobID, uploadInfo.ExportID)
|
|
exportID = &uploadInfo.ExportID
|
|
}
|
|
}
|
|
|
|
// ── Step 4: Report complete ───────────────────────────────────────────────
|
|
completeCtx, completeCancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer completeCancel()
|
|
if err := a.orch.Complete(completeCtx, job.JobID, exportID); err != nil {
|
|
log.Printf("[job %s] complete report error: %v", job.JobID, err)
|
|
} else {
|
|
log.Printf("[job %s] reported as completed (export=%v)", job.JobID, exportID)
|
|
}
|
|
}
|
|
|
|
// ── Health endpoint ───────────────────────────────────────────────────────────
|
|
|
|
func (a *Agent) serveHealth(ctx context.Context) {
|
|
mux := http.NewServeMux()
|
|
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
|
|
status, jobID := a.getStatus()
|
|
w.Header().Set("Content-Type", "application/json")
|
|
resp := map[string]any{
|
|
"ok": true,
|
|
"node_id": a.cfg.NodeID,
|
|
"status": status,
|
|
"current_job": jobID,
|
|
"version": a.cfg.AgentVersion,
|
|
}
|
|
_ = json.NewEncoder(w).Encode(resp)
|
|
})
|
|
|
|
srv := &http.Server{
|
|
Addr: fmt.Sprintf(":%d", a.cfg.ListenPort),
|
|
Handler: mux,
|
|
}
|
|
|
|
go func() {
|
|
<-ctx.Done()
|
|
_ = srv.Shutdown(context.Background())
|
|
}()
|
|
|
|
log.Printf("health endpoint listening on :%d", a.cfg.ListenPort)
|
|
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
log.Printf("health server error: %v", err)
|
|
}
|
|
}
|