bcc69f0a2e
Node-agent — full render pipeline (items 1-3):
- render-svc: ClaimedJob now includes aep_download_url (presigned MinIO GET,
2h TTL, path=templates/{original_project_id}/template.aep)
- render-svc: POST /v1/internal/render/jobs/:id/output-upload-url
allocates Export row + returns presigned MinIO PUT URL + export_id
- render-svc: db.CreateExportForJob() inserts export row with 30-day retention
- render-svc: InternalHandler now owns minio client (templatesBucket + exportsBucket)
MINIO_TEMPLATES_BUCKET env var (default flatrender-templates)
- node-agent: runner/download.go — DownloadFile() + UploadFile() (stdlib only)
- node-agent: client.GetOutputUploadURL() + ClaimedJob.AEPDownloadURL field
- node-agent: runJob() full flow: download AEP → render → get upload URL →
PUT output to MinIO → Complete(export_id)
All steps are non-fatal with fallback (AEP miss → mock, upload fail → no export)
TLS reverse proxy (item 15):
- Caddyfile: three virtual hosts (DOMAIN, API_DOMAIN, STORAGE_DOMAIN)
auto-TLS via Let's Encrypt; security headers; 512MB upload limit on API
- docker-compose.v2.yml: caddy:2-alpine service, ports 80/443/443udp,
caddy_data + caddy_config volumes; env vars DOMAIN/API_DOMAIN/STORAGE_DOMAIN/ACME_EMAIL
- .env.v2.example: new Caddy + MINIO_TEMPLATES_BUCKET entries
Billing portal (item 5):
- Identity: POST /v1/users/me/plan/cancel — sets cancelled_at, auto_renew=false
(access continues to expiry); 404 when no active plan
- POST /api/billing/cancel — frontend proxy, validates auth
- GET /api/billing/portal — redirects to /dashboard/settings?tab=billing
- SettingsBilling: "Cancel plan" button with confirm dialog + optimistic UI,
"Change plan" button; becomes "use client" component
Password reset UI (item 7):
- POST /api/auth/password-reset — proxies /v1/auth/password/reset/request
(always 200, anti-enumeration)
- POST /api/auth/password-reset-confirm — proxies /v1/auth/password/reset/confirm
- AuthPageContent: "Forgot password?" link on sign-in tab opens 2-step reset flow
(email → OTP+new-password) without leaving the auth page
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
279 lines
9.0 KiB
Go
279 lines
9.0 KiB
Go
// Package client provides a typed HTTP client for the V2 render orchestrator's
// internal (node-agent) API. All requests are authenticated via the shared
// X-Node-Signature header.
|
|
package client
|
|
|
|
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"
	"time"
)
|
|
|
|
// Client talks to the V2 render orchestrator.
//
// Build one with New. The zero value has a nil HTTP client and must not be
// used to send requests.
type Client struct {
	// base is the orchestrator base URL that request paths are appended to.
	base string
	// secret is the shared value sent in the X-Node-Signature header.
	secret string
	// http is the underlying client used to execute every request.
	http *http.Client
}
|
|
|
|
// New returns a Client targeting the given base URL (e.g. "http://gateway:8080").
|
|
func New(baseURL, nodeHMACSecret string) *Client {
|
|
return &Client{
|
|
base: strings.TrimRight(baseURL, "/"),
|
|
secret: nodeHMACSecret,
|
|
http: &http.Client{Timeout: 15 * time.Second},
|
|
}
|
|
}
|
|
|
|
// ── Request helpers ───────────────────────────────────────────────────────────
|
|
|
|
func (c *Client) do(ctx context.Context, method, path string, body any) (*http.Response, error) {
|
|
var bodyReader io.Reader
|
|
if body != nil {
|
|
b, err := json.Marshal(body)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("marshal: %w", err)
|
|
}
|
|
bodyReader = bytes.NewReader(b)
|
|
}
|
|
req, err := http.NewRequestWithContext(ctx, method, c.base+path, bodyReader)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req.Header.Set("X-Node-Signature", c.secret)
|
|
if body != nil {
|
|
req.Header.Set("Content-Type", "application/json")
|
|
}
|
|
req.Header.Set("Accept", "application/json")
|
|
return c.http.Do(req)
|
|
}
|
|
|
|
// decodeJSON decodes the JSON body of resp into out and always closes the body.
//
// When out is nil nothing is decoded and nil is returned. In every case the
// remainder of the body is drained before it is closed, so the transport can
// return the underlying keep-alive connection to its pool. The returned error
// is whatever the JSON decoder reports.
func decodeJSON(resp *http.Response, out any) error {
	defer func() {
		// Drain errors are irrelevant here: the body is being discarded anyway.
		_, _ = io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}()
	if out == nil {
		return nil
	}
	return json.NewDecoder(resp.Body).Decode(out)
}
|
|
|
|
// ── Domain types ──────────────────────────────────────────────────────────────
|
|
|
|
// OnlineRequest is sent once on startup to mark the node Ready.
//
// Pointer fields are optional and are left out of the JSON payload when nil.
// The slice fields carry no omitempty tag, so a nil slice is encoded as null
// and an empty slice as [].
type OnlineRequest struct {
	NodeAgentVersion    string   `json:"node_agent_version"`
	CurrentAEVersion    string   `json:"current_ae_version"`
	AvailableAEVersions []string `json:"available_ae_versions"`
	RamGB               *int     `json:"ram_gb,omitempty"`
	CPUCores            *int     `json:"cpu_cores,omitempty"`
	CacheUsedGB         *int     `json:"cache_used_gb,omitempty"`
	CachedTemplateMD5s  []string `json:"cached_template_md5s"`
}
|
|
|
|
// HeartbeatRequest is sent every HeartbeatIntervalSec seconds.
//
// NodeID and Status are always serialized; every pointer field is optional and
// is omitted from the JSON payload when nil.
type HeartbeatRequest struct {
	NodeID         string  `json:"node_id"`
	Status         string  `json:"status"` // Ready | Busy
	CPUPct         *int    `json:"cpu_pct,omitempty"`
	RAMAvailableMB *int    `json:"ram_available_mb,omitempty"`
	AERunning      *bool   `json:"ae_running,omitempty"`
	CurrentJobID   *string `json:"current_job_id,omitempty"`
	CacheUsedGB    *int    `json:"cache_used_gb,omitempty"`
}
|
|
|
|
// HeartbeatResponse carries optional commands from the orchestrator.
//
// PendingCommands elements are decoded as untyped JSON values; this package
// does not define a schema for them.
type HeartbeatResponse struct {
	NextHeartbeatInSec int   `json:"next_heartbeat_in_sec"`
	PendingCommands    []any `json:"pending_commands"`
}
|
|
|
|
// ClaimJobRequest asks the orchestrator for the next queued job.
//
// Region is optional and is omitted from the JSON payload when empty.
type ClaimJobRequest struct {
	NodeID string `json:"node_id"`
	Region string `json:"region,omitempty"`
}
|
|
|
|
// ClaimedJob is the response when a job is successfully claimed.
type ClaimedJob struct {
	JobID          string `json:"job_id"`
	SavedProjectID string `json:"saved_project_id"`
	Quality        string `json:"quality"`
	Resolution     string `json:"resolution"`
	FrameRate      int    `json:"frame_rate"`
	HasMusic       bool   `json:"has_music"`
	HasVoiceover   bool   `json:"has_voiceover"`
	// AEPDownloadURL is a presigned MinIO GET URL for the .aep template file.
	// Empty when the template has not been uploaded yet — triggers mock render.
	AEPDownloadURL string `json:"aep_download_url,omitempty"`
}
|
|
|
|
// OutputUploadURLResponse is returned by GetOutputUploadURL.
//
// It carries the identifier of the allocated export, the URL the rendered
// output should be uploaded to, and the object key reported by the server.
type OutputUploadURLResponse struct {
	ExportID  string `json:"export_id"`
	UploadURL string `json:"upload_url"`
	ObjectKey string `json:"object_key"`
}
|
|
|
|
// ProgressRequest reports render progress (frame-level) for a job.
//
// CompletedAt is optional and is omitted from the JSON payload when nil.
type ProgressRequest struct {
	FrameJobID  string     `json:"frame_job_id"`
	FrameNumber int        `json:"frame_number"`
	CompletedAt *time.Time `json:"completed_at,omitempty"`
}
|
|
|
|
// CompleteRequest marks a job as Done.
//
// ExportID is optional: a nil pointer is omitted, producing an empty JSON
// object.
type CompleteRequest struct {
	ExportID *string `json:"export_id,omitempty"`
}
|
|
|
|
// FailRequest marks a job as Failed.
//
// Reason is always serialized; AtStep is omitted from the JSON payload when
// empty.
type FailRequest struct {
	Reason string `json:"reason"`
	AtStep string `json:"at_step,omitempty"`
}
|
|
|
|
// CrashRequest reports a node crash.
//
// NodeID is always serialized; the pointer fields are optional diagnostics
// that are omitted from the JSON payload when nil.
type CrashRequest struct {
	NodeID         string  `json:"node_id"`
	LastKnownFrame *int    `json:"last_known_frame,omitempty"`
	CrashSignal    *string `json:"crash_signal,omitempty"`
	ErrorLogTail   *string `json:"error_log_tail,omitempty"`
}
|
|
|
|
// ── API methods ───────────────────────────────────────────────────────────────
|
|
|
|
// Online marks the node as Ready on startup.
|
|
func (c *Client) Online(ctx context.Context, nodeID string, req OnlineRequest) error {
|
|
resp, err := c.do(ctx, http.MethodPost,
|
|
fmt.Sprintf("/v1/internal/nodes/%s/online", nodeID), req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode >= 300 {
|
|
return fmt.Errorf("online: HTTP %d", resp.StatusCode)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Heartbeat sends a heartbeat and returns the orchestrator's response.
|
|
func (c *Client) Heartbeat(ctx context.Context, nodeID string, req HeartbeatRequest) (*HeartbeatResponse, error) {
|
|
resp, err := c.do(ctx, http.MethodPost,
|
|
fmt.Sprintf("/v1/internal/nodes/%s/heartbeat", nodeID), req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode >= 300 {
|
|
return nil, fmt.Errorf("heartbeat: HTTP %d", resp.StatusCode)
|
|
}
|
|
var out HeartbeatResponse
|
|
_ = json.NewDecoder(resp.Body).Decode(&out)
|
|
return &out, nil
|
|
}
|
|
|
|
// ClaimJob atomically claims the next queued render job.
|
|
// Returns (nil, nil) when the queue is empty (204 No Content).
|
|
func (c *Client) ClaimJob(ctx context.Context, nodeID, region string) (*ClaimedJob, error) {
|
|
resp, err := c.do(ctx, http.MethodPost, "/v1/internal/render/jobs/claim",
|
|
ClaimJobRequest{NodeID: nodeID, Region: region})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode == http.StatusNoContent {
|
|
return nil, nil // nothing queued
|
|
}
|
|
if resp.StatusCode >= 300 {
|
|
return nil, fmt.Errorf("claim: HTTP %d", resp.StatusCode)
|
|
}
|
|
var job ClaimedJob
|
|
if err := json.NewDecoder(resp.Body).Decode(&job); err != nil {
|
|
return nil, fmt.Errorf("claim decode: %w", err)
|
|
}
|
|
return &job, nil
|
|
}
|
|
|
|
// UpdatePreview sends a base64-encoded preview frame to the orchestrator.
|
|
// Errors are non-fatal — the UI simply won't update the preview image.
|
|
func (c *Client) UpdatePreview(ctx context.Context, jobID, imageB64 string) error {
|
|
resp, err := c.do(ctx, http.MethodPost,
|
|
fmt.Sprintf("/v1/internal/render/jobs/%s/preview", jobID),
|
|
map[string]string{"image_b64": imageB64})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode >= 300 {
|
|
return fmt.Errorf("preview: HTTP %d", resp.StatusCode)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetOutputUploadURL asks the orchestrator to allocate an Export row and
|
|
// return a presigned MinIO PUT URL for the rendered output file.
|
|
func (c *Client) GetOutputUploadURL(ctx context.Context, jobID string) (*OutputUploadURLResponse, error) {
|
|
resp, err := c.do(ctx, http.MethodPost,
|
|
fmt.Sprintf("/v1/internal/render/jobs/%s/output-upload-url", jobID), nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode >= 300 {
|
|
return nil, fmt.Errorf("output-upload-url: HTTP %d", resp.StatusCode)
|
|
}
|
|
var out OutputUploadURLResponse
|
|
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
|
return nil, fmt.Errorf("decode: %w", err)
|
|
}
|
|
return &out, nil
|
|
}
|
|
|
|
// Complete marks a render job as Done.
|
|
func (c *Client) Complete(ctx context.Context, jobID string, exportID *string) error {
|
|
resp, err := c.do(ctx, http.MethodPost,
|
|
fmt.Sprintf("/v1/internal/render/jobs/%s/complete", jobID),
|
|
CompleteRequest{ExportID: exportID})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode >= 300 {
|
|
return fmt.Errorf("complete: HTTP %d", resp.StatusCode)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Fail marks a render job as Failed.
|
|
func (c *Client) Fail(ctx context.Context, jobID, reason, atStep string) error {
|
|
resp, err := c.do(ctx, http.MethodPost,
|
|
fmt.Sprintf("/v1/internal/render/jobs/%s/fail", jobID),
|
|
FailRequest{Reason: reason, AtStep: atStep})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode >= 300 {
|
|
return fmt.Errorf("fail: HTTP %d", resp.StatusCode)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ReportCrash reports a node crash for the given job.
|
|
func (c *Client) ReportCrash(ctx context.Context, jobID string, req CrashRequest) error {
|
|
resp, err := c.do(ctx, http.MethodPost,
|
|
fmt.Sprintf("/v1/internal/render/jobs/%s/crash", jobID), req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode >= 300 {
|
|
return fmt.Errorf("crash: HTTP %d", resp.StatusCode)
|
|
}
|
|
return nil
|
|
}
|