package handlers import ( "context" "fmt" "net/http" "strings" "time" "github.com/flatrender/render-svc/internal/db" "github.com/flatrender/render-svc/internal/models" "github.com/flatrender/render-svc/internal/notifier" "github.com/gin-gonic/gin" "github.com/google/uuid" "github.com/minio/minio-go/v7" ) type InternalHandler struct { store *db.Store notifier *notifier.Client // may be nil — notifications are best-effort minio *minio.Client templatesBucket string // bucket that holds .aep project files exportsBucket string // bucket that receives rendered MP4 outputs } func NewInternalHandler(store *db.Store, n *notifier.Client, mc *minio.Client, templatesBucket, exportsBucket string) *InternalHandler { return &InternalHandler{ store: store, notifier: n, minio: mc, templatesBucket: templatesBucket, exportsBucket: exportsBucket, } } // completeRequest is the body for POST .../complete type completeRequest struct { ExportID *uuid.UUID `json:"export_id"` } // failRequest is the body for POST .../fail type failRequest struct { Reason string `json:"reason" binding:"required"` AtStep string `json:"at_step"` // optional: which render step failed } // POST /v1/internal/render/jobs/:job_id/complete func (h *InternalHandler) Complete(c *gin.Context) { jobID, err := uuid.Parse(c.Param("job_id")) if err != nil { c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: "invalid job_id"}) return } var req completeRequest _ = c.ShouldBindJSON(&req) // export_id is optional job, err := h.store.CompleteJob(c.Request.Context(), jobID, req.ExportID) if err != nil { c.JSON(http.StatusInternalServerError, models.APIError{Code: "internal_error", Message: err.Error()}) return } // Fire notification if the user requested it (tell_me_when_done) if h.notifier != nil && job.TellMeWhenDone { jobName := "" if job.Name != nil { jobName = *job.Name } else if job.Title != nil { jobName = *job.Title } h.notifier.NotifyRenderDone(c.Request.Context(), job.UserID, job.TenantID, job.ID, job.ExportID, jobName) } c.JSON(http.StatusOK, gin.H{"status": "done", "job_id": job.ID}) } // POST /v1/internal/render/jobs/:job_id/fail func (h *InternalHandler) Fail(c *gin.Context) { jobID, err := uuid.Parse(c.Param("job_id")) if err != nil { c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: "invalid job_id"}) return } var req failRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: err.Error()}) return } atStep := req.AtStep if atStep == "" { atStep = "Rendering" } job, err := h.store.FailJob(c.Request.Context(), jobID, req.Reason, atStep) if err != nil { c.JSON(http.StatusInternalServerError, models.APIError{Code: "internal_error", Message: err.Error()}) return } // Notify user of failure if h.notifier != nil { jobName := "" if job.Name != nil { jobName = *job.Name } else if job.Title != nil { jobName = *job.Title } h.notifier.NotifyRenderFailed(c.Request.Context(), job.UserID, job.TenantID, job.ID, jobName, req.Reason) } c.JSON(http.StatusOK, gin.H{"status": "failed", "job_id": job.ID}) } // POST /v1/internal/nodes/:node_id/heartbeat func (h *InternalHandler) Heartbeat(c *gin.Context) { nodeID, err := uuid.Parse(c.Param("node_id")) if err != nil { c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: "invalid node_id"}) return } var req models.NodeHeartbeatRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: err.Error()}) return } req.NodeID = nodeID if err := h.store.UpdateNodeHeartbeat(c.Request.Context(), nodeID, &req); err != nil { c.JSON(http.StatusInternalServerError, models.APIError{Code: "internal_error", Message: err.Error()}) return } c.JSON(http.StatusOK, gin.H{ "next_heartbeat_in_sec": 5, "pending_commands": []any{}, }) } // POST /v1/internal/nodes/:node_id/online func (h *InternalHandler) Online(c *gin.Context) { nodeID, err := uuid.Parse(c.Param("node_id")) if err != nil { c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: "invalid node_id"}) return } var req models.NodeOnlineRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: err.Error()}) return } if err := h.store.UpdateNodeOnline(c.Request.Context(), nodeID, &req); err != nil { c.JSON(http.StatusInternalServerError, models.APIError{Code: "internal_error", Message: err.Error()}) return } c.Status(http.StatusOK) } // POST /v1/internal/render/jobs/:job_id/frames func (h *InternalHandler) FrameProgress(c *gin.Context) { jobID, err := uuid.Parse(c.Param("job_id")) if err != nil { c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: "invalid job_id"}) return } var req models.FrameProgressRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: err.Error()}) return } if err := h.store.UpdateFrameProgress(c.Request.Context(), jobID, &req); err != nil { c.JSON(http.StatusInternalServerError, models.APIError{Code: "internal_error", Message: err.Error()}) return } c.Status(http.StatusNoContent) } // POST /v1/internal/render/jobs/:job_id/crash func (h *InternalHandler) Crash(c *gin.Context) { jobID, err := uuid.Parse(c.Param("job_id")) if err != nil { c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: "invalid job_id"}) return } var req models.CrashReportRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: err.Error()}) return } if err := h.store.InsertCrash(c.Request.Context(), req.NodeID, jobID, &req); err != nil { c.JSON(http.StatusInternalServerError, models.APIError{Code: "internal_error", Message: err.Error()}) return } c.JSON(http.StatusOK, gin.H{ "action_recommended": "ResetAndRestart", "reassigned_to_node_id": nil, }) } // POST /v1/internal/render/jobs/:job_id/replica-ready func (h *InternalHandler) ReplicaReady(c *gin.Context) { _, err := uuid.Parse(c.Param("job_id")) if err != nil { c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: "invalid job_id"}) return } var req models.ReplicaReadyRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: err.Error()}) return } // In production: update job step to TemplateCache → JsxGen, signal next pipeline phase c.Status(http.StatusNoContent) } // POST /v1/internal/render/jobs/:job_id/preview // Node agent pushes a base64-encoded frame image so the frontend can show // a live preview while the job is rendering. func (h *InternalHandler) Preview(c *gin.Context) { jobID, err := uuid.Parse(c.Param("job_id")) if err != nil { c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: "invalid job_id"}) return } var req struct { ImageB64 string `json:"image_b64" binding:"required"` } if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: err.Error()}) return } if err := h.store.UpdateJobPreview(c.Request.Context(), jobID, req.ImageB64); err != nil { c.JSON(http.StatusInternalServerError, models.APIError{Code: "internal_error", Message: err.Error()}) return } c.Status(http.StatusNoContent) } // POST /v1/internal/render/jobs/:job_id/progress // Node agent pushes the render percentage so the UI shows a moving bar + ETA. func (h *InternalHandler) RenderProgress(c *gin.Context) { jobID, err := uuid.Parse(c.Param("job_id")) if err != nil { c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: "invalid job_id"}) return } var req struct { Progress int `json:"progress"` } if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: err.Error()}) return } if err := h.store.UpdateJobProgress(c.Request.Context(), jobID, req.Progress); err != nil { c.JSON(http.StatusInternalServerError, models.APIError{Code: "internal_error", Message: err.Error()}) return } c.Status(http.StatusNoContent) } // POST /v1/internal/render/jobs/claim // Node agent calls this to atomically claim the next queued job. // Returns 204 when there is nothing queued (agent should back off and retry). func (h *InternalHandler) Claim(c *gin.Context) { var req models.ClaimJobRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: err.Error()}) return } job, err := h.store.ClaimJob(c.Request.Context(), req.NodeID, req.Region) if err != nil { c.JSON(http.StatusInternalServerError, models.APIError{Code: "internal_error", Message: err.Error()}) return } if job == nil { c.Status(http.StatusNoContent) // nothing queued return } // Resolve the canonical template object. Each template is stored once, per // project id, at templates/{original_project_id}/ in the templates bucket // and reused by every render of that template. A .zip is a full AE project // bundle (.aep + footage/fonts) the node must extract before rendering. // Errors are non-fatal — the node agent falls back to mock render when URL is empty. // Resolve the render engine + composition for this template. Remotion // templates are code-based and need no .aep download. rcfg, _ := h.store.GetTemplateRenderConfig(c.Request.Context(), job.OriginalProjectID) aepURL := "" isBundle := false bundleMD5 := "" if rcfg.Engine != "Remotion" && h.minio != nil { candidates := []struct { name string bundle bool }{ {"bundle.zip", true}, {"template.aep", false}, {"template.aepx", false}, } for _, cand := range candidates { objectKey := fmt.Sprintf("templates/%s/%s", job.OriginalProjectID, cand.name) info, serr := h.minio.StatObject(context.Background(), h.templatesBucket, objectKey, minio.StatObjectOptions{}) if serr != nil { continue // not this format } purl, perr := h.minio.PresignedGetObject(context.Background(), h.templatesBucket, objectKey, 2*time.Hour, nil) if perr != nil { continue } aepURL = purl.String() isBundle = cand.bundle bundleMD5 = strings.Trim(info.ETag, "\"") break } } // Composition to render: AE comp (-comp) or the Remotion composition id. // Non-fatal: empty → AE node uses the render queue. compName := rcfg.CompName // User's edited input values → the node writes them into the AE project before // rendering, or passes them as Remotion --props. Non-fatal: empty → template defaults. // FlexStory (scene engine) needs the structured per-scene shape (grouped by scene // + per-scene duration + theme colours); everything else uses the flat union. var bindings []models.RenderBinding if rcfg.Engine == "Remotion" && strings.HasPrefix(rcfg.CompName, "FlexStory") { if flex, ferr := h.store.GetFlexStoryProps(c.Request.Context(), job.SavedProjectID); ferr == nil && flex != "" { bindings = []models.RenderBinding{{Key: "__flexprops__", Type: "json", Value: flex}} } } else { bindings, _ = h.store.GetRenderBindings(c.Request.Context(), job.SavedProjectID) } c.JSON(http.StatusOK, models.ClaimedJob{ JobID: job.ID, SavedProjectID: job.SavedProjectID, Quality: job.Quality, Resolution: job.Resolution, FrameRate: job.FrameRate, HasMusic: job.HasMusic, HasVoiceover: job.HasVoiceover, Engine: rcfg.Engine, AEPDownloadURL: aepURL, IsBundle: isBundle, BundleMD5: bundleMD5, CompName: compName, Bindings: bindings, }) } // POST /v1/internal/render/jobs/:job_id/output-upload-url // Node agent calls this after rendering to get a presigned MinIO PUT URL. // Creates an Export record in the DB and returns the export_id + upload URL. func (h *InternalHandler) OutputUploadURL(c *gin.Context) { jobID, err := uuid.Parse(c.Param("job_id")) if err != nil { c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: "invalid job_id"}) return } export, err := h.store.CreateExportForJob(c.Request.Context(), jobID) if err != nil { c.JSON(http.StatusInternalServerError, models.APIError{Code: "internal_error", Message: err.Error()}) return } expiry := 2 * time.Hour purl, err := h.minio.PresignedPutObject( context.Background(), h.exportsBucket, export.Path, expiry, ) if err != nil { c.JSON(http.StatusInternalServerError, models.APIError{ Code: "presign_error", Message: "could not generate upload URL", }) return } c.JSON(http.StatusOK, models.OutputUploadURLResponse{ ExportID: export.ID, UploadURL: purl.String(), ObjectKey: export.Path, ExpiresAt: time.Now().Add(expiry), }) } // POST /v1/internal/nodes/:node_id/cache-update func (h *InternalHandler) CacheUpdate(c *gin.Context) { nodeID, err := uuid.Parse(c.Param("node_id")) if err != nil { c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: "invalid node_id"}) return } var req models.CacheUpdateRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: err.Error()}) return } if err := h.store.UpdateNodeCache(c.Request.Context(), nodeID, &req); err != nil { c.JSON(http.StatusInternalServerError, models.APIError{Code: "internal_error", Message: err.Error()}) return } c.Status(http.StatusNoContent) }