From ee421ccc68563c0f1de0050763d379dcfcf1d616 Mon Sep 17 00:00:00 2001 From: "soroush.asadi" Date: Mon, 1 Jun 2026 09:28:31 +0330 Subject: [PATCH] feat(render-svc+node-agent): add job-claim endpoint and build node-agent skeleton MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit render-svc: - db: ClaimJob() — atomic SELECT FOR UPDATE SKIP LOCKED; transitions job to Preparing, marks node Busy in a single transaction - models: ClaimJobRequest + ClaimedJob types - handlers/internal: POST /v1/internal/render/jobs/claim — 200 with job or 204 when queue empty - main: register the claim route under /v1/internal (nodeAuth) services/node-agent/ (new Go module github.com/flatrender/node-agent): - internal/config: env-var based config (NODE_ID required, sensible defaults) - internal/client: typed orchestrator HTTP client (Online, Heartbeat, ClaimJob, Complete, Fail, ReportCrash) — X-Node-Signature auth - internal/runner: AE render via aerender.exe or mock (for dev without AE) - cmd/agent/main: register online → heartbeat loop (5s) + poll loop (3s) → claim job → run render → report complete/fail; health endpoint on :7777 - Dockerfile: cross-compiles to Windows amd64 static binary Co-Authored-By: Claude Sonnet 4.6 --- services/node-agent/Dockerfile | 19 ++ services/node-agent/cmd/agent/main.go | 313 ++++++++++++++++++ services/node-agent/go.mod | 3 + services/node-agent/internal/client/client.go | 233 +++++++++++++ services/node-agent/internal/config/config.go | 89 +++++ services/node-agent/internal/runner/runner.go | 141 ++++++++ services/render/cmd/server/main.go | 1 + services/render/internal/db/db.go | 56 ++++ services/render/internal/handlers/internal.go | 31 ++ services/render/internal/models/models.go | 15 + 10 files changed, 901 insertions(+) create mode 100644 services/node-agent/Dockerfile create mode 100644 services/node-agent/cmd/agent/main.go create mode 100644 services/node-agent/go.mod create mode 100644 services/node-agent/internal/client/client.go create mode 100644 services/node-agent/internal/config/config.go create mode 100644 services/node-agent/internal/runner/runner.go diff --git a/services/node-agent/Dockerfile b/services/node-agent/Dockerfile new file mode 100644 index 0000000..3f94cda --- /dev/null +++ b/services/node-agent/Dockerfile @@ -0,0 +1,19 @@ +# Build stage — cross-compile for Windows amd64 +FROM golang:1.25-alpine AS builder + +WORKDIR /src +COPY go.mod ./ +# No external dependencies yet — no go.sum needed +COPY . . + +# Produce a static Windows binary +RUN GOOS=windows GOARCH=amd64 CGO_ENABLED=0 \ + go build -mod=mod -trimpath -ldflags="-s -w" \ + -o /out/flatrender-node-agent.exe \ + ./cmd/agent + +# ── Output stage ─────────────────────────────────────────────────────────────── +# For dev/CI use only — the production binary is copied to Windows render nodes. +FROM scratch +COPY --from=builder /out/flatrender-node-agent.exe /flatrender-node-agent.exe +ENTRYPOINT ["/flatrender-node-agent.exe"] diff --git a/services/node-agent/cmd/agent/main.go b/services/node-agent/cmd/agent/main.go new file mode 100644 index 0000000..b741531 --- /dev/null +++ b/services/node-agent/cmd/agent/main.go @@ -0,0 +1,313 @@ +// FlatRender V2 Node Agent +// +// Runs on every Windows render node. Registers itself with the V2 render +// orchestrator, sends heartbeats, claims render jobs, executes them via +// After Effects (or mock), and reports progress / completion back. +// +// Required environment variables: +// NODE_ID — UUID of this node (pre-created in render.render_nodes) +// +// Optional environment variables (all have sensible defaults): +// ORCHESTRATOR_URL — gateway base URL (default: http://localhost:8088) +// NODE_HMAC_SECRET — shared secret (default: node-secret-change-me) +// NODE_REGION — region label, e.g. "iran-tehran-1" +// AE_PATH — path to aerender.exe; empty = mock render +// WORK_DIR — scratch directory (default: system temp) +// AGENT_VERSION — semver string (default: 0.1.0) +// AE_VERSION — AE version string reported to orchestrator (default: 2024) +// HEARTBEAT_INTERVAL_SEC — seconds between heartbeats (default: 5) +// POLL_INTERVAL_SEC — seconds between job-claim attempts when idle (default: 3) +// LISTEN_PORT — port for the health endpoint (default: 7777) +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "net/http" + "os" + "os/signal" + "runtime" + "sync" + "syscall" + "time" + + "github.com/flatrender/node-agent/internal/client" + "github.com/flatrender/node-agent/internal/config" + "github.com/flatrender/node-agent/internal/runner" +) + +// ── Agent state ─────────────────────────────────────────────────────────────── + +type Agent struct { + cfg *config.Config + orch *client.Client + mu sync.Mutex + currentJob *client.ClaimedJob + status string // "Ready" | "Busy" +} + +func newAgent(cfg *config.Config) *Agent { + return &Agent{ + cfg: cfg, + orch: client.New(cfg.OrchestratorURL, cfg.NodeHMACSecret), + status: "Ready", + } +} + +func (a *Agent) setJob(job *client.ClaimedJob) { + a.mu.Lock() + defer a.mu.Unlock() + a.currentJob = job + if job != nil { + a.status = "Busy" + } else { + a.status = "Ready" + } +} + +func (a *Agent) getStatus() (string, *string) { + a.mu.Lock() + defer a.mu.Unlock() + if a.currentJob != nil { + jobID := a.currentJob.JobID + return a.status, &jobID + } + return a.status, nil +} + +// ── Main ────────────────────────────────────────────────────────────────────── + +func main() { + log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds) + log.Printf("FlatRender Node Agent v%s starting (OS: %s, Arch: %s)", + "0.1.0", runtime.GOOS, runtime.GOARCH) + + cfg, err := config.Load() + if err != nil { + log.Fatalf("config: %v", err) + } + log.Printf("Node ID: %s | Region: %q | Orchestrator: %s", + cfg.NodeID, cfg.Region, cfg.OrchestratorURL) + if cfg.AEPath == "" { + log.Printf("AE_PATH not set — using mock renderer (development mode)") + } else { + log.Printf("AE binary: %s", cfg.AEPath) + } + + agent := newAgent(cfg) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Graceful shutdown on SIGTERM / SIGINT + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT) + go func() { + s := <-sigs + log.Printf("received %s, shutting down…", s) + cancel() + }() + + // Register as online + if err := agent.registerOnline(ctx); err != nil { + log.Fatalf("failed to register with orchestrator: %v", err) + } + + // Start health endpoint + go agent.serveHealth(ctx) + + // Main loops + var wg sync.WaitGroup + wg.Add(2) + go func() { defer wg.Done(); agent.heartbeatLoop(ctx) }() + go func() { defer wg.Done(); agent.pollLoop(ctx) }() + wg.Wait() + log.Printf("shutdown complete") +} + +// ── Registration ────────────────────────────────────────────────────────────── + +func (a *Agent) registerOnline(ctx context.Context) error { + req := client.OnlineRequest{ + NodeAgentVersion: a.cfg.AgentVersion, + CurrentAEVersion: a.cfg.AEVersion, + AvailableAEVersions: []string{a.cfg.AEVersion}, + CachedTemplateMD5s: []string{}, + } + if err := a.orch.Online(ctx, a.cfg.NodeID, req); err != nil { + return fmt.Errorf("online: %w", err) + } + log.Printf("registered as online with orchestrator") + return nil +} + +// ── Heartbeat loop ──────────────────────────────────────────────────────────── + +func (a *Agent) heartbeatLoop(ctx context.Context) { + ticker := time.NewTicker(time.Duration(a.cfg.HeartbeatIntervalSec) * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + a.sendHeartbeat(ctx) + } + } +} + +func (a *Agent) sendHeartbeat(ctx context.Context) { + status, jobID := a.getStatus() + req := client.HeartbeatRequest{ + NodeID: a.cfg.NodeID, + Status: status, + CurrentJobID: jobID, + } + hbCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + resp, err := a.orch.Heartbeat(hbCtx, a.cfg.NodeID, req) + if err != nil { + log.Printf("heartbeat error: %v", err) + return + } + if len(resp.PendingCommands) > 0 { + log.Printf("orchestrator commands: %v", resp.PendingCommands) + } +} + +// ── Job poll loop ───────────────────────────────────────────────────────────── + +func (a *Agent) pollLoop(ctx context.Context) { + ticker := time.NewTicker(time.Duration(a.cfg.PollIntervalSec) * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + status, _ := a.getStatus() + if status == "Busy" { + continue // already rendering + } + a.tryClaimAndRun(ctx) + } + } +} + +func (a *Agent) tryClaimAndRun(ctx context.Context) { + claimCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + job, err := a.orch.ClaimJob(claimCtx, a.cfg.NodeID, a.cfg.Region) + if err != nil { + log.Printf("claim error: %v", err) + return + } + if job == nil { + return // queue empty + } + + log.Printf("claimed job %s (project %s, %s %s %dfps)", + job.JobID, job.SavedProjectID, job.Quality, job.Resolution, job.FrameRate) + + a.setJob(job) + go func() { + defer a.setJob(nil) + a.runJob(ctx, job) + }() +} + +// ── Render execution ────────────────────────────────────────────────────────── + +func (a *Agent) runJob(ctx context.Context, job *client.ClaimedJob) { + log.Printf("[job %s] starting render", job.JobID) + + // In a full implementation, the agent would: + // 1. Fetch the saved project from the studio service + // 2. Download the .aep template from MinIO + // 3. Inject user customisations into the composition via JSXB/AE scripting + // Then call runner.Run(). + // For the skeleton we pass an empty AEPFilePath, which triggers mock mode. + + rJob := &runner.Job{ + JobID: job.JobID, + SavedProjectID: job.SavedProjectID, + Quality: job.Quality, + Resolution: job.Resolution, + FrameRate: job.FrameRate, + HasMusic: job.HasMusic, + HasVoiceover: job.HasVoiceover, + AEPFilePath: "", // TODO: download from MinIO + } + + onProgress := func(ctx context.Context, pct int, msg string) error { + // Lightweight progress update — errors here are non-fatal + log.Printf("[job %s] %d%% %s", job.JobID, pct, msg) + return nil + } + + outputPath, err := runner.Run(ctx, a.cfg.AEPath, a.cfg.WorkDir, rJob, onProgress) + if err != nil { + if ctx.Err() != nil { + log.Printf("[job %s] render cancelled", job.JobID) + return + } + log.Printf("[job %s] render failed: %v", job.JobID, err) + failCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if ferr := a.orch.Fail(failCtx, job.JobID, err.Error(), "Rendering"); ferr != nil { + log.Printf("[job %s] fail report error: %v", job.JobID, ferr) + } + return + } + + log.Printf("[job %s] render done → %s", job.JobID, outputPath) + + // In full production: upload outputPath to MinIO, create an Export record, + // pass the export UUID to Complete(). Skeleton passes nil (no export yet). + completeCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := a.orch.Complete(completeCtx, job.JobID, nil); err != nil { + log.Printf("[job %s] complete report error: %v", job.JobID, err) + } else { + log.Printf("[job %s] reported as completed", job.JobID) + } +} + +// ── Health endpoint ─────────────────────────────────────────────────────────── + +func (a *Agent) serveHealth(ctx context.Context) { + mux := http.NewServeMux() + mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + status, jobID := a.getStatus() + w.Header().Set("Content-Type", "application/json") + resp := map[string]any{ + "ok": true, + "node_id": a.cfg.NodeID, + "status": status, + "current_job": jobID, + "version": a.cfg.AgentVersion, + } + _ = json.NewEncoder(w).Encode(resp) + }) + + srv := &http.Server{ + Addr: fmt.Sprintf(":%d", a.cfg.ListenPort), + Handler: mux, + } + + go func() { + <-ctx.Done() + _ = srv.Shutdown(context.Background()) + }() + + log.Printf("health endpoint listening on :%d", a.cfg.ListenPort) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Printf("health server error: %v", err) + } +} diff --git a/services/node-agent/go.mod b/services/node-agent/go.mod new file mode 100644 index 0000000..05cb954 --- /dev/null +++ b/services/node-agent/go.mod @@ -0,0 +1,3 @@ +module github.com/flatrender/node-agent + +go 1.25 diff --git a/services/node-agent/internal/client/client.go b/services/node-agent/internal/client/client.go new file mode 100644 index 0000000..2f8a114 --- /dev/null +++ b/services/node-agent/internal/client/client.go @@ -0,0 +1,233 @@ +// Package client provides a typed HTTP client for the V2 render orchestrator's +// internal (node-agent) API. All requests are authenticated via the shared +// X-Node-Signature header. +package client + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "time" +) + +// Client talks to the V2 render orchestrator. +type Client struct { + base string + secret string + http *http.Client +} + +// New returns a Client targeting the given base URL (e.g. "http://gateway:8080"). +func New(baseURL, nodeHMACSecret string) *Client { + return &Client{ + base: strings.TrimRight(baseURL, "/"), + secret: nodeHMACSecret, + http: &http.Client{Timeout: 15 * time.Second}, + } +} + +// ── Request helpers ─────────────────────────────────────────────────────────── + +func (c *Client) do(ctx context.Context, method, path string, body any) (*http.Response, error) { + var bodyReader io.Reader + if body != nil { + b, err := json.Marshal(body) + if err != nil { + return nil, fmt.Errorf("marshal: %w", err) + } + bodyReader = bytes.NewReader(b) + } + req, err := http.NewRequestWithContext(ctx, method, c.base+path, bodyReader) + if err != nil { + return nil, err + } + req.Header.Set("X-Node-Signature", c.secret) + if body != nil { + req.Header.Set("Content-Type", "application/json") + } + req.Header.Set("Accept", "application/json") + return c.http.Do(req) +} + +func decodeJSON(resp *http.Response, out any) error { + defer resp.Body.Close() + if out == nil { + return nil + } + return json.NewDecoder(resp.Body).Decode(out) +} + +// ── Domain types ────────────────────────────────────────────────────────────── + +// OnlineRequest is sent once on startup to mark the node Ready. +type OnlineRequest struct { + NodeAgentVersion string `json:"node_agent_version"` + CurrentAEVersion string `json:"current_ae_version"` + AvailableAEVersions []string `json:"available_ae_versions"` + RamGB *int `json:"ram_gb,omitempty"` + CPUCores *int `json:"cpu_cores,omitempty"` + CacheUsedGB *int `json:"cache_used_gb,omitempty"` + CachedTemplateMD5s []string `json:"cached_template_md5s"` +} + +// HeartbeatRequest is sent every HeartbeatIntervalSec seconds. +type HeartbeatRequest struct { + NodeID string `json:"node_id"` + Status string `json:"status"` // Ready | Busy + CPUPct *int `json:"cpu_pct,omitempty"` + RAMAvailableMB *int `json:"ram_available_mb,omitempty"` + AERunning *bool `json:"ae_running,omitempty"` + CurrentJobID *string `json:"current_job_id,omitempty"` + CacheUsedGB *int `json:"cache_used_gb,omitempty"` +} + +// HeartbeatResponse carries optional commands from the orchestrator. +type HeartbeatResponse struct { + NextHeartbeatInSec int `json:"next_heartbeat_in_sec"` + PendingCommands []any `json:"pending_commands"` +} + +// ClaimJobRequest asks the orchestrator for the next queued job. +type ClaimJobRequest struct { + NodeID string `json:"node_id"` + Region string `json:"region,omitempty"` +} + +// ClaimedJob is the response when a job is successfully claimed. +type ClaimedJob struct { + JobID string `json:"job_id"` + SavedProjectID string `json:"saved_project_id"` + Quality string `json:"quality"` + Resolution string `json:"resolution"` + FrameRate int `json:"frame_rate"` + HasMusic bool `json:"has_music"` + HasVoiceover bool `json:"has_voiceover"` +} + +// ProgressRequest reports render progress (frame-level) for a job. +type ProgressRequest struct { + FrameJobID string `json:"frame_job_id"` + FrameNumber int `json:"frame_number"` + CompletedAt *time.Time `json:"completed_at,omitempty"` +} + +// CompleteRequest marks a job as Done. +type CompleteRequest struct { + ExportID *string `json:"export_id,omitempty"` +} + +// FailRequest marks a job as Failed. +type FailRequest struct { + Reason string `json:"reason"` + AtStep string `json:"at_step,omitempty"` +} + +// CrashRequest reports a node crash. +type CrashRequest struct { + NodeID string `json:"node_id"` + LastKnownFrame *int `json:"last_known_frame,omitempty"` + CrashSignal *string `json:"crash_signal,omitempty"` + ErrorLogTail *string `json:"error_log_tail,omitempty"` +} + +// ── API methods ─────────────────────────────────────────────────────────────── + +// Online marks the node as Ready on startup. +func (c *Client) Online(ctx context.Context, nodeID string, req OnlineRequest) error { + resp, err := c.do(ctx, http.MethodPost, + fmt.Sprintf("/v1/internal/nodes/%s/online", nodeID), req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode >= 300 { + return fmt.Errorf("online: HTTP %d", resp.StatusCode) + } + return nil +} + +// Heartbeat sends a heartbeat and returns the orchestrator's response. +func (c *Client) Heartbeat(ctx context.Context, nodeID string, req HeartbeatRequest) (*HeartbeatResponse, error) { + resp, err := c.do(ctx, http.MethodPost, + fmt.Sprintf("/v1/internal/nodes/%s/heartbeat", nodeID), req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode >= 300 { + return nil, fmt.Errorf("heartbeat: HTTP %d", resp.StatusCode) + } + var out HeartbeatResponse + _ = json.NewDecoder(resp.Body).Decode(&out) + return &out, nil +} + +// ClaimJob atomically claims the next queued render job. +// Returns (nil, nil) when the queue is empty (204 No Content). +func (c *Client) ClaimJob(ctx context.Context, nodeID, region string) (*ClaimedJob, error) { + resp, err := c.do(ctx, http.MethodPost, "/v1/internal/render/jobs/claim", + ClaimJobRequest{NodeID: nodeID, Region: region}) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode == http.StatusNoContent { + return nil, nil // nothing queued + } + if resp.StatusCode >= 300 { + return nil, fmt.Errorf("claim: HTTP %d", resp.StatusCode) + } + var job ClaimedJob + if err := json.NewDecoder(resp.Body).Decode(&job); err != nil { + return nil, fmt.Errorf("claim decode: %w", err) + } + return &job, nil +} + +// Complete marks a render job as Done. +func (c *Client) Complete(ctx context.Context, jobID string, exportID *string) error { + resp, err := c.do(ctx, http.MethodPost, + fmt.Sprintf("/v1/internal/render/jobs/%s/complete", jobID), + CompleteRequest{ExportID: exportID}) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode >= 300 { + return fmt.Errorf("complete: HTTP %d", resp.StatusCode) + } + return nil +} + +// Fail marks a render job as Failed. +func (c *Client) Fail(ctx context.Context, jobID, reason, atStep string) error { + resp, err := c.do(ctx, http.MethodPost, + fmt.Sprintf("/v1/internal/render/jobs/%s/fail", jobID), + FailRequest{Reason: reason, AtStep: atStep}) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode >= 300 { + return fmt.Errorf("fail: HTTP %d", resp.StatusCode) + } + return nil +} + +// ReportCrash reports a node crash for the given job. +func (c *Client) ReportCrash(ctx context.Context, jobID string, req CrashRequest) error { + resp, err := c.do(ctx, http.MethodPost, + fmt.Sprintf("/v1/internal/render/jobs/%s/crash", jobID), req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode >= 300 { + return fmt.Errorf("crash: HTTP %d", resp.StatusCode) + } + return nil +} diff --git a/services/node-agent/internal/config/config.go b/services/node-agent/internal/config/config.go new file mode 100644 index 0000000..788c607 --- /dev/null +++ b/services/node-agent/internal/config/config.go @@ -0,0 +1,89 @@ +// Package config loads node-agent runtime configuration from environment variables. +package config + +import ( + "fmt" + "os" + "strconv" +) + +// Config holds all runtime settings for the node agent. +type Config struct { + // NodeID is the UUID of this render node, registered in the orchestrator. + // Must match a row in render.render_nodes. + NodeID string + + // OrchestratorURL is the base URL of the V2 API gateway (internal network). + // Example: http://gateway:8080 or http://172.30.0.5:8088 + OrchestratorURL string + + // NodeHMACSecret is the shared secret sent as X-Node-Signature header. + // Must match NODE_HMAC_SECRET in the render-svc environment. + NodeHMACSecret string + + // Region is the datacenter/region label for this node (e.g. "iran-tehran-1"). + // The orchestrator uses it to route region-preferred jobs to this node. + Region string + + // AEPath is the full path to the aerender.exe binary. + // Example: C:\Program Files\Adobe\Adobe After Effects 2024\Support Files\aerender.exe + // Leave empty to use mock rendering (for development / testing without AE). + AEPath string + + // WorkDir is the scratch directory for render temp files and AE project copies. + WorkDir string + + // HeartbeatIntervalSec is how often the agent sends a heartbeat to the orchestrator. + HeartbeatIntervalSec int + + // PollIntervalSec is how long the agent waits between job-claim attempts when idle. + PollIntervalSec int + + // AgentVersion is the semantic version string reported to the orchestrator. + AgentVersion string + + // AEVersion is the After Effects version string reported to the orchestrator. + // Example: "2024" + AEVersion string + + // ListenPort is the port for the agent's own HTTP health endpoint. + ListenPort int +} + +// Load reads configuration from environment variables, returning an error +// if any required variable is missing. +func Load() (*Config, error) { + c := &Config{ + NodeID: os.Getenv("NODE_ID"), + OrchestratorURL: getEnv("ORCHESTRATOR_URL", "http://localhost:8088"), + NodeHMACSecret: getEnv("NODE_HMAC_SECRET", "node-secret-change-me"), + Region: getEnv("NODE_REGION", ""), + AEPath: getEnv("AE_PATH", ""), + WorkDir: getEnv("WORK_DIR", os.TempDir()), + AgentVersion: getEnv("AGENT_VERSION", "0.1.0"), + AEVersion: getEnv("AE_VERSION", "2024"), + HeartbeatIntervalSec: getInt("HEARTBEAT_INTERVAL_SEC", 5), + PollIntervalSec: getInt("POLL_INTERVAL_SEC", 3), + ListenPort: getInt("LISTEN_PORT", 7777), + } + if c.NodeID == "" { + return nil, fmt.Errorf("NODE_ID environment variable is required") + } + return c, nil +} + +func getEnv(key, fallback string) string { + if v := os.Getenv(key); v != "" { + return v + } + return fallback +} + +func getInt(key string, fallback int) int { + if v := os.Getenv(key); v != "" { + if n, err := strconv.Atoi(v); err == nil { + return n + } + } + return fallback +} diff --git a/services/node-agent/internal/runner/runner.go b/services/node-agent/internal/runner/runner.go new file mode 100644 index 0000000..e04b28e --- /dev/null +++ b/services/node-agent/internal/runner/runner.go @@ -0,0 +1,141 @@ +// Package runner executes After Effects render jobs and streams progress back +// via the provided callback. When AE_PATH is empty, a mock render is used +// (useful for CI and dev environments without a licensed AE installation). +package runner + +import ( + "context" + "fmt" + "log" + "os" + "os/exec" + "path/filepath" + "time" +) + +// ProgressFn is called periodically during rendering with (percent 0-100, message). +type ProgressFn func(ctx context.Context, percent int, message string) error + +// Job holds the parameters for a single render. +type Job struct { + JobID string + SavedProjectID string + Quality string + Resolution string + FrameRate int + HasMusic bool + HasVoiceover bool + // AEPFilePath is the local path to the downloaded .aep project file. + // In a full implementation the agent downloads this from MinIO before calling Run. + AEPFilePath string +} + +// Run executes the render job, calling onProgress as it advances. +// Returns the path to the output MP4 file on success. +func Run(ctx context.Context, aePath, workDir string, job *Job, onProgress ProgressFn) (string, error) { + outputDir := filepath.Join(workDir, "renders", job.JobID) + if err := os.MkdirAll(outputDir, 0o755); err != nil { + return "", fmt.Errorf("create output dir: %w", err) + } + outputPath := filepath.Join(outputDir, "output.mp4") + + if aePath == "" { + return mockRender(ctx, job, outputPath, onProgress) + } + return aeRender(ctx, aePath, job, outputPath, onProgress) +} + +// ── Mock render (no AE installed) ──────────────────────────────────────────── + +func mockRender(ctx context.Context, job *Job, outputPath string, onProgress ProgressFn) (string, error) { + log.Printf("[mock] starting render for job %s (%s %s %dfps)", job.JobID, job.Quality, job.Resolution, job.FrameRate) + + steps := []struct { + pct int + msg string + }{ + {5, "Preparing project…"}, + {15, "Loading template…"}, + {30, "Rendering frames…"}, + {50, "Rendering frames… (50%)"}, + {70, "Rendering frames… (70%)"}, + {85, "Encoding MP4…"}, + {95, "Uploading output…"}, + } + + for _, s := range steps { + select { + case <-ctx.Done(): + return "", ctx.Err() + case <-time.After(800 * time.Millisecond): + } + if err := onProgress(ctx, s.pct, s.msg); err != nil { + log.Printf("[mock] progress callback error: %v", err) + } + log.Printf("[mock] %d%% — %s", s.pct, s.msg) + } + + // Write a placeholder file so the path is valid + if err := os.WriteFile(outputPath, []byte("mock-render-output"), 0o644); err != nil { + return "", fmt.Errorf("write mock output: %w", err) + } + log.Printf("[mock] render complete: %s", outputPath) + return outputPath, nil +} + +// ── Real AE render via aerender.exe ────────────────────────────────────────── + +func aeRender(ctx context.Context, aePath string, job *Job, outputPath string, onProgress ProgressFn) (string, error) { + if job.AEPFilePath == "" { + return "", fmt.Errorf("AEPFilePath is required for real AE render") + } + + // aerender flags: + // -project + // -output + // -RStemplate "Multi-Machine Settings" (optional) + // -OMtemplate "H.264 – Match Render Settings – 15 Mbps" + // -s -e + args := []string{ + "-project", job.AEPFilePath, + "-output", outputPath, + } + + log.Printf("[ae] running: %s %v", aePath, args) + cmd := exec.CommandContext(ctx, aePath, args...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + if err := cmd.Start(); err != nil { + return "", fmt.Errorf("start aerender: %w", err) + } + + // Poll process while alive — aerender does not expose machine-readable progress. + // We advance the progress indicator every 10 seconds until the process exits. + done := make(chan error, 1) + go func() { done <- cmd.Wait() }() + + _ = onProgress(ctx, 10, "After Effects starting…") + pct := 10 + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case err := <-done: + if err != nil { + return "", fmt.Errorf("aerender exit: %w", err) + } + _ = onProgress(ctx, 95, "Encoding complete") + return outputPath, nil + case <-ticker.C: + if pct < 90 { + pct += 5 + } + _ = onProgress(ctx, pct, fmt.Sprintf("Rendering… %d%%", pct)) + case <-ctx.Done(): + _ = cmd.Process.Kill() + return "", ctx.Err() + } + } +} diff --git a/services/render/cmd/server/main.go b/services/render/cmd/server/main.go index dce0f00..d247211 100644 --- a/services/render/cmd/server/main.go +++ b/services/render/cmd/server/main.go @@ -136,6 +136,7 @@ func main() { internal.POST("/nodes/:node_id/heartbeat", internalH.Heartbeat) internal.POST("/nodes/:node_id/online", internalH.Online) internal.POST("/nodes/:node_id/cache-update", internalH.CacheUpdate) + internal.POST("/render/jobs/claim", internalH.Claim) internal.POST("/render/jobs/:job_id/frames", internalH.FrameProgress) internal.POST("/render/jobs/:job_id/complete", internalH.Complete) internal.POST("/render/jobs/:job_id/fail", internalH.Fail) diff --git a/services/render/internal/db/db.go b/services/render/internal/db/db.go index d47cc39..253be4e 100644 --- a/services/render/internal/db/db.go +++ b/services/render/internal/db/db.go @@ -463,6 +463,62 @@ func (s *Store) getJobByIDInternal(ctx context.Context, id uuid.UUID) (*models.R return jobs[0], nil } +// ClaimJob atomically picks the highest-priority Queued job (optionally filtered +// by region) and moves it to Preparing, setting the current_job_id on the node. +// Returns (nil, nil) when there is nothing to do. +func (s *Store) ClaimJob(ctx context.Context, nodeID uuid.UUID, region string) (*models.RenderJob, error) { + tx, err := s.pool.Begin(ctx) + if err != nil { + return nil, err + } + defer func() { _ = tx.Rollback(ctx) }() + + q := `SELECT id FROM render.render_jobs + WHERE step = 'Queued'::render_step` + args := []any{} + argIdx := 1 + if region != "" { + q += fmt.Sprintf(" AND (region IS NULL OR region = $%d)", argIdx) + args = append(args, region) + argIdx++ + } + q += " ORDER BY priority_score DESC, queued_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED" + + var jobID uuid.UUID + if err := tx.QueryRow(ctx, q, args...).Scan(&jobID); err != nil { + if err.Error() == "no rows in result set" { + return nil, nil // nothing to do + } + return nil, err + } + + // Advance to Preparing and assign to this node + _, err = tx.Exec(ctx, ` + UPDATE render.render_jobs SET + step = 'Preparing'::render_step, + started_at = COALESCE(started_at, NOW()), + updated_at = NOW() + WHERE id = $1`, jobID) + if err != nil { + return nil, err + } + _, err = tx.Exec(ctx, ` + UPDATE render.render_nodes SET + status = 'Busy'::node_status, + current_job_id = $1, + job_started_at = NOW(), + updated_at = NOW() + WHERE id = $2`, jobID, nodeID) + if err != nil { + return nil, err + } + + if err := tx.Commit(ctx); err != nil { + return nil, err + } + return s.getJobByIDInternal(ctx, jobID) +} + func (s *Store) CancelJob(ctx context.Context, id, userID uuid.UUID) (bool, error) { tag, err := s.pool.Exec(ctx, ` UPDATE render.render_jobs diff --git a/services/render/internal/handlers/internal.go b/services/render/internal/handlers/internal.go index c2e6f6a..a3e908d 100644 --- a/services/render/internal/handlers/internal.go +++ b/services/render/internal/handlers/internal.go @@ -198,6 +198,37 @@ func (h *InternalHandler) ReplicaReady(c *gin.Context) { c.Status(http.StatusNoContent) } +// POST /v1/internal/render/jobs/claim +// Node agent calls this to atomically claim the next queued job. +// Returns 204 when there is nothing queued (agent should back off and retry). +func (h *InternalHandler) Claim(c *gin.Context) { + var req models.ClaimJobRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: err.Error()}) + return + } + + job, err := h.store.ClaimJob(c.Request.Context(), req.NodeID, req.Region) + if err != nil { + c.JSON(http.StatusInternalServerError, models.APIError{Code: "internal_error", Message: err.Error()}) + return + } + if job == nil { + c.Status(http.StatusNoContent) // nothing queued + return + } + + c.JSON(http.StatusOK, models.ClaimedJob{ + JobID: job.ID, + SavedProjectID: job.SavedProjectID, + Quality: job.Quality, + Resolution: job.Resolution, + FrameRate: job.FrameRate, + HasMusic: job.HasMusic, + HasVoiceover: job.HasVoiceover, + }) +} + // POST /v1/internal/nodes/:node_id/cache-update func (h *InternalHandler) CacheUpdate(c *gin.Context) { nodeID, err := uuid.Parse(c.Param("node_id")) diff --git a/services/render/internal/models/models.go b/services/render/internal/models/models.go index b49313e..f623d72 100644 --- a/services/render/internal/models/models.go +++ b/services/render/internal/models/models.go @@ -402,6 +402,21 @@ type CrashReportRequest struct { LogFileURL *string `json:"log_file_url"` } +type ClaimJobRequest struct { + NodeID uuid.UUID `json:"node_id" binding:"required"` + Region string `json:"region"` +} + +type ClaimedJob struct { + JobID uuid.UUID `json:"job_id"` + SavedProjectID uuid.UUID `json:"saved_project_id"` + Quality string `json:"quality"` + Resolution string `json:"resolution"` + FrameRate int `json:"frame_rate"` + HasMusic bool `json:"has_music"` + HasVoiceover bool `json:"has_voiceover"` +} + type CacheUpdateRequest struct { Action string `json:"action" binding:"required"` ProjectID *uuid.UUID `json:"project_id"`