7f2f65dd8a
Build backend images / build content-svc (push) Failing after 53s
Build backend images / build file-svc (push) Failing after 47s
Build backend images / build gateway (push) Failing after 52s
Build backend images / build identity-svc (push) Failing after 58s
Build backend images / build notification-svc (push) Failing after 55s
Build backend images / build render-svc (push) Failing after 59s
Build backend images / build studio-svc (push) Failing after 48s
Push a font once → every node installs it → admin sees per-node status. - render-svc: font_requests + node_fonts tables (mig 25); admin GET/POST/DELETE /v1/node-fonts (with per-node status matrix); internal (HMAC) GET pending + POST status for node-agents - node-agent: fontSyncLoop polls pending fonts every 60s, downloads, installs (Windows Fonts dir + registry / macOS / linux fc-cache), reports Installed/Failed - gateway: /v1/node-fonts/* → render - admin /admin/node-fonts: upload a .ttf/.otf → install on all nodes; per-node Installed/Pending/Failed badges + counts + delete Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
321 lines
10 KiB
Go
321 lines
10 KiB
Go
// Package client provides a typed HTTP client for the V2 render orchestrator's
|
|
// internal (node-agent) API. All requests are authenticated via the shared
|
|
// X-Node-Signature header.
|
|
package client
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// Client talks to the V2 render orchestrator.
|
|
type Client struct {
|
|
base string
|
|
secret string
|
|
http *http.Client
|
|
}
|
|
|
|
// New returns a Client targeting the given base URL (e.g. "http://gateway:8080").
|
|
func New(baseURL, nodeHMACSecret string) *Client {
|
|
return &Client{
|
|
base: strings.TrimRight(baseURL, "/"),
|
|
secret: nodeHMACSecret,
|
|
http: &http.Client{Timeout: 15 * time.Second},
|
|
}
|
|
}
|
|
|
|
// ── Request helpers ───────────────────────────────────────────────────────────
|
|
|
|
func (c *Client) do(ctx context.Context, method, path string, body any) (*http.Response, error) {
|
|
var bodyReader io.Reader
|
|
if body != nil {
|
|
b, err := json.Marshal(body)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("marshal: %w", err)
|
|
}
|
|
bodyReader = bytes.NewReader(b)
|
|
}
|
|
req, err := http.NewRequestWithContext(ctx, method, c.base+path, bodyReader)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req.Header.Set("X-Node-Signature", c.secret)
|
|
if body != nil {
|
|
req.Header.Set("Content-Type", "application/json")
|
|
}
|
|
req.Header.Set("Accept", "application/json")
|
|
return c.http.Do(req)
|
|
}
|
|
|
|
func decodeJSON(resp *http.Response, out any) error {
|
|
defer resp.Body.Close()
|
|
if out == nil {
|
|
return nil
|
|
}
|
|
return json.NewDecoder(resp.Body).Decode(out)
|
|
}
|
|
|
|
// ── Fonts ───────────────────────────────────────────────────────────────────
|
|
|
|
type PendingFont struct {
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
SystemName string `json:"system_name"`
|
|
FileURL string `json:"file_url"`
|
|
}
|
|
|
|
type pendingFontsResp struct {
|
|
Fonts []PendingFont `json:"fonts"`
|
|
}
|
|
|
|
// PendingFonts returns fonts this node still needs to install.
|
|
func (c *Client) PendingFonts(ctx context.Context, nodeID string) ([]PendingFont, error) {
|
|
resp, err := c.do(ctx, http.MethodGet, fmt.Sprintf("/v1/internal/nodes/%s/fonts/pending", nodeID), nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
resp.Body.Close()
|
|
return nil, fmt.Errorf("pending fonts status %d", resp.StatusCode)
|
|
}
|
|
var out pendingFontsResp
|
|
if err := decodeJSON(resp, &out); err != nil {
|
|
return nil, err
|
|
}
|
|
return out.Fonts, nil
|
|
}
|
|
|
|
// ReportFont reports a font install result (Installed | Failed).
|
|
func (c *Client) ReportFont(ctx context.Context, nodeID, requestID, status, errMsg string) error {
|
|
resp, err := c.do(ctx, http.MethodPost,
|
|
fmt.Sprintf("/v1/internal/nodes/%s/fonts/%s/status", nodeID, requestID),
|
|
map[string]string{"status": status, "error": errMsg})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
resp.Body.Close()
|
|
return nil
|
|
}
|
|
|
|
// ── Domain types ──────────────────────────────────────────────────────────────
|
|
|
|
// OnlineRequest is sent once on startup to mark the node Ready.
|
|
type OnlineRequest struct {
|
|
NodeAgentVersion string `json:"node_agent_version"`
|
|
CurrentAEVersion string `json:"current_ae_version"`
|
|
AvailableAEVersions []string `json:"available_ae_versions"`
|
|
RamGB *int `json:"ram_gb,omitempty"`
|
|
CPUCores *int `json:"cpu_cores,omitempty"`
|
|
CacheUsedGB *int `json:"cache_used_gb,omitempty"`
|
|
CachedTemplateMD5s []string `json:"cached_template_md5s"`
|
|
}
|
|
|
|
// HeartbeatRequest is sent every HeartbeatIntervalSec seconds.
|
|
type HeartbeatRequest struct {
|
|
NodeID string `json:"node_id"`
|
|
Status string `json:"status"` // Ready | Busy
|
|
CPUPct *int `json:"cpu_pct,omitempty"`
|
|
RAMAvailableMB *int `json:"ram_available_mb,omitempty"`
|
|
AERunning *bool `json:"ae_running,omitempty"`
|
|
CurrentJobID *string `json:"current_job_id,omitempty"`
|
|
CacheUsedGB *int `json:"cache_used_gb,omitempty"`
|
|
}
|
|
|
|
// HeartbeatResponse carries optional commands from the orchestrator.
|
|
type HeartbeatResponse struct {
|
|
NextHeartbeatInSec int `json:"next_heartbeat_in_sec"`
|
|
PendingCommands []any `json:"pending_commands"`
|
|
}
|
|
|
|
// ClaimJobRequest asks the orchestrator for the next queued job.
|
|
type ClaimJobRequest struct {
|
|
NodeID string `json:"node_id"`
|
|
Region string `json:"region,omitempty"`
|
|
}
|
|
|
|
// ClaimedJob is the response when a job is successfully claimed.
|
|
type ClaimedJob struct {
|
|
JobID string `json:"job_id"`
|
|
SavedProjectID string `json:"saved_project_id"`
|
|
Quality string `json:"quality"`
|
|
Resolution string `json:"resolution"`
|
|
FrameRate int `json:"frame_rate"`
|
|
HasMusic bool `json:"has_music"`
|
|
HasVoiceover bool `json:"has_voiceover"`
|
|
// AEPDownloadURL is a presigned MinIO GET URL for the .aep template file.
|
|
// Empty when the template has not been uploaded yet — triggers mock render.
|
|
AEPDownloadURL string `json:"aep_download_url,omitempty"`
|
|
}
|
|
|
|
// OutputUploadURLResponse is returned by GetOutputUploadURL.
|
|
type OutputUploadURLResponse struct {
|
|
ExportID string `json:"export_id"`
|
|
UploadURL string `json:"upload_url"`
|
|
ObjectKey string `json:"object_key"`
|
|
}
|
|
|
|
// ProgressRequest reports render progress (frame-level) for a job.
|
|
type ProgressRequest struct {
|
|
FrameJobID string `json:"frame_job_id"`
|
|
FrameNumber int `json:"frame_number"`
|
|
CompletedAt *time.Time `json:"completed_at,omitempty"`
|
|
}
|
|
|
|
// CompleteRequest marks a job as Done.
|
|
type CompleteRequest struct {
|
|
ExportID *string `json:"export_id,omitempty"`
|
|
}
|
|
|
|
// FailRequest marks a job as Failed.
|
|
type FailRequest struct {
|
|
Reason string `json:"reason"`
|
|
AtStep string `json:"at_step,omitempty"`
|
|
}
|
|
|
|
// CrashRequest reports a node crash.
|
|
type CrashRequest struct {
|
|
NodeID string `json:"node_id"`
|
|
LastKnownFrame *int `json:"last_known_frame,omitempty"`
|
|
CrashSignal *string `json:"crash_signal,omitempty"`
|
|
ErrorLogTail *string `json:"error_log_tail,omitempty"`
|
|
}
|
|
|
|
// ── API methods ───────────────────────────────────────────────────────────────
|
|
|
|
// Online marks the node as Ready on startup.
|
|
func (c *Client) Online(ctx context.Context, nodeID string, req OnlineRequest) error {
|
|
resp, err := c.do(ctx, http.MethodPost,
|
|
fmt.Sprintf("/v1/internal/nodes/%s/online", nodeID), req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode >= 300 {
|
|
return fmt.Errorf("online: HTTP %d", resp.StatusCode)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Heartbeat sends a heartbeat and returns the orchestrator's response.
|
|
func (c *Client) Heartbeat(ctx context.Context, nodeID string, req HeartbeatRequest) (*HeartbeatResponse, error) {
|
|
resp, err := c.do(ctx, http.MethodPost,
|
|
fmt.Sprintf("/v1/internal/nodes/%s/heartbeat", nodeID), req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode >= 300 {
|
|
return nil, fmt.Errorf("heartbeat: HTTP %d", resp.StatusCode)
|
|
}
|
|
var out HeartbeatResponse
|
|
_ = json.NewDecoder(resp.Body).Decode(&out)
|
|
return &out, nil
|
|
}
|
|
|
|
// ClaimJob atomically claims the next queued render job.
|
|
// Returns (nil, nil) when the queue is empty (204 No Content).
|
|
func (c *Client) ClaimJob(ctx context.Context, nodeID, region string) (*ClaimedJob, error) {
|
|
resp, err := c.do(ctx, http.MethodPost, "/v1/internal/render/jobs/claim",
|
|
ClaimJobRequest{NodeID: nodeID, Region: region})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode == http.StatusNoContent {
|
|
return nil, nil // nothing queued
|
|
}
|
|
if resp.StatusCode >= 300 {
|
|
return nil, fmt.Errorf("claim: HTTP %d", resp.StatusCode)
|
|
}
|
|
var job ClaimedJob
|
|
if err := json.NewDecoder(resp.Body).Decode(&job); err != nil {
|
|
return nil, fmt.Errorf("claim decode: %w", err)
|
|
}
|
|
return &job, nil
|
|
}
|
|
|
|
// UpdatePreview sends a base64-encoded preview frame to the orchestrator.
|
|
// Errors are non-fatal — the UI simply won't update the preview image.
|
|
func (c *Client) UpdatePreview(ctx context.Context, jobID, imageB64 string) error {
|
|
resp, err := c.do(ctx, http.MethodPost,
|
|
fmt.Sprintf("/v1/internal/render/jobs/%s/preview", jobID),
|
|
map[string]string{"image_b64": imageB64})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode >= 300 {
|
|
return fmt.Errorf("preview: HTTP %d", resp.StatusCode)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetOutputUploadURL asks the orchestrator to allocate an Export row and
|
|
// return a presigned MinIO PUT URL for the rendered output file.
|
|
func (c *Client) GetOutputUploadURL(ctx context.Context, jobID string) (*OutputUploadURLResponse, error) {
|
|
resp, err := c.do(ctx, http.MethodPost,
|
|
fmt.Sprintf("/v1/internal/render/jobs/%s/output-upload-url", jobID), nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode >= 300 {
|
|
return nil, fmt.Errorf("output-upload-url: HTTP %d", resp.StatusCode)
|
|
}
|
|
var out OutputUploadURLResponse
|
|
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
|
return nil, fmt.Errorf("decode: %w", err)
|
|
}
|
|
return &out, nil
|
|
}
|
|
|
|
// Complete marks a render job as Done.
|
|
func (c *Client) Complete(ctx context.Context, jobID string, exportID *string) error {
|
|
resp, err := c.do(ctx, http.MethodPost,
|
|
fmt.Sprintf("/v1/internal/render/jobs/%s/complete", jobID),
|
|
CompleteRequest{ExportID: exportID})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode >= 300 {
|
|
return fmt.Errorf("complete: HTTP %d", resp.StatusCode)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Fail marks a render job as Failed.
|
|
func (c *Client) Fail(ctx context.Context, jobID, reason, atStep string) error {
|
|
resp, err := c.do(ctx, http.MethodPost,
|
|
fmt.Sprintf("/v1/internal/render/jobs/%s/fail", jobID),
|
|
FailRequest{Reason: reason, AtStep: atStep})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode >= 300 {
|
|
return fmt.Errorf("fail: HTTP %d", resp.StatusCode)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ReportCrash reports a node crash for the given job.
|
|
func (c *Client) ReportCrash(ctx context.Context, jobID string, req CrashRequest) error {
|
|
resp, err := c.do(ctx, http.MethodPost,
|
|
fmt.Sprintf("/v1/internal/render/jobs/%s/crash", jobID), req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode >= 300 {
|
|
return fmt.Errorf("crash: HTTP %d", resp.StatusCode)
|
|
}
|
|
return nil
|
|
}
|