Files
soroush.asadi 6cf8716d7e
Build backend images / build content-svc (push) Failing after 13s
Build backend images / build file-svc (push) Failing after 53s
Build backend images / build gateway (push) Failing after 1m22s
Build backend images / build identity-svc (push) Failing after 19s
Build backend images / build notification-svc (push) Failing after 21s
Build backend images / build render-svc (push) Failing after 20s
Build backend images / build studio-svc (push) Failing after 1m6s
feat(render): node-agent AE snapshot runner (Epic C2) + colour render-binding (Epic B)
C2 — real-AE scene snapshots on the node:
- node-agent: runner/snapshot.go RunSnapshot (aerender -comp <key> -s f -e f
  → findRenderedOutput → ffmpeg -frames:v 1 PNG); client ClaimSnapshot /
  GetSnapshotUploadURL / ReportSnapshotResult / ReportSnapshotFail; snapshotLoop +
  pollSnapshotOnce mirroring the scan loop (reuses the AE-exclusive lock).
- render-svc: GetSnapshotJobMeta + UploadURL handler presigns a PUT to the
  public-read user-uploads bucket at snapshots/{project}/{scene}.png and returns a
  permanent public_url (not the time-limited export presign); MINIO_UPLOAD_BUCKET +
  MINIO_PUBLIC_URL config + compose env + /snapshot/:id/upload-url route.

Epic B — bind edited colours into the render:
- render-svc GetRenderBindings UNIONs studio.saved_shared_colors +
  saved_scene_colors (type 'color') so the node writes them before render.
- node-agent binder.go routes type:"color" bindings into the bind-spec colors[]
  array that bind.jsx already applies to the frshare colour layers.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-11 18:08:43 +03:30

243 lines
12 KiB
Go

package main
import (
"context"
"log"
"net/http"
"os"
"github.com/flatrender/render-svc/internal/db"
"github.com/flatrender/render-svc/internal/devworker"
"github.com/flatrender/render-svc/internal/handlers"
"github.com/flatrender/render-svc/internal/identityclient"
"github.com/flatrender/render-svc/internal/middleware"
"github.com/flatrender/render-svc/internal/notifier"
"github.com/gin-gonic/gin"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)
// getEnv looks up the environment variable named key and returns its value.
// When the variable is unset or set to the empty string, fallback is returned
// instead.
func getEnv(key, fallback string) string {
	value := os.Getenv(key)
	if value == "" {
		return fallback
	}
	return value
}
// main wires up the render service: it reads configuration from the
// environment, connects to Postgres and MinIO, builds the store and HTTP
// handlers, optionally starts the development mock workers, registers every
// route on a gin router and then serves until the listener fails.
//
// Any failure to connect to the database, construct the MinIO client, or run
// the HTTP server terminates the process via log.Fatalf. Bucket checks are
// best-effort and only log warnings.
func main() {
	// ── Config ────────────────────────────────────────────────────────────────
	dbURL := getEnv("DATABASE_URL", "postgres://postgres:postgres@localhost:5432/flatrender?search_path=render,public")
	jwtSecret := getEnv("JWT_SECRET", "change-me")
	nodeSecret := getEnv("NODE_HMAC_SECRET", "node-secret")
	minioEndpoint := getEnv("MINIO_ENDPOINT", "localhost:9000")
	minioAccessKey := getEnv("MINIO_ACCESS_KEY", "minioadmin")
	minioSecretKey := getEnv("MINIO_SECRET_KEY", "minioadmin")
	minioUseSSL := getEnv("MINIO_USE_SSL", "false") == "true"
	minioBucket := getEnv("MINIO_BUCKET", "flatrender-exports")
	minioTemplatesBucket := getEnv("MINIO_TEMPLATES_BUCKET", "flatrender-templates")
	minioUploadBucket := getEnv("MINIO_UPLOAD_BUCKET", "user-uploads") // public-read — scene snapshots

	// When MINIO_PUBLIC_URL is not set, derive the public base URL from the
	// MinIO endpoint, choosing the scheme from MINIO_USE_SSL.
	defaultPublicBase := "http://" + minioEndpoint
	if minioUseSSL {
		defaultPublicBase = "https://" + minioEndpoint
	}
	minioPublicBase := getEnv("MINIO_PUBLIC_URL", defaultPublicBase)

	notificationURL := getEnv("NOTIFICATION_URL", "http://localhost:8080")
	identityURL := getEnv("IDENTITY_URL", "")
	serviceToken := getEnv("SERVICE_TOKEN", "internal-service-secret")
	port := getEnv("PORT", "8080")
	devWorker := getEnv("RENDER_DEV_WORKER", "false") == "true"

	// ── Database ──────────────────────────────────────────────────────────────
	pool, err := pgxpool.New(context.Background(), dbURL)
	if err != nil {
		log.Fatalf("connect db: %v", err)
	}
	defer pool.Close()
	// Ping so an unreachable database is reported at startup.
	if err := pool.Ping(context.Background()); err != nil {
		log.Fatalf("ping db: %v", err)
	}

	// ── MinIO ──────────────────────────────────────────────────────────────────
	mc, err := minio.New(minioEndpoint, &minio.Options{
		Creds:  credentials.NewStaticV4(minioAccessKey, minioSecretKey, ""),
		Secure: minioUseSSL,
	})
	if err != nil {
		log.Fatalf("minio client: %v", err)
	}

	// Ensure the render output (exports) and templates buckets exist. This is
	// best-effort: failures are logged as warnings and startup continues.
	// NOTE(review): minioUploadBucket is not created here — presumably it is
	// provisioned elsewhere with its public-read policy; confirm.
	for _, b := range []string{minioBucket, minioTemplatesBucket} {
		exists, berr := mc.BucketExists(context.Background(), b)
		if berr != nil {
			// Previously this error was dropped silently; surface it so a
			// misconfigured or unreachable MinIO is visible in the logs.
			log.Printf("warning: could not check bucket %q: %v", b, berr)
			continue
		}
		if exists {
			continue
		}
		if merr := mc.MakeBucket(context.Background(), b, minio.MakeBucketOptions{}); merr != nil {
			log.Printf("warning: could not create bucket %q: %v", b, merr)
			continue
		}
		log.Printf("created bucket %q", b)
	}

	// ── Store + handlers ──────────────────────────────────────────────────────
	store := db.NewStore(pool)
	notifyClient := notifier.New(notificationURL, serviceToken)
	identityClient := identityclient.New(identityURL, serviceToken)
	renderH := handlers.NewRenderHandler(store, identityClient)
	snapH := handlers.NewSnapshotHandler(store)
	exportH := handlers.NewExportHandler(store, mc, minioBucket)
	nodeH := handlers.NewNodeHandler(store)
	fontH := handlers.NewFontHandler(store)
	bundleH := handlers.NewTemplateBundleHandler(mc, minioTemplatesBucket)
	scanH := handlers.NewScanHandler(store, mc, minioTemplatesBucket)
	snapJobH := handlers.NewSnapshotJobHandler(store, mc, minioTemplatesBucket, minioUploadBucket, minioPublicBase)
	internalH := handlers.NewInternalHandler(store, notifyClient, mc, minioTemplatesBucket, minioBucket)

	// ── Dev mock worker (no AE node needed) ────────────────────────────────────
	// Drives Queued jobs to Done with progress + preview frames so the render flow
	// is exercisable in development. Production keeps this OFF and uses real nodes.
	if devWorker {
		go devworker.New(store).Run(context.Background())
	}
	// Snapshot-only dev mock: fulfils scene-snapshot jobs with a generated
	// placeholder (no AE), gated separately so it never hijacks real render jobs.
	if getEnv("RENDER_DEV_SNAPSHOTS", "false") == "true" {
		go devworker.New(store).RunSnapshots(context.Background())
	}

	// ── Router ────────────────────────────────────────────────────────────────
	r := gin.Default()

	// Health check: reports "down" with HTTP 503 when the database ping fails.
	r.GET("/health", func(c *gin.Context) {
		if err := pool.Ping(c.Request.Context()); err != nil {
			c.JSON(http.StatusServiceUnavailable, gin.H{"status": "down", "error": err.Error()})
			return
		}
		c.JSON(http.StatusOK, gin.H{"status": "ok"})
	})

	auth := middleware.JWTAuth(jwtSecret)
	admin := middleware.RequireAdmin()
	nodeAuth := middleware.NodeHMAC(nodeSecret)

	v1 := r.Group("/v1")

	// ── Render jobs ───────────────────────────────────────────────────────────
	renders := v1.Group("/renders", auth)
	{
		renders.GET("", renderH.List)
		renders.POST("", renderH.Create)
		renders.GET("/active", renderH.Active)
		renders.GET("/:job_id", renderH.Get)
		renders.POST("/:job_id/cancel", renderH.Cancel)
		renders.POST("/:job_id/stop", admin, renderH.Stop)
		renders.POST("/:job_id/retry", renderH.Retry)
		renders.GET("/:job_id/progress", renderH.Progress)
		renders.GET("/:job_id/logs", renderH.Logs)
	}

	// ── Snapshots ─────────────────────────────────────────────────────────────
	snaps := v1.Group("/snapshots", auth)
	{
		snaps.POST("", snapH.Create)
		snaps.GET("/:snapshot_id", snapH.Get)
	}

	// ── Exports ───────────────────────────────────────────────────────────────
	exports := v1.Group("/exports", auth)
	{
		exports.GET("", exportH.List)
		exports.GET("/:export_id", exportH.Get)
		exports.DELETE("/:export_id", exportH.Delete)
		exports.POST("/:export_id/extend", exportH.Extend)
		exports.GET("/:export_id/download-url", exportH.DownloadURL)
	}

	// ── Nodes (admin) ─────────────────────────────────────────────────────────
	nodes := v1.Group("/nodes", auth, admin)
	{
		nodes.GET("", nodeH.List)
		nodes.POST("", nodeH.Create)
		nodes.GET("/:node_id", nodeH.Get)
		nodes.PATCH("/:node_id", nodeH.Patch)
		nodes.DELETE("/:node_id", nodeH.Delete)
		nodes.POST("/:node_id/restart", nodeH.Restart)
		nodes.POST("/:node_id/release", nodeH.Release)
		nodes.POST("/:node_id/close-ae", nodeH.CloseAE)
		nodes.GET("/:node_id/health", nodeH.Health)
		nodes.GET("/:node_id/health/history", nodeH.HealthHistory)
		nodes.GET("/:node_id/crashes", nodeH.Crashes)
	}

	// ── Node updates (admin) ──────────────────────────────────────────────────
	v1.GET("/node-updates", auth, admin, nodeH.ListUpdates)
	v1.POST("/node-updates/:update_id/rollout", auth, admin, nodeH.Rollout)

	// ── Node fonts: install a font on all nodes + verify (admin) ──────────────
	nodeFonts := v1.Group("/node-fonts", auth, admin)
	{
		nodeFonts.GET("", fontH.List)
		nodeFonts.POST("", fontH.Create)
		nodeFonts.DELETE("/:id", fontH.Delete)
	}

	// ── Render queue (admin: all users' jobs) ─────────────────────────────────
	v1.GET("/admin-renders", auth, admin, renderH.AdminList)

	// ── Template bundles (admin: store the canonical .aep/.zip per project) ────
	v1.POST("/template-bundles/:project_id", auth, admin, bundleH.Set)

	// ── Scene snapshots (admin: render one frame per scene from AE) ────────────
	v1.POST("/scene-snapshots/:project_id", auth, admin, snapJobH.Enqueue)
	v1.GET("/scene-snapshots/:project_id", auth, admin, snapJobH.List)

	// ── Template scans (admin: read scenes/colours/configs from the AEP) ───────
	v1.POST("/template-scans/:project_id/quick", auth, admin, scanH.QuickScan) // headless Go quick-scan
	v1.POST("/template-scans/:project_id/jobs", auth, admin, scanH.CreateJob)  // queue an AE full scan
	v1.GET("/template-scan-jobs/:id", auth, admin, scanH.GetJob)
	v1.POST("/template-scan-jobs/:id/cancel", auth, admin, scanH.Cancel)

	// ── Exports management (admin: all users' rendered videos) ────────────────
	adminExports := v1.Group("/admin-exports", auth, admin)
	{
		adminExports.GET("", exportH.AdminList)
		adminExports.DELETE("/:export_id", exportH.AdminDelete)
		adminExports.GET("/:export_id/download-url", exportH.AdminDownloadURL)
	}

	// ── Internal (node agents only — HMAC auth) ───────────────────────────────
	internal := v1.Group("/internal", nodeAuth)
	{
		internal.POST("/nodes/:node_id/heartbeat", internalH.Heartbeat)
		internal.POST("/nodes/:node_id/online", internalH.Online)
		internal.POST("/nodes/:node_id/cache-update", internalH.CacheUpdate)
		internal.GET("/nodes/:node_id/fonts/pending", fontH.Pending)
		internal.POST("/nodes/:node_id/fonts/:request_id/status", fontH.Report)
		internal.POST("/render/jobs/claim", internalH.Claim)
		internal.POST("/render/jobs/:job_id/preview", internalH.Preview)
		internal.POST("/render/jobs/:job_id/progress", internalH.RenderProgress)
		internal.POST("/render/jobs/:job_id/output-upload-url", internalH.OutputUploadURL)
		internal.POST("/render/jobs/:job_id/frames", internalH.FrameProgress)
		internal.POST("/render/jobs/:job_id/complete", internalH.Complete)
		internal.POST("/render/jobs/:job_id/fail", internalH.Fail)
		internal.POST("/render/jobs/:job_id/crash", internalH.Crash)
		internal.POST("/render/jobs/:job_id/replica-ready", internalH.ReplicaReady)
		// AE scene snapshots (node claims, renders one frame, posts the image URL)
		internal.POST("/snapshot/claim", snapJobH.Claim)
		internal.POST("/snapshot/:id/upload-url", snapJobH.UploadURL)
		internal.POST("/snapshot/:id/result", snapJobH.Result)
		internal.POST("/snapshot/:id/fail", snapJobH.Fail)
		// AE scan jobs (node claims, runs scan.jsx, posts the ScanResult back)
		internal.POST("/scan/claim", scanH.Claim)
		internal.GET("/scan/:id/status", scanH.Status) // node watchdog (cancel detection)
		internal.POST("/scan/:id/result", scanH.Result)
		internal.POST("/scan/:id/fail", scanH.Fail)
	}

	log.Printf("render-svc listening on :%s", port)
	if err := r.Run(":" + port); err != nil {
		log.Fatalf("server: %v", err)
	}
}