// Package client provides a typed HTTP client for the V2 render orchestrator's // internal (node-agent) API. All requests are authenticated via the shared // X-Node-Signature header. package client import ( "bytes" "context" "encoding/json" "fmt" "io" "net/http" "strings" "time" ) // Client talks to the V2 render orchestrator. type Client struct { base string secret string http *http.Client } // New returns a Client targeting the given base URL (e.g. "http://gateway:8080"). func New(baseURL, nodeHMACSecret string) *Client { return &Client{ base: strings.TrimRight(baseURL, "/"), secret: nodeHMACSecret, // 60s: the post-render output-upload-url call (export insert + presign) can be // slow when the DB is briefly busy right after a CPU-heavy render/transcode. http: &http.Client{Timeout: 60 * time.Second}, } } // ── Request helpers ─────────────────────────────────────────────────────────── func (c *Client) do(ctx context.Context, method, path string, body any) (*http.Response, error) { var bodyReader io.Reader if body != nil { b, err := json.Marshal(body) if err != nil { return nil, fmt.Errorf("marshal: %w", err) } bodyReader = bytes.NewReader(b) } req, err := http.NewRequestWithContext(ctx, method, c.base+path, bodyReader) if err != nil { return nil, err } req.Header.Set("X-Node-Signature", c.secret) if body != nil { req.Header.Set("Content-Type", "application/json") } req.Header.Set("Accept", "application/json") return c.http.Do(req) } func decodeJSON(resp *http.Response, out any) error { defer resp.Body.Close() if out == nil { return nil } return json.NewDecoder(resp.Body).Decode(out) } // ── Fonts ─────────────────────────────────────────────────────────────────── type PendingFont struct { ID string `json:"id"` Name string `json:"name"` SystemName string `json:"system_name"` FileURL string `json:"file_url"` } type pendingFontsResp struct { Fonts []PendingFont `json:"fonts"` } // PendingFonts returns fonts this node still needs to install. 
func (c *Client) PendingFonts(ctx context.Context, nodeID string) ([]PendingFont, error) { resp, err := c.do(ctx, http.MethodGet, fmt.Sprintf("/v1/internal/nodes/%s/fonts/pending", nodeID), nil) if err != nil { return nil, err } if resp.StatusCode != http.StatusOK { resp.Body.Close() return nil, fmt.Errorf("pending fonts status %d", resp.StatusCode) } var out pendingFontsResp if err := decodeJSON(resp, &out); err != nil { return nil, err } return out.Fonts, nil } // ReportFont reports a font install result (Installed | Failed). func (c *Client) ReportFont(ctx context.Context, nodeID, requestID, status, errMsg string) error { resp, err := c.do(ctx, http.MethodPost, fmt.Sprintf("/v1/internal/nodes/%s/fonts/%s/status", nodeID, requestID), map[string]string{"status": status, "error": errMsg}) if err != nil { return err } resp.Body.Close() return nil } // ── Domain types ────────────────────────────────────────────────────────────── // OnlineRequest is sent once on startup to mark the node Ready. type OnlineRequest struct { NodeAgentVersion string `json:"node_agent_version"` CurrentAEVersion string `json:"current_ae_version"` AvailableAEVersions []string `json:"available_ae_versions"` RamGB *int `json:"ram_gb,omitempty"` CPUCores *int `json:"cpu_cores,omitempty"` CacheUsedGB *int `json:"cache_used_gb,omitempty"` CachedTemplateMD5s []string `json:"cached_template_md5s"` } // HeartbeatRequest is sent every HeartbeatIntervalSec seconds. type HeartbeatRequest struct { NodeID string `json:"node_id"` Status string `json:"status"` // Ready | Busy CPUPct *int `json:"cpu_pct,omitempty"` RAMAvailableMB *int `json:"ram_available_mb,omitempty"` DiskUsedPct *int `json:"disk_used_pct,omitempty"` DiskTotalGB *int `json:"disk_total_gb,omitempty"` AERunning *bool `json:"ae_running,omitempty"` CurrentJobID *string `json:"current_job_id,omitempty"` CacheUsedGB *int `json:"cache_used_gb,omitempty"` } // HeartbeatResponse carries optional commands from the orchestrator. 
type HeartbeatResponse struct {
	NextHeartbeatInSec int   `json:"next_heartbeat_in_sec"`
	PendingCommands    []any `json:"pending_commands"`
}

// Job-claim wire types.
type (
	// ClaimJobRequest asks the orchestrator for the next queued job.
	ClaimJobRequest struct {
		NodeID string `json:"node_id"`
		Region string `json:"region,omitempty"`
	}

	// ClaimedJob is the response when a job is successfully claimed.
	ClaimedJob struct {
		JobID          string `json:"job_id"`
		SavedProjectID string `json:"saved_project_id"`
		Quality        string `json:"quality"`
		Resolution     string `json:"resolution"`
		FrameRate      int    `json:"frame_rate"`
		HasMusic       bool   `json:"has_music"`
		HasVoiceover   bool   `json:"has_voiceover"`

		// AEPDownloadURL is a presigned MinIO GET URL for the .aep template
		// file (or .zip bundle). It is empty while the template has not been
		// uploaded yet, which triggers a mock render.
		AEPDownloadURL string `json:"aep_download_url,omitempty"`

		// IsBundle reports that AEPDownloadURL points to a .zip bundle
		// (.aep + footage/fonts) which must be extracted before rendering.
		IsBundle bool `json:"is_bundle,omitempty"`

		// BundleMD5 identifies the bundle content. It serves as a local cache
		// key so that repeated renders of the same template download and
		// extract it only once.
		BundleMD5 string `json:"bundle_md5,omitempty"`

		// CompName is the AE composition to render (-comp), e.g. "frfinal".
		// When empty the node falls back to the project's render queue
		// (-rqindex 1).
		CompName string `json:"comp_name,omitempty"`

		// Bindings are the user's edited input values to write into the AE
		// project before rendering (render binder). Each key is an AE
		// layer/footage name (frl_c{n}{t|m}{i}).
		Bindings []RenderBinding `json:"bindings,omitempty"`
	}

	// RenderBinding is one input value to write into the AE project before render.
	RenderBinding struct {
		Key   string `json:"key"`
		Type  string `json:"type"`
		Value string `json:"value"`
	}
)

// OutputUploadURLResponse is returned by GetOutputUploadURL.
type OutputUploadURLResponse struct { ExportID string `json:"export_id"` UploadURL string `json:"upload_url"` ObjectKey string `json:"object_key"` } // ProgressRequest reports render progress (frame-level) for a job. type ProgressRequest struct { FrameJobID string `json:"frame_job_id"` FrameNumber int `json:"frame_number"` CompletedAt *time.Time `json:"completed_at,omitempty"` } // CompleteRequest marks a job as Done. type CompleteRequest struct { ExportID *string `json:"export_id,omitempty"` } // FailRequest marks a job as Failed. type FailRequest struct { Reason string `json:"reason"` AtStep string `json:"at_step,omitempty"` } // CrashRequest reports a node crash. type CrashRequest struct { NodeID string `json:"node_id"` LastKnownFrame *int `json:"last_known_frame,omitempty"` CrashSignal *string `json:"crash_signal,omitempty"` ErrorLogTail *string `json:"error_log_tail,omitempty"` } // ── API methods ─────────────────────────────────────────────────────────────── // Online marks the node as Ready on startup. func (c *Client) Online(ctx context.Context, nodeID string, req OnlineRequest) error { resp, err := c.do(ctx, http.MethodPost, fmt.Sprintf("/v1/internal/nodes/%s/online", nodeID), req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode >= 300 { return fmt.Errorf("online: HTTP %d", resp.StatusCode) } return nil } // Heartbeat sends a heartbeat and returns the orchestrator's response. func (c *Client) Heartbeat(ctx context.Context, nodeID string, req HeartbeatRequest) (*HeartbeatResponse, error) { resp, err := c.do(ctx, http.MethodPost, fmt.Sprintf("/v1/internal/nodes/%s/heartbeat", nodeID), req) if err != nil { return nil, err } defer resp.Body.Close() if resp.StatusCode >= 300 { return nil, fmt.Errorf("heartbeat: HTTP %d", resp.StatusCode) } var out HeartbeatResponse _ = json.NewDecoder(resp.Body).Decode(&out) return &out, nil } // ClaimJob atomically claims the next queued render job. 
// Returns (nil, nil) when the queue is empty (204 No Content). func (c *Client) ClaimJob(ctx context.Context, nodeID, region string) (*ClaimedJob, error) { resp, err := c.do(ctx, http.MethodPost, "/v1/internal/render/jobs/claim", ClaimJobRequest{NodeID: nodeID, Region: region}) if err != nil { return nil, err } defer resp.Body.Close() if resp.StatusCode == http.StatusNoContent { return nil, nil // nothing queued } if resp.StatusCode >= 300 { return nil, fmt.Errorf("claim: HTTP %d", resp.StatusCode) } var job ClaimedJob if err := json.NewDecoder(resp.Body).Decode(&job); err != nil { return nil, fmt.Errorf("claim decode: %w", err) } return &job, nil } // ScanClaim is returned when an AE scan job is claimed. type ScanClaim struct { ScanJobID string `json:"scan_job_id"` ProjectID string `json:"project_id"` Mode string `json:"mode"` // fix | flexible | mockup | musicvisualizer AEPDownloadURL string `json:"aep_download_url"` IsBundle bool `json:"is_bundle"` BundleMD5 string `json:"bundle_md5"` } // ClaimScan atomically claims the next queued AE scan job. // Returns (nil, nil) when nothing is queued (204 No Content). func (c *Client) ClaimScan(ctx context.Context, nodeID, region string) (*ScanClaim, error) { resp, err := c.do(ctx, http.MethodPost, "/v1/internal/scan/claim", ClaimJobRequest{NodeID: nodeID, Region: region}) if err != nil { return nil, err } defer resp.Body.Close() if resp.StatusCode == http.StatusNoContent { return nil, nil } if resp.StatusCode >= 300 { return nil, fmt.Errorf("scan claim: HTTP %d", resp.StatusCode) } var sc ScanClaim if err := json.NewDecoder(resp.Body).Decode(&sc); err != nil { return nil, fmt.Errorf("scan claim decode: %w", err) } return &sc, nil } // ReportScanResult posts the raw ScanResult JSON produced by scan.jsx. func (c *Client) ReportScanResult(ctx context.Context, scanJobID string, resultJSON []byte) error { // json.RawMessage marshals to its raw bytes, so the body is the JSON verbatim. 
resp, err := c.do(ctx, http.MethodPost, fmt.Sprintf("/v1/internal/scan/%s/result", scanJobID), json.RawMessage(resultJSON)) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode >= 300 { return fmt.Errorf("scan result: HTTP %d", resp.StatusCode) } return nil } // ReportScanFail marks a scan job as failed. func (c *Client) ReportScanFail(ctx context.Context, scanJobID, reason string) error { resp, err := c.do(ctx, http.MethodPost, fmt.Sprintf("/v1/internal/scan/%s/fail", scanJobID), FailRequest{Reason: reason}) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode >= 300 { return fmt.Errorf("scan fail: HTTP %d", resp.StatusCode) } return nil } // ScanStatus returns the scan job's current status (for the cancel watchdog). func (c *Client) ScanStatus(ctx context.Context, scanJobID string) (string, error) { resp, err := c.do(ctx, http.MethodGet, fmt.Sprintf("/v1/internal/scan/%s/status", scanJobID), nil) if err != nil { return "", err } defer resp.Body.Close() if resp.StatusCode >= 300 { return "", fmt.Errorf("scan status: HTTP %d", resp.StatusCode) } var out struct { Status string `json:"status"` } _ = json.NewDecoder(resp.Body).Decode(&out) return out.Status, nil } // UpdatePreview sends a base64-encoded preview frame to the orchestrator. // Errors are non-fatal — the UI simply won't update the preview image. func (c *Client) UpdatePreview(ctx context.Context, jobID, imageB64 string) error { resp, err := c.do(ctx, http.MethodPost, fmt.Sprintf("/v1/internal/render/jobs/%s/preview", jobID), map[string]string{"image_b64": imageB64}) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode >= 300 { return fmt.Errorf("preview: HTTP %d", resp.StatusCode) } return nil } // GetOutputUploadURL asks the orchestrator to allocate an Export row and // return a presigned MinIO PUT URL for the rendered output file. 
func (c *Client) GetOutputUploadURL(ctx context.Context, jobID string) (*OutputUploadURLResponse, error) { resp, err := c.do(ctx, http.MethodPost, fmt.Sprintf("/v1/internal/render/jobs/%s/output-upload-url", jobID), nil) if err != nil { return nil, err } defer resp.Body.Close() if resp.StatusCode >= 300 { return nil, fmt.Errorf("output-upload-url: HTTP %d", resp.StatusCode) } var out OutputUploadURLResponse if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { return nil, fmt.Errorf("decode: %w", err) } return &out, nil } // Complete marks a render job as Done. func (c *Client) Complete(ctx context.Context, jobID string, exportID *string) error { resp, err := c.do(ctx, http.MethodPost, fmt.Sprintf("/v1/internal/render/jobs/%s/complete", jobID), CompleteRequest{ExportID: exportID}) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode >= 300 { return fmt.Errorf("complete: HTTP %d", resp.StatusCode) } return nil } // Fail marks a render job as Failed. func (c *Client) Fail(ctx context.Context, jobID, reason, atStep string) error { resp, err := c.do(ctx, http.MethodPost, fmt.Sprintf("/v1/internal/render/jobs/%s/fail", jobID), FailRequest{Reason: reason, AtStep: atStep}) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode >= 300 { return fmt.Errorf("fail: HTTP %d", resp.StatusCode) } return nil } // ReportCrash reports a node crash for the given job. func (c *Client) ReportCrash(ctx context.Context, jobID string, req CrashRequest) error { resp, err := c.do(ctx, http.MethodPost, fmt.Sprintf("/v1/internal/render/jobs/%s/crash", jobID), req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode >= 300 { return fmt.Errorf("crash: HTTP %d", resp.StatusCode) } return nil }