Files
flatrender/services/node-agent/internal/client/client.go
T
soroush.asadi bf6c04aba3
Build backend images / build content-svc (push) Failing after 58s
Build backend images / build file-svc (push) Failing after 45s
Build backend images / build gateway (push) Failing after 52s
Build backend images / build identity-svc (push) Failing after 54s
Build backend images / build notification-svc (push) Failing after 56s
Build backend images / build render-svc (push) Failing after 56s
Build backend images / build studio-svc (push) Failing after 49s
fix(render): node reports progress → moving bar + ETA (was stuck 0%/Preparing)
The node's onProgress callback only LOGGED — it never POSTed, so render_progress stayed
0 and step stayed Preparing (no bar, no ETA). Add render-svc POST
/v1/internal/render/jobs/{id}/progress (UpdateJobProgress: set render_progress + bump
step Queued/Preparing→Rendering once >0) + client UpdateProgress + wire onProgress to
post it (8s best-effort timeout, AE-CPU/DB-starvation tolerant). Preview already posts;
real-frame preview is epic C.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-07 07:51:01 +03:30

438 lines
15 KiB
Go

// Package client provides a typed HTTP client for the V2 render orchestrator's
// internal (node-agent) API. All requests are authenticated via the shared
// X-Node-Signature header.
package client
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
)
// Client is the HTTP client for the V2 render orchestrator's internal API.
// Construct it with New; the zero value has a nil *http.Client and is not usable.
type Client struct {
	// base is the orchestrator base URL that request paths are appended to.
	base string
	// secret is sent verbatim as the X-Node-Signature header on every request.
	secret string
	// http is the underlying transport client shared by all calls.
	http *http.Client
}
// New builds a Client for the orchestrator at baseURL (e.g. "http://gateway:8080").
// Trailing slashes are trimmed from baseURL so that request paths, which all
// start with "/", can be appended directly. nodeHMACSecret is stored and sent
// with each request.
func New(baseURL, nodeHMACSecret string) *Client {
	// 60s timeout: the post-render output-upload-url call (export insert + presign)
	// can be slow when the DB is briefly busy right after a CPU-heavy
	// render/transcode.
	httpClient := &http.Client{Timeout: 60 * time.Second}

	c := new(Client)
	c.base = strings.TrimRight(baseURL, "/")
	c.secret = nodeHMACSecret
	c.http = httpClient
	return c
}
// ── Request helpers ───────────────────────────────────────────────────────────
// do issues one request against the orchestrator and returns the raw response.
//
// A non-nil body is JSON-encoded and sent with Content-Type: application/json;
// a nil body sends no payload and no Content-Type. Every request carries the
// X-Node-Signature and Accept headers. On success the caller owns resp.Body and
// must close it. HTTP status codes are not inspected here.
func (c *Client) do(ctx context.Context, method, path string, body any) (*http.Response, error) {
	hasBody := body != nil

	var payload io.Reader
	if hasBody {
		encoded, err := json.Marshal(body)
		if err != nil {
			return nil, fmt.Errorf("marshal: %w", err)
		}
		payload = bytes.NewReader(encoded)
	}

	req, err := http.NewRequestWithContext(ctx, method, c.base+path, payload)
	if err != nil {
		return nil, err
	}

	hdr := req.Header
	hdr.Set("X-Node-Signature", c.secret)
	hdr.Set("Accept", "application/json")
	if hasBody {
		hdr.Set("Content-Type", "application/json")
	}

	return c.http.Do(req)
}
// decodeJSON decodes the response body as JSON into out and always closes the
// body. When out is nil the body is closed without being read and nil is
// returned.
func decodeJSON(resp *http.Response, out any) error {
	defer resp.Body.Close()

	if out != nil {
		return json.NewDecoder(resp.Body).Decode(out)
	}
	return nil
}
// ── Fonts ───────────────────────────────────────────────────────────────────
// PendingFont is one entry of the pending-fonts response returned by the
// orchestrator. All four fields are always present in its JSON encoding.
type PendingFont struct {
	ID         string `json:"id"`
	Name       string `json:"name"`
	SystemName string `json:"system_name"`
	FileURL    string `json:"file_url"`
}
// pendingFontsResp is the JSON envelope decoded by PendingFonts: an object
// with a single "fonts" array.
type pendingFontsResp struct {
	Fonts []PendingFont `json:"fonts"`
}
// PendingFonts fetches the fonts this node still needs to install.
//
// It issues GET /v1/internal/nodes/{nodeID}/fonts/pending. Any status other
// than 200 OK is returned as an error; on 200 the "fonts" array of the JSON
// body is returned. The response body is closed on every path.
func (c *Client) PendingFonts(ctx context.Context, nodeID string) ([]PendingFont, error) {
	endpoint := fmt.Sprintf("/v1/internal/nodes/%s/fonts/pending", nodeID)

	res, err := c.do(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if code := res.StatusCode; code != http.StatusOK {
		return nil, fmt.Errorf("pending fonts status %d", code)
	}

	var payload pendingFontsResp
	if err := json.NewDecoder(res.Body).Decode(&payload); err != nil {
		return nil, err
	}
	return payload.Fonts, nil
}
// ReportFont reports a font install result (Installed | Failed) for the font
// request requestID on node nodeID.
//
// It issues POST /v1/internal/nodes/{nodeID}/fonts/{requestID}/status with a
// JSON body {"status": status, "error": errMsg}. A transport error or any
// response status >= 300 is returned as an error, matching the other API
// methods of this client; previously a rejected report (4xx/5xx) was silently
// treated as success.
func (c *Client) ReportFont(ctx context.Context, nodeID, requestID, status, errMsg string) error {
	resp, err := c.do(ctx, http.MethodPost,
		fmt.Sprintf("/v1/internal/nodes/%s/fonts/%s/status", nodeID, requestID),
		map[string]string{"status": status, "error": errMsg})
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// http.Client.Do only fails on transport-level problems, so the status
	// code has to be checked explicitly.
	if resp.StatusCode >= 300 {
		return fmt.Errorf("report font: HTTP %d", resp.StatusCode)
	}
	return nil
}
// ── Domain types ──────────────────────────────────────────────────────────────
// OnlineRequest is the body of the startup call that marks the node Ready.
//
// The pointer fields are optional and are omitted from the JSON when nil. The
// two slice fields have no omitempty, so a nil slice is encoded as null.
type OnlineRequest struct {
	NodeAgentVersion    string   `json:"node_agent_version"`
	CurrentAEVersion    string   `json:"current_ae_version"`
	AvailableAEVersions []string `json:"available_ae_versions"`
	RamGB               *int     `json:"ram_gb,omitempty"`
	CPUCores            *int     `json:"cpu_cores,omitempty"`
	CacheUsedGB         *int     `json:"cache_used_gb,omitempty"`
	CachedTemplateMD5s  []string `json:"cached_template_md5s"`
}
// HeartbeatRequest is the periodic heartbeat body, sent every
// HeartbeatIntervalSec seconds. Only NodeID and Status are always encoded;
// every pointer field is optional and dropped from the JSON when nil.
type HeartbeatRequest struct {
	NodeID string `json:"node_id"`
	// Status is one of: Ready | Busy.
	Status string `json:"status"`

	CPUPct         *int    `json:"cpu_pct,omitempty"`
	RAMAvailableMB *int    `json:"ram_available_mb,omitempty"`
	DiskUsedPct    *int    `json:"disk_used_pct,omitempty"`
	DiskTotalGB    *int    `json:"disk_total_gb,omitempty"`
	AERunning      *bool   `json:"ae_running,omitempty"`
	CurrentJobID   *string `json:"current_job_id,omitempty"`
	CacheUsedGB    *int    `json:"cache_used_gb,omitempty"`
}
// HeartbeatResponse is the orchestrator's reply to a heartbeat. It may carry
// optional commands; their element type is left open (any), so this package
// does not interpret them.
type HeartbeatResponse struct {
	NextHeartbeatInSec int   `json:"next_heartbeat_in_sec"`
	PendingCommands    []any `json:"pending_commands"`
}
// ClaimJobRequest is the body sent when asking the orchestrator for the next
// queued job. Region is optional and left out of the JSON when empty.
type ClaimJobRequest struct {
	NodeID string `json:"node_id"`
	Region string `json:"region,omitempty"`
}
// ClaimedJob describes a render job that was successfully claimed; it is the
// decoded response body of the claim call.
type ClaimedJob struct {
	JobID          string `json:"job_id"`
	SavedProjectID string `json:"saved_project_id"`
	Quality        string `json:"quality"`
	Resolution     string `json:"resolution"`
	FrameRate      int    `json:"frame_rate"`
	HasMusic       bool   `json:"has_music"`
	HasVoiceover   bool   `json:"has_voiceover"`

	// AEPDownloadURL is a presigned MinIO GET URL for the .aep template file
	// (or a .zip bundle). It is empty when the template has not been uploaded
	// yet, which triggers a mock render.
	AEPDownloadURL string `json:"aep_download_url,omitempty"`

	// IsBundle reports that AEPDownloadURL points to a .zip bundle
	// (.aep + footage/fonts) that must be extracted before rendering.
	IsBundle bool `json:"is_bundle,omitempty"`

	// BundleMD5 identifies the bundle content. It serves as a local cache key
	// so repeated renders of the same template download + extract it only once.
	BundleMD5 string `json:"bundle_md5,omitempty"`

	// CompName is the AE composition to render (-comp), e.g. "frfinal". When
	// empty the node falls back to the project's render queue (-rqindex 1).
	CompName string `json:"comp_name,omitempty"`

	// Bindings are the user's edited input values to write into the AE project
	// before rendering (render binder). Each key is an AE layer/footage name
	// (frl_c{n}{t|m}{i}).
	Bindings []RenderBinding `json:"bindings,omitempty"`
}
// RenderBinding is a single input value that is written into the AE project
// before the render starts. All three fields are plain strings in the JSON.
type RenderBinding struct {
	Key   string `json:"key"`
	Type  string `json:"type"`
	Value string `json:"value"`
}
// OutputUploadURLResponse is the decoded reply of GetOutputUploadURL: the
// export identifier, the upload URL and the object key, as named by the
// JSON fields below.
type OutputUploadURLResponse struct {
	ExportID  string `json:"export_id"`
	UploadURL string `json:"upload_url"`
	ObjectKey string `json:"object_key"`
}
// ProgressRequest is a frame-level render progress report for a job.
// CompletedAt is optional and omitted from the JSON when nil.
//
// NOTE(review): no method visible in this file sends this type; UpdateProgress
// posts a plain {"progress": n} object instead — confirm whether it is still
// needed.
type ProgressRequest struct {
	FrameJobID  string     `json:"frame_job_id"`
	FrameNumber int        `json:"frame_number"`
	CompletedAt *time.Time `json:"completed_at,omitempty"`
}
// CompleteRequest is the body sent to mark a job as Done. ExportID is
// optional; a nil pointer leaves the field out of the JSON entirely.
type CompleteRequest struct {
	ExportID *string `json:"export_id,omitempty"`
}
// FailRequest is the body sent to mark a job as Failed. Reason is always
// encoded; AtStep is optional and omitted when empty.
type FailRequest struct {
	Reason string `json:"reason"`
	AtStep string `json:"at_step,omitempty"`
}
// CrashRequest is the body of a node crash report. NodeID is always encoded;
// the remaining fields are optional and omitted from the JSON when nil.
type CrashRequest struct {
	NodeID         string  `json:"node_id"`
	LastKnownFrame *int    `json:"last_known_frame,omitempty"`
	CrashSignal    *string `json:"crash_signal,omitempty"`
	ErrorLogTail   *string `json:"error_log_tail,omitempty"`
}
// ── API methods ───────────────────────────────────────────────────────────────
// Online announces the node to the orchestrator on startup, marking it Ready.
//
// It issues POST /v1/internal/nodes/{nodeID}/online with req as the JSON
// body. A transport error or any response status >= 300 is returned as an
// error.
func (c *Client) Online(ctx context.Context, nodeID string, req OnlineRequest) error {
	endpoint := fmt.Sprintf("/v1/internal/nodes/%s/online", nodeID)

	res, err := c.do(ctx, http.MethodPost, endpoint, req)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if code := res.StatusCode; code >= 300 {
		return fmt.Errorf("online: HTTP %d", code)
	}
	return nil
}
// Heartbeat posts a heartbeat for nodeID and returns the orchestrator's reply.
//
// It issues POST /v1/internal/nodes/{nodeID}/heartbeat with req as the JSON
// body. A transport error or any response status >= 300 yields (nil, error).
// Otherwise a non-nil response is always returned, even when the body could
// not be decoded.
func (c *Client) Heartbeat(ctx context.Context, nodeID string, req HeartbeatRequest) (*HeartbeatResponse, error) {
	endpoint := fmt.Sprintf("/v1/internal/nodes/%s/heartbeat", nodeID)

	res, err := c.do(ctx, http.MethodPost, endpoint, req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if code := res.StatusCode; code >= 300 {
		return nil, fmt.Errorf("heartbeat: HTTP %d", code)
	}

	reply := new(HeartbeatResponse)
	// The decode error is ignored: an empty or malformed body on a successful
	// status leaves reply at whatever was decoded so far (zero value if nothing).
	_ = json.NewDecoder(res.Body).Decode(reply)
	return reply, nil
}
// ClaimJob asks the orchestrator to atomically hand this node the next queued
// render job.
//
// It issues POST /v1/internal/render/jobs/claim with the node ID and region.
// The result is (nil, nil) when the queue is empty (204 No Content), an error
// for a transport failure, any other status >= 300 or an undecodable body,
// and the decoded job otherwise.
func (c *Client) ClaimJob(ctx context.Context, nodeID, region string) (*ClaimedJob, error) {
	claim := ClaimJobRequest{NodeID: nodeID, Region: region}

	res, err := c.do(ctx, http.MethodPost, "/v1/internal/render/jobs/claim", claim)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	switch code := res.StatusCode; {
	case code == http.StatusNoContent:
		// Nothing queued.
		return nil, nil
	case code >= 300:
		return nil, fmt.Errorf("claim: HTTP %d", code)
	}

	job := new(ClaimedJob)
	if err := json.NewDecoder(res.Body).Decode(job); err != nil {
		return nil, fmt.Errorf("claim decode: %w", err)
	}
	return job, nil
}
// ScanClaim describes an AE scan job that was successfully claimed; it is the
// decoded response body of ClaimScan. No field uses omitempty, so all six
// are always present when the value is encoded.
type ScanClaim struct {
	ScanJobID string `json:"scan_job_id"`
	ProjectID string `json:"project_id"`
	// Mode is one of: fix | flexible | mockup | musicvisualizer.
	Mode           string `json:"mode"`
	AEPDownloadURL string `json:"aep_download_url"`
	IsBundle       bool   `json:"is_bundle"`
	BundleMD5      string `json:"bundle_md5"`
}
// ClaimScan asks the orchestrator to atomically hand this node the next
// queued AE scan job.
//
// It issues POST /v1/internal/scan/claim with the node ID and region. The
// result is (nil, nil) when nothing is queued (204 No Content), an error for
// a transport failure, any other status >= 300 or an undecodable body, and
// the decoded claim otherwise.
func (c *Client) ClaimScan(ctx context.Context, nodeID, region string) (*ScanClaim, error) {
	claim := ClaimJobRequest{NodeID: nodeID, Region: region}

	res, err := c.do(ctx, http.MethodPost, "/v1/internal/scan/claim", claim)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	switch code := res.StatusCode; {
	case code == http.StatusNoContent:
		return nil, nil
	case code >= 300:
		return nil, fmt.Errorf("scan claim: HTTP %d", code)
	}

	scan := new(ScanClaim)
	if err := json.NewDecoder(res.Body).Decode(scan); err != nil {
		return nil, fmt.Errorf("scan claim decode: %w", err)
	}
	return scan, nil
}
// ReportScanResult uploads the raw ScanResult JSON produced by scan.jsx for
// the given scan job.
//
// It issues POST /v1/internal/scan/{scanJobID}/result. A transport error or
// any response status >= 300 is returned as an error.
func (c *Client) ReportScanResult(ctx context.Context, scanJobID string, resultJSON []byte) error {
	endpoint := fmt.Sprintf("/v1/internal/scan/%s/result", scanJobID)

	// Wrapping the bytes in json.RawMessage makes json.Marshal emit them as-is,
	// so the request body is the scan result verbatim rather than a
	// base64-encoded string.
	payload := json.RawMessage(resultJSON)

	res, err := c.do(ctx, http.MethodPost, endpoint, payload)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if code := res.StatusCode; code >= 300 {
		return fmt.Errorf("scan result: HTTP %d", code)
	}
	return nil
}
// ReportScanFail tells the orchestrator that a scan job failed.
//
// It issues POST /v1/internal/scan/{scanJobID}/fail with a FailRequest that
// carries only the reason. A transport error or any response status >= 300
// is returned as an error.
func (c *Client) ReportScanFail(ctx context.Context, scanJobID, reason string) error {
	endpoint := fmt.Sprintf("/v1/internal/scan/%s/fail", scanJobID)
	failure := FailRequest{Reason: reason}

	res, err := c.do(ctx, http.MethodPost, endpoint, failure)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if code := res.StatusCode; code >= 300 {
		return fmt.Errorf("scan fail: HTTP %d", code)
	}
	return nil
}
// ScanStatus fetches the current status string of a scan job (used by the
// cancel watchdog).
//
// It issues GET /v1/internal/scan/{scanJobID}/status. A transport error or
// any response status >= 300 is returned as an error. On success the "status"
// field of the JSON body is returned; if the body cannot be decoded the
// status is the empty string and the error is nil.
func (c *Client) ScanStatus(ctx context.Context, scanJobID string) (string, error) {
	endpoint := fmt.Sprintf("/v1/internal/scan/%s/status", scanJobID)

	res, err := c.do(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return "", err
	}
	defer res.Body.Close()

	if code := res.StatusCode; code >= 300 {
		return "", fmt.Errorf("scan status: HTTP %d", code)
	}

	var reply struct {
		Status string `json:"status"`
	}
	// The decode error is ignored; an unreadable body simply yields "".
	_ = json.NewDecoder(res.Body).Decode(&reply)
	return reply.Status, nil
}
// UpdatePreview uploads a base64-encoded preview frame for a render job.
//
// It issues POST /v1/internal/render/jobs/{jobID}/preview with the body
// {"image_b64": imageB64}. A transport error or any response status >= 300
// is returned as an error. Callers can treat the error as non-fatal: the UI
// simply won't update the preview image.
func (c *Client) UpdatePreview(ctx context.Context, jobID, imageB64 string) error {
	endpoint := fmt.Sprintf("/v1/internal/render/jobs/%s/preview", jobID)
	payload := map[string]string{"image_b64": imageB64}

	res, err := c.do(ctx, http.MethodPost, endpoint, payload)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if code := res.StatusCode; code >= 300 {
		return fmt.Errorf("preview: HTTP %d", code)
	}
	return nil
}
// UpdateProgress reports the render percentage for a job so the UI can show
// a moving bar + ETA.
//
// It issues POST /v1/internal/render/jobs/{jobID}/progress with the body
// {"progress": percent}; percent is sent as given, without range checking.
// A transport error or any response status >= 300 is returned as an error.
// Callers can treat the error as non-fatal: the UI just won't advance.
func (c *Client) UpdateProgress(ctx context.Context, jobID string, percent int) error {
	endpoint := fmt.Sprintf("/v1/internal/render/jobs/%s/progress", jobID)
	payload := map[string]int{"progress": percent}

	res, err := c.do(ctx, http.MethodPost, endpoint, payload)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if code := res.StatusCode; code >= 300 {
		return fmt.Errorf("progress: HTTP %d", code)
	}
	return nil
}
// GetOutputUploadURL asks the orchestrator to allocate an Export row and
// return a presigned MinIO PUT URL for the rendered output file.
//
// It issues POST /v1/internal/render/jobs/{jobID}/output-upload-url with no
// request body. A transport error, any response status >= 300, or an
// undecodable response body is returned as an error; otherwise the decoded
// response is returned.
func (c *Client) GetOutputUploadURL(ctx context.Context, jobID string) (*OutputUploadURLResponse, error) {
	resp, err := c.do(ctx, http.MethodPost,
		fmt.Sprintf("/v1/internal/render/jobs/%s/output-upload-url", jobID), nil)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 300 {
		return nil, fmt.Errorf("output-upload-url: HTTP %d", resp.StatusCode)
	}

	var out OutputUploadURLResponse
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		// Name the operation in the wrapped error, like the sibling methods
		// ("claim decode", "scan claim decode"), so the failing call is
		// identifiable from the message alone.
		return nil, fmt.Errorf("output-upload-url decode: %w", err)
	}
	return &out, nil
}
// Complete tells the orchestrator that a render job is Done.
//
// It issues POST /v1/internal/render/jobs/{jobID}/complete with a
// CompleteRequest; exportID may be nil, in which case the export_id field is
// omitted from the body. A transport error or any response status >= 300 is
// returned as an error.
func (c *Client) Complete(ctx context.Context, jobID string, exportID *string) error {
	endpoint := fmt.Sprintf("/v1/internal/render/jobs/%s/complete", jobID)
	done := CompleteRequest{ExportID: exportID}

	res, err := c.do(ctx, http.MethodPost, endpoint, done)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if code := res.StatusCode; code >= 300 {
		return fmt.Errorf("complete: HTTP %d", code)
	}
	return nil
}
// Fail tells the orchestrator that a render job has Failed.
//
// It issues POST /v1/internal/render/jobs/{jobID}/fail with a FailRequest
// built from reason and atStep (atStep is omitted from the body when empty).
// A transport error or any response status >= 300 is returned as an error.
func (c *Client) Fail(ctx context.Context, jobID, reason, atStep string) error {
	endpoint := fmt.Sprintf("/v1/internal/render/jobs/%s/fail", jobID)
	failure := FailRequest{Reason: reason, AtStep: atStep}

	res, err := c.do(ctx, http.MethodPost, endpoint, failure)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if code := res.StatusCode; code >= 300 {
		return fmt.Errorf("fail: HTTP %d", code)
	}
	return nil
}
// ReportCrash sends a node crash report for the given job.
//
// It issues POST /v1/internal/render/jobs/{jobID}/crash with req as the JSON
// body. A transport error or any response status >= 300 is returned as an
// error.
func (c *Client) ReportCrash(ctx context.Context, jobID string, req CrashRequest) error {
	endpoint := fmt.Sprintf("/v1/internal/render/jobs/%s/crash", jobID)

	res, err := c.do(ctx, http.MethodPost, endpoint, req)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if code := res.StatusCode; code >= 300 {
		return fmt.Errorf("crash: HTTP %d", code)
	}
	return nil
}