diff --git a/docker-compose.v2.yml b/docker-compose.v2.yml index caab8bd..0b5602a 100644 --- a/docker-compose.v2.yml +++ b/docker-compose.v2.yml @@ -193,6 +193,11 @@ services: MINIO_SECRET_KEY: "${MINIO_SECRET_KEY:-minioadmin}" MINIO_USE_SSL: "false" MINIO_BUCKET: "${MINIO_BUCKET:-flatrender-exports}" + # Scene snapshots upload to this public-read bucket; PUBLIC_URL is the + # browser-reachable base for the stored snapshot_url (defaults to the host + # MinIO endpoint above when unset). + MINIO_UPLOAD_BUCKET: "${MINIO_UPLOAD_BUCKET:-user-uploads}" + MINIO_PUBLIC_URL: "${NEXT_PUBLIC_MINIO_URL:-http://172.28.144.1:9000}" NOTIFICATION_URL: "http://notification-svc:8080" IDENTITY_URL: "http://identity-svc:8080" SERVICE_TOKEN: "${SERVICE_TOKEN:-internal-service-secret}" diff --git a/services/node-agent/cmd/agent/main.go b/services/node-agent/cmd/agent/main.go index 384b1b0..4dd7fce 100644 --- a/services/node-agent/cmd/agent/main.go +++ b/services/node-agent/cmd/agent/main.go @@ -140,11 +140,12 @@ func main() { // Main loops var wg sync.WaitGroup - wg.Add(4) + wg.Add(5) go func() { defer wg.Done(); agent.heartbeatLoop(ctx) }() go func() { defer wg.Done(); agent.pollLoop(ctx) }() go func() { defer wg.Done(); agent.fontSyncLoop(ctx) }() go func() { defer wg.Done(); agent.scanLoop(ctx) }() + go func() { defer wg.Done(); agent.snapshotLoop(ctx) }() wg.Wait() log.Printf("shutdown complete") } @@ -307,6 +308,91 @@ func (a *Agent) failScan(id, reason string) { _ = a.orch.ReportScanFail(ctx, id, reason) } +// ── Snapshot loop ───────────────────────────────────────────────────────────── +// Claims per-scene snapshot jobs, renders a single frame with AE, extracts a PNG +// still and uploads it. Requires the AE app — skipped without AE_PATH. + +func (a *Agent) snapshotLoop(ctx context.Context) { + interval := time.Duration(a.cfg.PollIntervalSec) * time.Second + if interval < 5*time.Second { + interval = 5 * time.Second + } + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + a.pollSnapshotOnce(ctx) + } + } +} + +func (a *Agent) pollSnapshotOnce(ctx context.Context) { + if a.cfg.AEPath == "" { + return // snapshots need the real AE app + } + if a.isBusy() { + return // don't contend with a render/scan for AE + } + + claim, err := a.orch.ClaimSnapshot(ctx, a.cfg.NodeID, a.cfg.Region) + if err != nil { + log.Printf("snapshot claim error: %v", err) + return + } + if claim == nil { + return // nothing queued + } + + a.setScanning(true) // reuse the scan-busy flag (same AE-exclusive lock) + defer a.setScanning(false) + log.Printf("[snapshot %s] claimed (scene %s)", claim.SnapshotJobID, claim.SceneKey) + + prepCtx, cancel := context.WithTimeout(ctx, 15*time.Minute) + aepPath, perr := runner.PrepareTemplate(prepCtx, claim.AEPDownloadURL, claim.IsBundle, claim.BundleMD5, a.cfg.WorkDir, "snap-"+claim.SnapshotJobID) + cancel() + if perr != nil { + a.failSnapshot(claim.SnapshotJobID, "prepare template: "+perr.Error()) + return + } + + workDir := filepath.Join(a.cfg.WorkDir, "snapshots", claim.SnapshotJobID) + renderCtx, cancel2 := context.WithTimeout(ctx, 10*time.Minute) + defer cancel2() + pngPath, rerr := runner.RunSnapshot(renderCtx, a.cfg.AEPath, aepPath, claim.CompName, claim.Frame, workDir) + if rerr != nil { + a.failSnapshot(claim.SnapshotJobID, rerr.Error()) + return + } + + // Upload the still to the public bucket, then report its permanent URL. + upCtx, cancel3 := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel3() + up, uerr := a.orch.GetSnapshotUploadURL(upCtx, claim.SnapshotJobID) + if uerr != nil { + a.failSnapshot(claim.SnapshotJobID, "upload-url: "+uerr.Error()) + return + } + if _, uperr := runner.UploadFile(upCtx, up.UploadURL, pngPath); uperr != nil { + a.failSnapshot(claim.SnapshotJobID, "upload: "+uperr.Error()) + return + } + if rperr := a.orch.ReportSnapshotResult(upCtx, claim.SnapshotJobID, up.PublicURL); rperr != nil { + log.Printf("[snapshot %s] report result error: %v", claim.SnapshotJobID, rperr) + return + } + log.Printf("[snapshot %s] done → %s", claim.SnapshotJobID, up.PublicURL) +} + +func (a *Agent) failSnapshot(id, reason string) { + log.Printf("[snapshot %s] failed: %s", id, reason) + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + _ = a.orch.ReportSnapshotFail(ctx, id, reason) +} + // ── Heartbeat loop ──────────────────────────────────────────────────────────── func (a *Agent) heartbeatLoop(ctx context.Context) { diff --git a/services/node-agent/internal/client/client.go b/services/node-agent/internal/client/client.go index f7b000f..0a31b3d 100644 --- a/services/node-agent/internal/client/client.go +++ b/services/node-agent/internal/client/client.go @@ -341,6 +341,95 @@ func (c *Client) ScanStatus(ctx context.Context, scanJobID string) (string, erro return out.Status, nil } +// ── Scene snapshots ───────────────────────────────────────────────────────── + +// SnapshotClaim is returned when a per-scene snapshot job is claimed. +type SnapshotClaim struct { + SnapshotJobID string `json:"snapshot_job_id"` + ProjectID string `json:"project_id"` + SceneID string `json:"scene_id"` + SceneKey string `json:"scene_key"` + CompName string `json:"comp_name"` + Frame int `json:"frame"` + AEPDownloadURL string `json:"aep_download_url"` + IsBundle bool `json:"is_bundle"` + BundleMD5 string `json:"bundle_md5"` +} + +// SnapshotUploadURLResponse carries the presigned PUT + the permanent public URL. +type SnapshotUploadURLResponse struct { + UploadURL string `json:"upload_url"` + ObjectKey string `json:"object_key"` + PublicURL string `json:"public_url"` +} + +// ClaimSnapshot atomically claims the next queued snapshot job (204 → nil,nil). +func (c *Client) ClaimSnapshot(ctx context.Context, nodeID, region string) (*SnapshotClaim, error) { + resp, err := c.do(ctx, http.MethodPost, "/v1/internal/snapshot/claim", + ClaimJobRequest{NodeID: nodeID, Region: region}) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode == http.StatusNoContent { + return nil, nil + } + if resp.StatusCode >= 300 { + return nil, fmt.Errorf("snapshot claim: HTTP %d", resp.StatusCode) + } + var sc SnapshotClaim + if err := json.NewDecoder(resp.Body).Decode(&sc); err != nil { + return nil, fmt.Errorf("snapshot claim decode: %w", err) + } + return &sc, nil +} + +// GetSnapshotUploadURL asks the orchestrator for a presigned PUT + public URL. +func (c *Client) GetSnapshotUploadURL(ctx context.Context, jobID string) (*SnapshotUploadURLResponse, error) { + resp, err := c.do(ctx, http.MethodPost, + fmt.Sprintf("/v1/internal/snapshot/%s/upload-url", jobID), nil) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode >= 300 { + return nil, fmt.Errorf("snapshot upload-url: HTTP %d", resp.StatusCode) + } + var out SnapshotUploadURLResponse + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("decode: %w", err) + } + return &out, nil +} + +// ReportSnapshotResult posts the uploaded still's public URL. +func (c *Client) ReportSnapshotResult(ctx context.Context, jobID, imageURL string) error { + resp, err := c.do(ctx, http.MethodPost, + fmt.Sprintf("/v1/internal/snapshot/%s/result", jobID), map[string]string{"image_url": imageURL}) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode >= 300 { + return fmt.Errorf("snapshot result: HTTP %d", resp.StatusCode) + } + return nil +} + +// ReportSnapshotFail marks a snapshot job as failed. +func (c *Client) ReportSnapshotFail(ctx context.Context, jobID, reason string) error { + resp, err := c.do(ctx, http.MethodPost, + fmt.Sprintf("/v1/internal/snapshot/%s/fail", jobID), FailRequest{Reason: reason}) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode >= 300 { + return fmt.Errorf("snapshot fail: HTTP %d", resp.StatusCode) + } + return nil +} + // UpdatePreview sends a base64-encoded preview frame to the orchestrator. // Errors are non-fatal — the UI simply won't update the preview image. func (c *Client) UpdatePreview(ctx context.Context, jobID, imageB64 string) error { diff --git a/services/node-agent/internal/runner/binder.go b/services/node-agent/internal/runner/binder.go index 966b78c..6937959 100644 --- a/services/node-agent/internal/runner/binder.go +++ b/services/node-agent/internal/runner/binder.go @@ -77,9 +77,14 @@ type bindLayerSpec struct { Type string `json:"type"` // "text" | "media" | "audio" Value string `json:"value"` } +type bindColorSpec struct { + Key string `json:"key"` + Value string `json:"value"` +} type bindSpec struct { Comp string `json:"comp,omitempty"` Fps int `json:"fps,omitempty"` + Colors []bindColorSpec `json:"colors,omitempty"` Layers []bindLayerSpec `json:"layers"` } @@ -109,6 +114,15 @@ func RunBinder(ctx context.Context, job *Job, workDir string) (string, error) { if key == "" { continue } + // Colours bind to frshare/frd_* text layers (hex source text) — bind.jsx + // propagates them via template expressions, not as visible layer content. + if strings.EqualFold(strings.TrimSpace(b.Type), "color") { + if strings.TrimSpace(b.Value) == "" { + continue + } + spec.Colors = append(spec.Colors, bindColorSpec{Key: key, Value: b.Value}) + continue + } if isMediaBinding(b.Type) { if strings.TrimSpace(b.Value) == "" { continue diff --git a/services/node-agent/internal/runner/snapshot.go b/services/node-agent/internal/runner/snapshot.go new file mode 100644 index 0000000..4985d63 --- /dev/null +++ b/services/node-agent/internal/runner/snapshot.go @@ -0,0 +1,69 @@ +package runner + +import ( + "context" + "fmt" + "log" + "os" + "os/exec" + "path/filepath" + "strconv" +) + +// RunSnapshot renders a single frame of compName from aepPath and writes a PNG +// still, returning its path. It reuses the render pipeline shape: aerender emits +// the comp's output module (lossless AVI/MOV) for one frame, then ffmpeg extracts +// a single PNG. Requires aerender (aePath) and ffmpeg on the node. +func RunSnapshot(ctx context.Context, aePath, aepPath, compName string, frame int, workDir string) (string, error) { + if aePath == "" { + return "", fmt.Errorf("AE path required for snapshot render") + } + if compName == "" { + return "", fmt.Errorf("comp name required for snapshot render") + } + if err := os.MkdirAll(workDir, 0o755); err != nil { + return "", fmt.Errorf("workdir: %w", err) + } + out := filepath.Join(workDir, "snap.avi") + _ = os.Remove(out) + + // -s/-e bound the render to a single frame; aerender writes via the comp's + // output module (cmd.Dir = project folder so relative footage resolves). + args := []string{ + "-project", aepPath, "-comp", compName, + "-s", strconv.Itoa(frame), "-e", strconv.Itoa(frame), + "-output", out, + } + log.Printf("[snapshot] aerender %v", args) + cmd := exec.CommandContext(ctx, aePath, args...) + cmd.Dir = filepath.Dir(aepPath) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + if err := cmd.Run(); err != nil { + return "", fmt.Errorf("aerender: %w", err) + } + actual := findRenderedOutput(out) + if actual == "" { + return "", fmt.Errorf("aerender produced no output for comp %q", compName) + } + + ff := ffmpegPath() + if ff == "" { + return "", fmt.Errorf("ffmpeg not found (set FFMPEG_PATH or place ffmpeg.exe next to the agent)") + } + png := filepath.Join(workDir, "snap.png") + _ = os.Remove(png) + ffArgs := []string{"-y", "-i", actual, "-frames:v", "1", png} + log.Printf("[snapshot] ffmpeg %v", ffArgs) + fc := exec.CommandContext(ctx, ff, ffArgs...) + fc.Stdout = os.Stdout + fc.Stderr = os.Stderr + if err := fc.Run(); err != nil { + return "", fmt.Errorf("ffmpeg still: %w", err) + } + _ = os.Remove(actual) // drop the intermediate render + if st, err := os.Stat(png); err != nil || st.Size() == 0 { + return "", fmt.Errorf("no snapshot image produced") + } + return png, nil +} diff --git a/services/render/cmd/server/main.go b/services/render/cmd/server/main.go index 7bb5cdd..5a99278 100644 --- a/services/render/cmd/server/main.go +++ b/services/render/cmd/server/main.go @@ -36,6 +36,13 @@ func main() { minioUseSSL := getEnv("MINIO_USE_SSL", "false") == "true" minioBucket := getEnv("MINIO_BUCKET", "flatrender-exports") minioTemplatesBucket := getEnv("MINIO_TEMPLATES_BUCKET", "flatrender-templates") + minioUploadBucket := getEnv("MINIO_UPLOAD_BUCKET", "user-uploads") // public-read — scene snapshots + minioPublicBase := getEnv("MINIO_PUBLIC_URL", func() string { + if minioUseSSL { + return "https://" + minioEndpoint + } + return "http://" + minioEndpoint + }()) notificationURL := getEnv("NOTIFICATION_URL", "http://localhost:8080") identityURL := getEnv("IDENTITY_URL", "") serviceToken := getEnv("SERVICE_TOKEN", "internal-service-secret") @@ -82,7 +89,7 @@ func main() { fontH := handlers.NewFontHandler(store) bundleH := handlers.NewTemplateBundleHandler(mc, minioTemplatesBucket) scanH := handlers.NewScanHandler(store, mc, minioTemplatesBucket) - snapJobH := handlers.NewSnapshotJobHandler(store, mc, minioTemplatesBucket) + snapJobH := handlers.NewSnapshotJobHandler(store, mc, minioTemplatesBucket, minioUploadBucket, minioPublicBase) internalH := handlers.NewInternalHandler(store, notifyClient, mc, minioTemplatesBucket, minioBucket) // ── Dev mock worker (no AE node needed) ──────────────────────────────────── @@ -217,6 +224,7 @@ func main() { // AE scene snapshots (node claims, renders one frame, posts the image URL) internal.POST("/snapshot/claim", snapJobH.Claim) + internal.POST("/snapshot/:id/upload-url", snapJobH.UploadURL) internal.POST("/snapshot/:id/result", snapJobH.Result) internal.POST("/snapshot/:id/fail", snapJobH.Fail) diff --git a/services/render/internal/db/db.go b/services/render/internal/db/db.go index f5c2b5a..12ed94d 100644 --- a/services/render/internal/db/db.go +++ b/services/render/internal/db/db.go @@ -674,7 +674,19 @@ func (s *Store) GetRenderBindings(ctx context.Context, savedProjectID uuid.UUID) SELECT c.key, c.type, COALESCE(c.value, '') FROM studio.saved_scene_contents c JOIN studio.saved_scenes s ON s.id = c.saved_scene_id - WHERE s.saved_project_id = $1 AND c.value IS NOT NULL AND c.value <> ''`, + WHERE s.saved_project_id = $1 AND c.value IS NOT NULL AND c.value <> '' + UNION + -- project-wide shared colours (frshare/frd_* layers): bind.jsx writes these + -- into the frshare comp's text layers so template expressions propagate them. + SELECT sc.element_key, 'color', sc.value + FROM studio.saved_shared_colors sc + WHERE sc.saved_project_id = $1 AND sc.value IS NOT NULL AND sc.value <> '' + UNION + -- per-scene colours (frl_c* layers) + SELECT cc.element_key, 'color', cc.value + FROM studio.saved_scene_colors cc + JOIN studio.saved_scenes s2 ON s2.id = cc.saved_scene_id + WHERE s2.saved_project_id = $1 AND cc.value IS NOT NULL AND cc.value <> ''`, savedProjectID) if err != nil { return nil, err diff --git a/services/render/internal/db/snapshotjobs.go b/services/render/internal/db/snapshotjobs.go index 48c0136..59383c7 100644 --- a/services/render/internal/db/snapshotjobs.go +++ b/services/render/internal/db/snapshotjobs.go @@ -123,3 +123,13 @@ func (s *Store) SetSnapshotError(ctx context.Context, id uuid.UUID, msg string) id, msg) return err } + +// GetSnapshotJobMeta returns the project id + scene key for a job (to build the +// object key the node uploads its rendered still to). +func (s *Store) GetSnapshotJobMeta(ctx context.Context, id uuid.UUID) (uuid.UUID, string, error) { + var pid uuid.UUID + var key string + err := s.pool.QueryRow(ctx, + `SELECT project_id, scene_key FROM render.snapshot_jobs WHERE id = $1`, id).Scan(&pid, &key) + return pid, key, err +} diff --git a/services/render/internal/handlers/snapshotjobs.go b/services/render/internal/handlers/snapshotjobs.go index d9b4790..048e522 100644 --- a/services/render/internal/handlers/snapshotjobs.go +++ b/services/render/internal/handlers/snapshotjobs.go @@ -1,7 +1,9 @@ package handlers import ( + "fmt" "net/http" + "time" "github.com/flatrender/render-svc/internal/db" "github.com/flatrender/render-svc/internal/models" @@ -17,10 +19,12 @@ type SnapshotJobHandler struct { store *db.Store minio *minio.Client templatesBucket string + uploadBucket string // public-read bucket snapshots land in (e.g. user-uploads) + publicBase string // browser-reachable base, e.g. http://172.28.144.1:9000 } -func NewSnapshotJobHandler(store *db.Store, mc *minio.Client, templatesBucket string) *SnapshotJobHandler { - return &SnapshotJobHandler{store: store, minio: mc, templatesBucket: templatesBucket} +func NewSnapshotJobHandler(store *db.Store, mc *minio.Client, templatesBucket, uploadBucket, publicBase string) *SnapshotJobHandler { + return &SnapshotJobHandler{store: store, minio: mc, templatesBucket: templatesBucket, uploadBucket: uploadBucket, publicBase: publicBase} } // POST /v1/scene-snapshots/:project_id (admin) → queue one job per active scene. @@ -90,6 +94,33 @@ func (h *SnapshotJobHandler) Claim(c *gin.Context) { }) } +// POST /v1/internal/snapshot/:id/upload-url (node, HMAC) +// Presigns a PUT to the public-read uploads bucket and returns the permanent +// public URL the node should report back once the still is uploaded. +func (h *SnapshotJobHandler) UploadURL(c *gin.Context) { + id, err := uuid.Parse(c.Param("id")) + if err != nil { + c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: "invalid id"}) + return + } + pid, sceneKey, merr := h.store.GetSnapshotJobMeta(c.Request.Context(), id) + if merr != nil { + c.JSON(http.StatusNotFound, models.APIError{Code: "not_found", Message: "snapshot job not found"}) + return + } + objectKey := fmt.Sprintf("snapshots/%s/%s.png", pid, sceneKey) + put, perr := h.minio.PresignedPutObject(c.Request.Context(), h.uploadBucket, objectKey, 15*time.Minute) + if perr != nil { + c.JSON(http.StatusInternalServerError, models.APIError{Code: "presign_failed", Message: perr.Error()}) + return + } + c.JSON(http.StatusOK, gin.H{ + "upload_url": put.String(), + "object_key": objectKey, + "public_url": fmt.Sprintf("%s/%s/%s", h.publicBase, h.uploadBucket, objectKey), + }) +} + // POST /v1/internal/snapshot/:id/result (node, HMAC) body {image_url} func (h *SnapshotJobHandler) Result(c *gin.Context) { id, err := uuid.Parse(c.Param("id"))