feat(render-svc+node-agent): add job-claim endpoint and build node-agent skeleton
render-svc: - db: ClaimJob() — atomic SELECT FOR UPDATE SKIP LOCKED; transitions job to Preparing, marks node Busy in a single transaction - models: ClaimJobRequest + ClaimedJob types - handlers/internal: POST /v1/internal/render/jobs/claim — 200 with job or 204 when queue empty - main: register the claim route under /v1/internal (nodeAuth) services/node-agent/ (new Go module github.com/flatrender/node-agent): - internal/config: env-var based config (NODE_ID required, sensible defaults) - internal/client: typed orchestrator HTTP client (Online, Heartbeat, ClaimJob, Complete, Fail, ReportCrash) — X-Node-Signature auth - internal/runner: AE render via aerender.exe or mock (for dev without AE) - cmd/agent/main: register online → heartbeat loop (5s) + poll loop (3s) → claim job → run render → report complete/fail; health endpoint on :7777 - Dockerfile: cross-compiles to Windows amd64 static binary Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,313 @@
|
||||
// FlatRender V2 Node Agent
|
||||
//
|
||||
// Runs on every Windows render node. Registers itself with the V2 render
|
||||
// orchestrator, sends heartbeats, claims render jobs, executes them via
|
||||
// After Effects (or mock), and reports progress / completion back.
|
||||
//
|
||||
// Required environment variables:
|
||||
// NODE_ID — UUID of this node (pre-created in render.render_nodes)
|
||||
//
|
||||
// Optional environment variables (all have sensible defaults):
|
||||
// ORCHESTRATOR_URL — gateway base URL (default: http://localhost:8088)
|
||||
// NODE_HMAC_SECRET — shared secret (default: node-secret-change-me)
|
||||
// NODE_REGION — region label, e.g. "iran-tehran-1"
|
||||
// AE_PATH — path to aerender.exe; empty = mock render
|
||||
// WORK_DIR — scratch directory (default: system temp)
|
||||
// AGENT_VERSION — semver string (default: 0.1.0)
|
||||
// AE_VERSION — AE version string reported to orchestrator (default: 2024)
|
||||
// HEARTBEAT_INTERVAL_SEC — seconds between heartbeats (default: 5)
|
||||
// POLL_INTERVAL_SEC — seconds between job-claim attempts when idle (default: 3)
|
||||
// LISTEN_PORT — port for the health endpoint (default: 7777)
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"runtime"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/flatrender/node-agent/internal/client"
|
||||
"github.com/flatrender/node-agent/internal/config"
|
||||
"github.com/flatrender/node-agent/internal/runner"
|
||||
)
|
||||
|
||||
// ── Agent state ───────────────────────────────────────────────────────────────
|
||||
|
||||
type Agent struct {
|
||||
cfg *config.Config
|
||||
orch *client.Client
|
||||
mu sync.Mutex
|
||||
currentJob *client.ClaimedJob
|
||||
status string // "Ready" | "Busy"
|
||||
}
|
||||
|
||||
func newAgent(cfg *config.Config) *Agent {
|
||||
return &Agent{
|
||||
cfg: cfg,
|
||||
orch: client.New(cfg.OrchestratorURL, cfg.NodeHMACSecret),
|
||||
status: "Ready",
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Agent) setJob(job *client.ClaimedJob) {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
a.currentJob = job
|
||||
if job != nil {
|
||||
a.status = "Busy"
|
||||
} else {
|
||||
a.status = "Ready"
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Agent) getStatus() (string, *string) {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
if a.currentJob != nil {
|
||||
jobID := a.currentJob.JobID
|
||||
return a.status, &jobID
|
||||
}
|
||||
return a.status, nil
|
||||
}
|
||||
|
||||
// ── Main ──────────────────────────────────────────────────────────────────────
|
||||
|
||||
func main() {
|
||||
log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds)
|
||||
log.Printf("FlatRender Node Agent v%s starting (OS: %s, Arch: %s)",
|
||||
"0.1.0", runtime.GOOS, runtime.GOARCH)
|
||||
|
||||
cfg, err := config.Load()
|
||||
if err != nil {
|
||||
log.Fatalf("config: %v", err)
|
||||
}
|
||||
log.Printf("Node ID: %s | Region: %q | Orchestrator: %s",
|
||||
cfg.NodeID, cfg.Region, cfg.OrchestratorURL)
|
||||
if cfg.AEPath == "" {
|
||||
log.Printf("AE_PATH not set — using mock renderer (development mode)")
|
||||
} else {
|
||||
log.Printf("AE binary: %s", cfg.AEPath)
|
||||
}
|
||||
|
||||
agent := newAgent(cfg)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Graceful shutdown on SIGTERM / SIGINT
|
||||
sigs := make(chan os.Signal, 1)
|
||||
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
|
||||
go func() {
|
||||
s := <-sigs
|
||||
log.Printf("received %s, shutting down…", s)
|
||||
cancel()
|
||||
}()
|
||||
|
||||
// Register as online
|
||||
if err := agent.registerOnline(ctx); err != nil {
|
||||
log.Fatalf("failed to register with orchestrator: %v", err)
|
||||
}
|
||||
|
||||
// Start health endpoint
|
||||
go agent.serveHealth(ctx)
|
||||
|
||||
// Main loops
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
go func() { defer wg.Done(); agent.heartbeatLoop(ctx) }()
|
||||
go func() { defer wg.Done(); agent.pollLoop(ctx) }()
|
||||
wg.Wait()
|
||||
log.Printf("shutdown complete")
|
||||
}
|
||||
|
||||
// ── Registration ──────────────────────────────────────────────────────────────
|
||||
|
||||
func (a *Agent) registerOnline(ctx context.Context) error {
|
||||
req := client.OnlineRequest{
|
||||
NodeAgentVersion: a.cfg.AgentVersion,
|
||||
CurrentAEVersion: a.cfg.AEVersion,
|
||||
AvailableAEVersions: []string{a.cfg.AEVersion},
|
||||
CachedTemplateMD5s: []string{},
|
||||
}
|
||||
if err := a.orch.Online(ctx, a.cfg.NodeID, req); err != nil {
|
||||
return fmt.Errorf("online: %w", err)
|
||||
}
|
||||
log.Printf("registered as online with orchestrator")
|
||||
return nil
|
||||
}
|
||||
|
||||
// ── Heartbeat loop ────────────────────────────────────────────────────────────
|
||||
|
||||
func (a *Agent) heartbeatLoop(ctx context.Context) {
|
||||
ticker := time.NewTicker(time.Duration(a.cfg.HeartbeatIntervalSec) * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
a.sendHeartbeat(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Agent) sendHeartbeat(ctx context.Context) {
|
||||
status, jobID := a.getStatus()
|
||||
req := client.HeartbeatRequest{
|
||||
NodeID: a.cfg.NodeID,
|
||||
Status: status,
|
||||
CurrentJobID: jobID,
|
||||
}
|
||||
hbCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
resp, err := a.orch.Heartbeat(hbCtx, a.cfg.NodeID, req)
|
||||
if err != nil {
|
||||
log.Printf("heartbeat error: %v", err)
|
||||
return
|
||||
}
|
||||
if len(resp.PendingCommands) > 0 {
|
||||
log.Printf("orchestrator commands: %v", resp.PendingCommands)
|
||||
}
|
||||
}
|
||||
|
||||
// ── Job poll loop ─────────────────────────────────────────────────────────────
|
||||
|
||||
func (a *Agent) pollLoop(ctx context.Context) {
|
||||
ticker := time.NewTicker(time.Duration(a.cfg.PollIntervalSec) * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
status, _ := a.getStatus()
|
||||
if status == "Busy" {
|
||||
continue // already rendering
|
||||
}
|
||||
a.tryClaimAndRun(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Agent) tryClaimAndRun(ctx context.Context) {
|
||||
claimCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
job, err := a.orch.ClaimJob(claimCtx, a.cfg.NodeID, a.cfg.Region)
|
||||
if err != nil {
|
||||
log.Printf("claim error: %v", err)
|
||||
return
|
||||
}
|
||||
if job == nil {
|
||||
return // queue empty
|
||||
}
|
||||
|
||||
log.Printf("claimed job %s (project %s, %s %s %dfps)",
|
||||
job.JobID, job.SavedProjectID, job.Quality, job.Resolution, job.FrameRate)
|
||||
|
||||
a.setJob(job)
|
||||
go func() {
|
||||
defer a.setJob(nil)
|
||||
a.runJob(ctx, job)
|
||||
}()
|
||||
}
|
||||
|
||||
// ── Render execution ──────────────────────────────────────────────────────────
|
||||
|
||||
func (a *Agent) runJob(ctx context.Context, job *client.ClaimedJob) {
|
||||
log.Printf("[job %s] starting render", job.JobID)
|
||||
|
||||
// In a full implementation, the agent would:
|
||||
// 1. Fetch the saved project from the studio service
|
||||
// 2. Download the .aep template from MinIO
|
||||
// 3. Inject user customisations into the composition via JSXB/AE scripting
|
||||
// Then call runner.Run().
|
||||
// For the skeleton we pass an empty AEPFilePath, which triggers mock mode.
|
||||
|
||||
rJob := &runner.Job{
|
||||
JobID: job.JobID,
|
||||
SavedProjectID: job.SavedProjectID,
|
||||
Quality: job.Quality,
|
||||
Resolution: job.Resolution,
|
||||
FrameRate: job.FrameRate,
|
||||
HasMusic: job.HasMusic,
|
||||
HasVoiceover: job.HasVoiceover,
|
||||
AEPFilePath: "", // TODO: download from MinIO
|
||||
}
|
||||
|
||||
onProgress := func(ctx context.Context, pct int, msg string) error {
|
||||
// Lightweight progress update — errors here are non-fatal
|
||||
log.Printf("[job %s] %d%% %s", job.JobID, pct, msg)
|
||||
return nil
|
||||
}
|
||||
|
||||
outputPath, err := runner.Run(ctx, a.cfg.AEPath, a.cfg.WorkDir, rJob, onProgress)
|
||||
if err != nil {
|
||||
if ctx.Err() != nil {
|
||||
log.Printf("[job %s] render cancelled", job.JobID)
|
||||
return
|
||||
}
|
||||
log.Printf("[job %s] render failed: %v", job.JobID, err)
|
||||
failCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
if ferr := a.orch.Fail(failCtx, job.JobID, err.Error(), "Rendering"); ferr != nil {
|
||||
log.Printf("[job %s] fail report error: %v", job.JobID, ferr)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("[job %s] render done → %s", job.JobID, outputPath)
|
||||
|
||||
// In full production: upload outputPath to MinIO, create an Export record,
|
||||
// pass the export UUID to Complete(). Skeleton passes nil (no export yet).
|
||||
completeCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
if err := a.orch.Complete(completeCtx, job.JobID, nil); err != nil {
|
||||
log.Printf("[job %s] complete report error: %v", job.JobID, err)
|
||||
} else {
|
||||
log.Printf("[job %s] reported as completed", job.JobID)
|
||||
}
|
||||
}
|
||||
|
||||
// ── Health endpoint ───────────────────────────────────────────────────────────
|
||||
|
||||
func (a *Agent) serveHealth(ctx context.Context) {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
|
||||
status, jobID := a.getStatus()
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
resp := map[string]any{
|
||||
"ok": true,
|
||||
"node_id": a.cfg.NodeID,
|
||||
"status": status,
|
||||
"current_job": jobID,
|
||||
"version": a.cfg.AgentVersion,
|
||||
}
|
||||
_ = json.NewEncoder(w).Encode(resp)
|
||||
})
|
||||
|
||||
srv := &http.Server{
|
||||
Addr: fmt.Sprintf(":%d", a.cfg.ListenPort),
|
||||
Handler: mux,
|
||||
}
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
_ = srv.Shutdown(context.Background())
|
||||
}()
|
||||
|
||||
log.Printf("health endpoint listening on :%d", a.cfg.ListenPort)
|
||||
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||
log.Printf("health server error: %v", err)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user