Files
flatrender/services/render/internal/db/db.go
T
soroush.asadi 0a7dd9b84c
Build backend images / build content-svc (push) Failing after 45s
Build backend images / build file-svc (push) Failing after 55s
Build backend images / build gateway (push) Failing after 53s
Build backend images / build identity-svc (push) Failing after 54s
Build backend images / build notification-svc (push) Failing after 53s
Build backend images / build render-svc (push) Failing after 47s
Build backend images / build studio-svc (push) Failing after 51s
feat(nodes): live CPU/RAM/disk monitoring in the node list
- node-agent: internal/metrics — read CPU% (GetSystemTimes), RAM (GlobalMemoryStatusEx),
  disk used%/total (GetDiskFreeSpaceEx) via stdlib kernel32 (no external dep; windows
  build + non-windows stub). Heartbeat now reports cpu_pct/ram_available_mb/disk_used_pct/
  disk_total_gb + ae_running.
- render-svc: heartbeat persists last_disk_pct + disk_total_gb (migration 29); RenderNode
  model + node SELECT/scan carry them.
- admin: rewrite NodesTable to the real RenderNode shape (fixes a pre-existing items/V2Node
  mismatch that left the list empty) + a CPU/RAM/disk bars column + stale-heartbeat flag.
- assets-bundle ingestion: ProjectMediaBundle (jszip) auto-maps project.zip → project/scene
  image/demo/colour + music; PatchProject gains image/full_demo/shared_colors_svg.
- scan: RGBA (4-number) colours recognised + frshare single-int controls detected.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-04 20:01:18 +03:30

934 lines
34 KiB
Go

package db
import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/flatrender/render-svc/internal/models"
	"github.com/google/uuid"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)
// Store is the data-access handle used by the methods in this file; each of
// them issues its SQL through the wrapped pgx connection pool.
type Store struct {
	pool *pgxpool.Pool
}
// NewStore wraps the given pgx pool in a Store. The pool is stored as-is;
// NewStore does not ping it or otherwise touch the database.
func NewStore(pool *pgxpool.Pool) *Store {
	st := new(Store)
	st.pool = pool
	return st
}
// ── Nodes ────────────────────────────────────────────────────────────────────
// ListNodes returns the active render nodes (is_active = TRUE), optionally
// narrowed to one region and/or one status. An empty region or status string
// disables that filter. Rows are ordered by region, then priority descending.
//
// The column list must match scanNodes one-for-one, including the trailing
// last_disk_pct and disk_total_gb columns; a shorter list makes every Scan
// fail with a column-count mismatch.
func (s *Store) ListNodes(ctx context.Context, region, status string) ([]*models.RenderNode, error) {
	q := `SELECT id, name, region, node_ip::text, worker_port, public_endpoint,
ram_gb, cpu_cores, gpu_model, storage_gb,
current_ae_version::text, available_ae_versions, node_agent_version,
node_kind::text, owner_user_id, owner_tenant_id,
status::text, current_job_id, current_frame_job_id, job_started_at,
last_heartbeat_at, last_cpu_pct, last_ram_available_mb, ae_running,
lifetime_task_count, lifetime_crash_count, consecutive_failures,
priority, is_active, accepts_new_jobs,
last_maintenance_at, next_maintenance_at, maintenance_reason,
cached_template_md5s, cache_used_gb, created_at, updated_at, last_disk_pct, disk_total_gb
FROM render.render_nodes
WHERE is_active = TRUE`

	// Optional filters are bound as named arguments, never spliced into the SQL.
	args := pgx.NamedArgs{}
	if region != "" {
		q += " AND region = @region"
		args["region"] = region
	}
	if status != "" {
		q += " AND status::text = @status"
		args["status"] = status
	}
	q += " ORDER BY region, priority DESC"

	rows, err := s.pool.Query(ctx, q, args)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	return scanNodes(rows)
}
// GetNodeByID loads a single render node by primary key, regardless of its
// is_active flag. It returns a "node not found" error when no row matches.
func (s *Store) GetNodeByID(ctx context.Context, id uuid.UUID) (*models.RenderNode, error) {
	const query = `
SELECT id, name, region, node_ip::text, worker_port, public_endpoint,
ram_gb, cpu_cores, gpu_model, storage_gb,
current_ae_version::text, available_ae_versions, node_agent_version,
node_kind::text, owner_user_id, owner_tenant_id,
status::text, current_job_id, current_frame_job_id, job_started_at,
last_heartbeat_at, last_cpu_pct, last_ram_available_mb, ae_running,
lifetime_task_count, lifetime_crash_count, consecutive_failures,
priority, is_active, accepts_new_jobs,
last_maintenance_at, next_maintenance_at, maintenance_reason,
cached_template_md5s, cache_used_gb, created_at, updated_at, last_disk_pct, disk_total_gb
FROM render.render_nodes WHERE id = $1`

	rows, queryErr := s.pool.Query(ctx, query, id)
	if queryErr != nil {
		return nil, queryErr
	}
	defer rows.Close()

	found, scanErr := scanNodes(rows)
	switch {
	case scanErr != nil:
		return nil, scanErr
	case len(found) == 0:
		return nil, fmt.Errorf("node not found")
	}
	return found[0], nil
}
// CreateNode inserts a new render node in the 'Offline' status and returns the
// stored row as read back by GetNodeByID. When the request omits them,
// node_kind defaults to "Shared" and priority to 100.
func (s *Store) CreateNode(ctx context.Context, req *models.NodeCreateRequest) (*models.RenderNode, error) {
	nodeKind, nodePriority := "Shared", 100
	if req.NodeKind != nil {
		nodeKind = *req.NodeKind
	}
	if req.Priority != nil {
		nodePriority = *req.Priority
	}

	const insert = `
INSERT INTO render.render_nodes
(name, region, node_ip, worker_port, current_ae_version, ram_gb, cpu_cores,
node_kind, owner_user_id, priority, status)
VALUES ($1, $2, $3::inet, $4, $5::ae_version, $6, $7, $8::node_kind, $9, $10, 'Offline'::node_status)
RETURNING id`

	var newID uuid.UUID
	row := s.pool.QueryRow(ctx, insert,
		req.Name, req.Region, req.NodeIP, req.WorkerPort, req.CurrentAEVersion,
		req.RamGB, req.CPUCores, nodeKind, req.OwnerUserID, nodePriority,
	)
	if err := row.Scan(&newID); err != nil {
		return nil, err
	}
	return s.GetNodeByID(ctx, newID)
}
// PatchNode applies the non-nil fields of req to the node and returns the
// updated row as read back by GetNodeByID.
//
// All field updates run inside one transaction, so a failure part-way through
// leaves the node unchanged rather than partially patched. maintenance_reason
// is only written together with next_maintenance_at; a request that sets the
// reason alone does not change it.
func (s *Store) PatchNode(ctx context.Context, id uuid.UUID, req *models.NodePatchRequest) (*models.RenderNode, error) {
	type fieldUpdate struct {
		sql  string
		args []any
	}
	var updates []fieldUpdate
	if req.Priority != nil {
		updates = append(updates, fieldUpdate{
			`UPDATE render.render_nodes SET priority = $1, updated_at = NOW() WHERE id = $2`,
			[]any{*req.Priority, id},
		})
	}
	if req.IsActive != nil {
		updates = append(updates, fieldUpdate{
			`UPDATE render.render_nodes SET is_active = $1, updated_at = NOW() WHERE id = $2`,
			[]any{*req.IsActive, id},
		})
	}
	if req.AcceptsNewJobs != nil {
		updates = append(updates, fieldUpdate{
			`UPDATE render.render_nodes SET accepts_new_jobs = $1, updated_at = NOW() WHERE id = $2`,
			[]any{*req.AcceptsNewJobs, id},
		})
	}
	if req.NodeKind != nil {
		updates = append(updates, fieldUpdate{
			`UPDATE render.render_nodes SET node_kind = $1::node_kind, updated_at = NOW() WHERE id = $2`,
			[]any{*req.NodeKind, id},
		})
	}
	if req.OwnerUserID != nil {
		updates = append(updates, fieldUpdate{
			`UPDATE render.render_nodes SET owner_user_id = $1, updated_at = NOW() WHERE id = $2`,
			[]any{*req.OwnerUserID, id},
		})
	}
	if req.NextMaintenanceAt != nil {
		updates = append(updates, fieldUpdate{
			`UPDATE render.render_nodes SET next_maintenance_at = $1, maintenance_reason = $2, updated_at = NOW() WHERE id = $3`,
			[]any{*req.NextMaintenanceAt, req.MaintenanceReason, id},
		})
	}

	if len(updates) > 0 {
		tx, err := s.pool.Begin(ctx)
		if err != nil {
			return nil, err
		}
		// Undoes the patch on any early return; the error it reports once the
		// transaction has been committed is irrelevant and deliberately ignored.
		defer func() { _ = tx.Rollback(ctx) }()

		for _, u := range updates {
			if _, err := tx.Exec(ctx, u.sql, u.args...); err != nil {
				return nil, err
			}
		}
		if err := tx.Commit(ctx); err != nil {
			return nil, err
		}
	}
	return s.GetNodeByID(ctx, id)
}
// DeleteNode issues a hard DELETE for the render node with the given id.
// Only the Exec error is reported; a DELETE that matches no row returns nil.
func (s *Store) DeleteNode(ctx context.Context, id uuid.UUID) error {
	const stmt = `DELETE FROM render.render_nodes WHERE id = $1`
	if _, err := s.pool.Exec(ctx, stmt, id); err != nil {
		return err
	}
	return nil
}
// UpdateNodeHeartbeat records a heartbeat for the node: status, CPU, available
// RAM, disk usage and current job are overwritten with the reported values and
// last_heartbeat_at is set to NOW(). disk_total_gb and ae_running keep their
// stored value when the request leaves them NULL (COALESCE).
//
// NOTE(review): current_frame_job_id is always bound to NULL here, so every
// heartbeat clears it — confirm that is intended.
func (s *Store) UpdateNodeHeartbeat(ctx context.Context, nodeID uuid.UUID, req *models.NodeHeartbeatRequest) error {
	const stmt = `
UPDATE render.render_nodes SET
status = $1::node_status,
last_heartbeat_at = NOW(),
last_cpu_pct = $2,
last_ram_available_mb = $3,
last_disk_pct = $4,
disk_total_gb = COALESCE($5, disk_total_gb),
ae_running = COALESCE($6, ae_running),
current_job_id = $7,
current_frame_job_id = $8,
updated_at = NOW()
WHERE id = $9`

	params := []any{
		req.Status,         // $1
		req.CPUPct,         // $2
		req.RAMAvailableMB, // $3
		req.DiskUsedPct,    // $4
		req.DiskTotalGB,    // $5
		req.AERunning,      // $6
		req.CurrentJobID,   // $7
		nil,                // $8 current_frame_job_id
		nodeID,             // $9
	}
	_, err := s.pool.Exec(ctx, stmt, params...)
	return err
}
// UpdateNodeOnline marks the node 'Ready' and stores what it reported when
// coming online: agent version, current and available AE versions and the
// cached template MD5s. ram_gb, cpu_cores and cache_used_gb keep their stored
// value when the request leaves them NULL. last_heartbeat_at is set to NOW().
func (s *Store) UpdateNodeOnline(ctx context.Context, nodeID uuid.UUID, req *models.NodeOnlineRequest) error {
	const stmt = `
UPDATE render.render_nodes SET
status = 'Ready'::node_status,
node_agent_version = $1,
current_ae_version = $2::ae_version,
available_ae_versions = $3,
ram_gb = COALESCE($4, ram_gb),
cpu_cores = COALESCE($5, cpu_cores),
cache_used_gb = COALESCE($6, cache_used_gb),
cached_template_md5s = $7,
last_heartbeat_at = NOW(),
updated_at = NOW()
WHERE id = $8`

	params := []any{
		req.NodeAgentVersion, req.CurrentAEVersion, req.AvailableAEVersions,
		req.RamGB, req.CPUCores, req.CacheUsedGB, req.CachedTemplateMD5s, nodeID,
	}
	_, err := s.pool.Exec(ctx, stmt, params...)
	return err
}
// ReleaseNode returns the node to the 'Ready' status and clears its job
// assignment (current_job_id, current_frame_job_id, job_started_at).
func (s *Store) ReleaseNode(ctx context.Context, nodeID uuid.UUID) error {
	const stmt = `
UPDATE render.render_nodes SET
status = 'Ready'::node_status,
current_job_id = NULL,
current_frame_job_id = NULL,
job_started_at = NULL,
updated_at = NOW()
WHERE id = $1`
	if _, err := s.pool.Exec(ctx, stmt, nodeID); err != nil {
		return err
	}
	return nil
}
// ListNodeHealthHistory returns the node's health-log rows whose recorded_at
// lies between from and to (SQL BETWEEN, so both bounds are inclusive), newest
// first, capped at 1000 rows. The result is a nil slice when nothing matches.
func (s *Store) ListNodeHealthHistory(ctx context.Context, nodeID uuid.UUID, from, to time.Time) ([]*models.NodeHealthLog, error) {
	const query = `
SELECT id, node_id, recorded_at, status::text, cpu_pct, ram_available_mb,
ae_running, current_job_id, current_frame, cache_used_gb
FROM render.node_health_logs
WHERE node_id = $1 AND recorded_at BETWEEN $2 AND $3
ORDER BY recorded_at DESC LIMIT 1000`

	rows, err := s.pool.Query(ctx, query, nodeID, from, to)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var history []*models.NodeHealthLog
	for rows.Next() {
		entry := new(models.NodeHealthLog)
		dest := []any{
			&entry.ID, &entry.NodeID, &entry.RecordedAt, &entry.Status,
			&entry.CPUPct, &entry.RAMAvailableMB, &entry.AERunning, &entry.CurrentJobID,
			&entry.CurrentFrame, &entry.CacheUsedGB,
		}
		if scanErr := rows.Scan(dest...); scanErr != nil {
			return nil, scanErr
		}
		history = append(history, entry)
	}
	return history, rows.Err()
}
// ListNodeCrashes returns up to 100 crash records for the node, most recent
// crash first. The result is a nil slice when the node has no crashes.
func (s *Store) ListNodeCrashes(ctx context.Context, nodeID uuid.UUID) ([]*models.NodeCrash, error) {
	const query = `
SELECT id, node_id, render_job_id, frame_job_id, crashed_at,
last_known_frame, crash_signal, error_log, log_file_url,
auto_recovered, recovery_action, recovered_at, created_at
FROM render.node_crashes
WHERE node_id = $1 ORDER BY crashed_at DESC LIMIT 100`

	rows, err := s.pool.Query(ctx, query, nodeID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var crashes []*models.NodeCrash
	for rows.Next() {
		crash := new(models.NodeCrash)
		dest := []any{
			&crash.ID, &crash.NodeID, &crash.RenderJobID, &crash.FrameJobID, &crash.CrashedAt,
			&crash.LastKnownFrame, &crash.CrashSignal, &crash.ErrorLog, &crash.LogFileURL,
			&crash.AutoRecovered, &crash.RecoveryAction, &crash.RecoveredAt, &crash.CreatedAt,
		}
		if scanErr := rows.Scan(dest...); scanErr != nil {
			return nil, scanErr
		}
		crashes = append(crashes, crash)
	}
	return crashes, rows.Err()
}
// InsertCrash stores a crash report for the node/job pair and then increments
// the node's lifetime_crash_count and consecutive_failures. The two statements
// run separately on the pool (not in one transaction); if the insert fails the
// counters are left untouched.
func (s *Store) InsertCrash(ctx context.Context, nodeID, jobID uuid.UUID, req *models.CrashReportRequest) error {
	const insertCrash = `
INSERT INTO render.node_crashes
(node_id, render_job_id, frame_job_id, last_known_frame, crash_signal, error_log, log_file_url)
VALUES ($1, $2, $3, $4, $5, $6, $7)`
	const bumpCounters = `
UPDATE render.render_nodes SET
lifetime_crash_count = lifetime_crash_count + 1,
consecutive_failures = consecutive_failures + 1,
updated_at = NOW()
WHERE id = $1`

	if _, insertErr := s.pool.Exec(ctx, insertCrash,
		nodeID, jobID, req.FrameJobID, req.LastKnownFrame, req.CrashSignal,
		req.ErrorLogTail, req.LogFileURL,
	); insertErr != nil {
		return insertErr
	}

	_, bumpErr := s.pool.Exec(ctx, bumpCounters, nodeID)
	return bumpErr
}
// UpdateNodeCache applies a template-cache event reported by a node.
//
//   - "Downloaded": upserts the (node_id, aep_file_md5) cache row, bumping
//     last_used_at and use_count when it already exists. The upsert is skipped
//     when ProjectID or FileSizeBytes is missing from the request.
//   - "Evicted": deletes the cache row for that MD5.
//   - any other action: no-op, returns nil.
//
// For both handled actions, a non-nil CacheUsedGB is then written to the
// node's cache_used_gb. A failure of that write is returned to the caller
// instead of being dropped, so a stale cache size cannot go unnoticed.
func (s *Store) UpdateNodeCache(ctx context.Context, nodeID uuid.UUID, req *models.CacheUpdateRequest) error {
	switch req.Action {
	case "Downloaded":
		if req.ProjectID != nil && req.FileSizeBytes != nil {
			_, err := s.pool.Exec(ctx, `
INSERT INTO render.node_template_cache (node_id, project_id, aep_file_md5, file_size_bytes, local_path)
VALUES ($1, $2, $3, $4, '')
ON CONFLICT (node_id, aep_file_md5) DO UPDATE SET
last_used_at = NOW(), use_count = node_template_cache.use_count + 1`,
				nodeID, *req.ProjectID, req.AEPFileMD5, *req.FileSizeBytes)
			if err != nil {
				return err
			}
		}
	case "Evicted":
		_, err := s.pool.Exec(ctx, `DELETE FROM render.node_template_cache WHERE node_id = $1 AND aep_file_md5 = $2`, nodeID, req.AEPFileMD5)
		if err != nil {
			return err
		}
	default:
		// Unknown actions are ignored.
		return nil
	}

	if req.CacheUsedGB == nil {
		return nil
	}
	if _, err := s.pool.Exec(ctx, `UPDATE render.render_nodes SET cache_used_gb = $1, updated_at = NOW() WHERE id = $2`, *req.CacheUsedGB, nodeID); err != nil {
		return fmt.Errorf("update node cache_used_gb: %w", err)
	}
	return nil
}
// ── Render Jobs ───────────────────────────────────────────────────────────────
// ListJobs returns one page of the user's render jobs, newest first, together
// with the total number of jobs matching the same filter.
//
// status, when non-empty, restricts both the page and the total to jobs whose
// step equals it. page is 1-based; values below 1 are treated as 1 so the
// computed OFFSET can never be negative. A failure of the COUNT query is
// returned as an error rather than silently reported as a total of 0.
func (s *Store) ListJobs(ctx context.Context, userID uuid.UUID, status string, page, pageSize int) ([]*models.RenderJob, int64, error) {
	const selectJobs = `SELECT id, tenant_id, user_id, saved_project_id, original_project_id,
project_name, title, name, external_job_id, priority_queue::text, priority_score,
step::text, render_progress, convert_progress, image_preview_b64,
price_type::text, paid_price_minor, discount_code, support_flatrender,
mode, quality::text, resolution, r_height, frame_rate, is_60_fps,
duration_sec, export_duration_sec,
has_music, has_sfx, has_voiceover, music_volume, sfx_volume, voiceover_volume,
render_node_count, current_active_nodes, region, tell_me_when_done,
retry_count, max_retries, repair_attempts, failed_message, failed_at_step::text,
export_id, task_start_date, queued_at, started_at, completed_at, created_at, updated_at
FROM render.render_jobs`

	if page < 1 {
		page = 1
	}

	// One WHERE clause and argument list shared by the count and page queries,
	// so the two can never disagree about the filter.
	where := " WHERE user_id = $1"
	args := []any{userID}
	if status != "" {
		where += " AND step::text = $2"
		args = append(args, status)
	}

	var total int64
	if err := s.pool.QueryRow(ctx, `SELECT COUNT(*) FROM render.render_jobs`+where, args...).Scan(&total); err != nil {
		return nil, 0, fmt.Errorf("count render jobs: %w", err)
	}

	limitIdx := len(args) + 1
	q := selectJobs + where + fmt.Sprintf(" ORDER BY created_at DESC LIMIT $%d OFFSET $%d", limitIdx, limitIdx+1)
	args = append(args, pageSize, (page-1)*pageSize)

	rows, err := s.pool.Query(ctx, q, args...)
	if err != nil {
		return nil, 0, err
	}
	defer rows.Close()

	jobs, err := scanJobs(rows)
	return jobs, total, err
}
// GetJobByID loads a render job by id, scoped to its owner: the row is only
// returned when user_id matches userID. A missing row and a row owned by
// someone else both yield the same "job not found" error.
func (s *Store) GetJobByID(ctx context.Context, id, userID uuid.UUID) (*models.RenderJob, error) {
	const query = `
SELECT id, tenant_id, user_id, saved_project_id, original_project_id,
project_name, title, name, external_job_id, priority_queue::text, priority_score,
step::text, render_progress, convert_progress, image_preview_b64,
price_type::text, paid_price_minor, discount_code, support_flatrender,
mode, quality::text, resolution, r_height, frame_rate, is_60_fps,
duration_sec, export_duration_sec,
has_music, has_sfx, has_voiceover, music_volume, sfx_volume, voiceover_volume,
render_node_count, current_active_nodes, region, tell_me_when_done,
retry_count, max_retries, repair_attempts, failed_message, failed_at_step::text,
export_id, task_start_date, queued_at, started_at, completed_at, created_at, updated_at
FROM render.render_jobs WHERE id = $1 AND user_id = $2`

	rows, queryErr := s.pool.Query(ctx, query, id, userID)
	if queryErr != nil {
		return nil, queryErr
	}
	defer rows.Close()

	matches, scanErr := scanJobs(rows)
	switch {
	case scanErr != nil:
		return nil, scanErr
	case len(matches) == 0:
		return nil, fmt.Errorf("job not found")
	}
	return matches[0], nil
}
// CreateJob inserts a render job in the 'Queued' step for the given user and
// tenant and returns the stored row. Request defaults: price type "Free",
// frame rate 30, tell_me_when_done true, is_60_fps FALSE. The statement itself
// fixes priority_queue to 'paid', r_height to 1080, duration_sec to 0 and mode
// to 'FIX'.
//
// NOTE(review): $3 (SavedProjectID) is written to both saved_project_id and
// original_project_id — confirm that is intended.
func (s *Store) CreateJob(ctx context.Context, userID, tenantID uuid.UUID, req *models.RenderJobCreateRequest) (*models.RenderJob, error) {
	priceKind, fps, notify := "Free", 30, true
	if req.PriceType != nil {
		priceKind = *req.PriceType
	}
	if req.FrameRate != nil {
		fps = *req.FrameRate
	}
	if req.TellMeWhenDone != nil {
		notify = *req.TellMeWhenDone
	}

	const insert = `
INSERT INTO render.render_jobs
(tenant_id, user_id, saved_project_id, original_project_id,
priority_queue, step, price_type, quality, resolution, r_height,
frame_rate, is_60_fps, duration_sec, mode, tell_me_when_done, region)
VALUES ($1, $2, $3, $3,
'paid'::render_priority_queue, 'Queued'::render_step, $4::price_kind,
$5::render_quality, $6, 1080, $7, COALESCE($8, FALSE),
0, 'FIX', $9, $10)
RETURNING id`

	var newID uuid.UUID
	row := s.pool.QueryRow(ctx, insert,
		tenantID, userID, req.SavedProjectID, priceKind,
		req.Quality, req.Resolution, fps, req.Is60FPS,
		notify, req.PreferredRegion,
	)
	if err := row.Scan(&newID); err != nil {
		return nil, err
	}
	return s.GetJobByID(ctx, newID, userID)
}
// CompleteJob moves a render job to 'Done' with render_progress 100 and
// completed_at = NOW(). A non-nil exportID replaces export_id; nil keeps the
// stored one. Jobs already in Done, Failed or Cancelled are left untouched by
// the UPDATE. In every non-error case the job is re-read without an ownership
// check and returned, so callers can use user_id / tenant_id for notifications.
func (s *Store) CompleteJob(ctx context.Context, jobID uuid.UUID, exportID *uuid.UUID) (*models.RenderJob, error) {
	const stmt = `
UPDATE render.render_jobs SET
step = 'Done'::render_step,
render_progress = 100,
export_id = COALESCE($1, export_id),
completed_at = NOW(),
updated_at = NOW()
WHERE id = $2
AND step NOT IN ('Done'::render_step, 'Failed'::render_step, 'Cancelled'::render_step)`

	if _, execErr := s.pool.Exec(ctx, stmt, exportID, jobID); execErr != nil {
		return nil, execErr
	}
	return s.getJobByIDInternal(ctx, jobID)
}
// FailJob moves a render job to 'Failed', recording reason as failed_message
// and atStep as failed_at_step, and sets completed_at = NOW(). Jobs already in
// Done, Failed or Cancelled are left untouched by the UPDATE. The job is then
// re-read without an ownership check and returned.
func (s *Store) FailJob(ctx context.Context, jobID uuid.UUID, reason, atStep string) (*models.RenderJob, error) {
	const stmt = `
UPDATE render.render_jobs SET
step = 'Failed'::render_step,
failed_message = $1,
failed_at_step = $2::render_step,
completed_at = NOW(),
updated_at = NOW()
WHERE id = $3
AND step NOT IN ('Done'::render_step, 'Failed'::render_step, 'Cancelled'::render_step)`

	if _, execErr := s.pool.Exec(ctx, stmt, reason, atStep, jobID); execErr != nil {
		return nil, execErr
	}
	return s.getJobByIDInternal(ctx, jobID)
}
// getJobByIDInternal loads a render job by id alone, with no user_id filter.
// It is meant for internal callers (node agent callbacks) that have no owner
// to check against. Returns a "job <id> not found" error when no row matches.
func (s *Store) getJobByIDInternal(ctx context.Context, id uuid.UUID) (*models.RenderJob, error) {
	const query = `
SELECT id, tenant_id, user_id, saved_project_id, original_project_id,
project_name, title, name, external_job_id, priority_queue::text, priority_score,
step::text, render_progress, convert_progress, image_preview_b64,
price_type::text, paid_price_minor, discount_code, support_flatrender,
mode, quality::text, resolution, r_height, frame_rate, is_60_fps,
duration_sec, export_duration_sec,
has_music, has_sfx, has_voiceover, music_volume, sfx_volume, voiceover_volume,
render_node_count, current_active_nodes, region, tell_me_when_done,
retry_count, max_retries, repair_attempts, failed_message, failed_at_step::text,
export_id, task_start_date, queued_at, started_at, completed_at, created_at, updated_at
FROM render.render_jobs WHERE id = $1`

	rows, queryErr := s.pool.Query(ctx, query, id)
	if queryErr != nil {
		return nil, queryErr
	}
	defer rows.Close()

	matches, scanErr := scanJobs(rows)
	switch {
	case scanErr != nil:
		return nil, scanErr
	case len(matches) == 0:
		return nil, fmt.Errorf("job %s not found", id)
	}
	return matches[0], nil
}
// ClaimJob atomically picks the highest-priority Queued job (highest
// priority_score, then oldest queued_at) and moves it to Preparing, marking the
// node Busy with current_job_id set to that job.
//
// A non-empty region restricts the pick to jobs with that region or with no
// region at all; an empty region considers every queued job. The row is taken
// with FOR UPDATE SKIP LOCKED, so concurrent claimers never receive the same
// job.
//
// Returns (nil, nil) when there is nothing to claim. Returns an error, and
// rolls everything back, when nodeID matches no node; otherwise the job would
// be left in Preparing with no node working on it.
func (s *Store) ClaimJob(ctx context.Context, nodeID uuid.UUID, region string) (*models.RenderJob, error) {
	tx, err := s.pool.Begin(ctx)
	if err != nil {
		return nil, err
	}
	// Undoes the claim on any early return; the error it reports once the
	// transaction has been committed is irrelevant and deliberately ignored.
	defer func() { _ = tx.Rollback(ctx) }()

	q := `SELECT id FROM render.render_jobs
WHERE step = 'Queued'::render_step`
	var args []any
	if region != "" {
		q += " AND (region IS NULL OR region = $1)"
		args = append(args, region)
	}
	q += " ORDER BY priority_score DESC, queued_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED"

	var jobID uuid.UUID
	if err := tx.QueryRow(ctx, q, args...).Scan(&jobID); err != nil {
		// Match the sentinel, not the message text, so a reworded or wrapped
		// error is still recognised as "queue empty".
		if errors.Is(err, pgx.ErrNoRows) {
			return nil, nil // nothing to do
		}
		return nil, err
	}

	// Advance to Preparing; started_at is only set the first time.
	if _, err := tx.Exec(ctx, `
UPDATE render.render_jobs SET
step = 'Preparing'::render_step,
started_at = COALESCE(started_at, NOW()),
updated_at = NOW()
WHERE id = $1`, jobID); err != nil {
		return nil, err
	}

	// Assign the job to this node.
	tag, err := tx.Exec(ctx, `
UPDATE render.render_nodes SET
status = 'Busy'::node_status,
current_job_id = $1,
job_started_at = NOW(),
updated_at = NOW()
WHERE id = $2`, jobID, nodeID)
	if err != nil {
		return nil, err
	}
	if tag.RowsAffected() == 0 {
		return nil, fmt.Errorf("claim job %s: node %s not found", jobID, nodeID)
	}

	if err := tx.Commit(ctx); err != nil {
		return nil, err
	}
	return s.getJobByIDInternal(ctx, jobID)
}
// CreateExportForJob inserts a new export row for the given render job and
// returns an in-memory Export describing it. The row gets a freshly generated
// id, the placeholder path `exports/{export_id}/output.mp4`, size_bytes 0 and
// an auto_delete_date 30 days after creation; tenant, user, project and
// quality are copied from the job. Per the original design notes, the node
// agent uploads the MP4 to that path and then calls CompleteJob with the
// returned export id.
//
// NOTE(review): this INSERT writes original_project_id while the export
// readers in this file select project_id — confirm both columns exist.
func (s *Store) CreateExportForJob(ctx context.Context, jobID uuid.UUID) (*models.Export, error) {
	// The job supplies the tenant/user/project context for the export row.
	job, lookupErr := s.getJobByIDInternal(ctx, jobID)
	if lookupErr != nil {
		return nil, fmt.Errorf("job not found: %w", lookupErr)
	}

	var (
		exportID   = uuid.New()
		objectPath = fmt.Sprintf("exports/%s/output.mp4", exportID)
		createdAt  = time.Now()
		expiresAt  = createdAt.AddDate(0, 0, 30) // 30-day retention
	)

	const insert = `
INSERT INTO render.exports
(id, tenant_id, user_id, saved_project_id, original_project_id,
render_job_id, path, file_extension, file_type, render_quality,
create_type, size_bytes, produce_date, auto_delete_date,
delete_notified, created_at)
VALUES
($1, $2, $3, $4, $5,
$6, $7, 'mp4', 'video', $8,
'render', 0, $9, $10,
false, $9)`

	if _, insertErr := s.pool.Exec(ctx, insert,
		exportID, job.TenantID, job.UserID, job.SavedProjectID, job.OriginalProjectID,
		job.ID, objectPath, job.Quality,
		createdAt, expiresAt,
	); insertErr != nil {
		return nil, fmt.Errorf("create export: %w", insertErr)
	}

	export := &models.Export{
		ID:             exportID,
		TenantID:       job.TenantID,
		UserID:         job.UserID,
		SavedProjectID: job.SavedProjectID,
		Path:           objectPath,
		FileExtension:  "mp4",
		FileType:       "video",
		RenderQuality:  job.Quality,
		CreateType:     "render",
		ProduceDate:    createdAt,
		AutoDeleteDate: expiresAt,
		CreatedAt:      createdAt,
	}
	return export, nil
}
// UpdateJobPreview overwrites the job's image_preview_b64 with the given
// base64 image. Jobs already in Done, Failed or Cancelled are not modified.
// Per the original design notes, the node agent calls this every N frames to
// feed the live preview UI.
func (s *Store) UpdateJobPreview(ctx context.Context, jobID uuid.UUID, imageB64 string) error {
	const stmt = `
UPDATE render.render_jobs
SET image_preview_b64 = $1, updated_at = NOW()
WHERE id = $2
AND step NOT IN ('Done'::render_step, 'Failed'::render_step, 'Cancelled'::render_step)`
	if _, err := s.pool.Exec(ctx, stmt, imageB64, jobID); err != nil {
		return err
	}
	return nil
}
// CancelJob moves the caller's own job to 'Cancelled' unless it is already in
// Done, Failed or Cancelled. The boolean reports whether a row was actually
// updated; it is false both for an unknown/foreign job and for a job that had
// already finished.
func (s *Store) CancelJob(ctx context.Context, id, userID uuid.UUID) (bool, error) {
	const stmt = `
UPDATE render.render_jobs
SET step = 'Cancelled'::render_step, completed_at = NOW(), updated_at = NOW()
WHERE id = $1 AND user_id = $2 AND step NOT IN ('Done','Failed','Cancelled')`
	tag, err := s.pool.Exec(ctx, stmt, id, userID)
	cancelled := tag.RowsAffected() > 0
	return cancelled, err
}
// StopJob cancels any in-progress job regardless of owner (admin action), frees
// the node(s) working on it, and returns the job's owner id so the caller can
// refund their charge.
//
// The boolean is false, with a nil error, when the job does not exist or is
// already Done, Failed or Cancelled.
//
// A node counts as working on the job when it holds the job itself
// (current_job_id, as set by ClaimJob) or one of the job's frame jobs
// (current_frame_job_id). Such nodes are returned to 'Ready' with the same
// fields cleared as in ReleaseNode.
func (s *Store) StopJob(ctx context.Context, id uuid.UUID) (bool, uuid.UUID, error) {
	var ownerID uuid.UUID
	err := s.pool.QueryRow(ctx, `
UPDATE render.render_jobs
SET step = 'Cancelled'::render_step, completed_at = NOW(), updated_at = NOW()
WHERE id = $1 AND step NOT IN ('Done','Failed','Cancelled')
RETURNING user_id`, id).Scan(&ownerID)
	if errors.Is(err, pgx.ErrNoRows) {
		return false, uuid.Nil, nil
	}
	if err != nil {
		return false, uuid.Nil, err
	}

	// Best effort: the job is already cancelled at this point, so a failure to
	// free the node must not be reported as a failed stop (the caller still
	// needs the owner id to issue the refund).
	_, _ = s.pool.Exec(ctx, `
UPDATE render.render_nodes SET
status = 'Ready'::node_status,
current_job_id = NULL,
current_frame_job_id = NULL,
job_started_at = NULL,
updated_at = NOW()
WHERE current_job_id = $1
OR current_frame_job_id IN (SELECT id FROM render.frame_jobs WHERE render_job_id = $1)`, id)
	return true, ownerID, nil
}
// GetJobProgress returns the owner-scoped job exactly as GetJobByID does; it
// adds no logic of its own.
func (s *Store) GetJobProgress(ctx context.Context, id, userID uuid.UUID) (*models.RenderJob, error) {
	job, err := s.GetJobByID(ctx, id, userID)
	return job, err
}
// ListFrameJobs returns every frame job belonging to the render job, ordered
// by order_value. No ownership check is performed here.
func (s *Store) ListFrameJobs(ctx context.Context, renderJobID uuid.UUID) ([]*models.FrameJob, error) {
	const query = `
SELECT id, render_job_id, node_id, start_frame, end_frame, collect_frame_count,
order_value, folder_name, convert_url, status::text, frames_rendered,
frames_validated, attempt, last_error, output_mp4_url,
assigned_at, started_at, last_progress_at, completed_at, created_at, updated_at
FROM render.frame_jobs WHERE render_job_id = $1 ORDER BY order_value`

	rows, queryErr := s.pool.Query(ctx, query, renderJobID)
	if queryErr != nil {
		return nil, queryErr
	}
	defer rows.Close()

	frameJobs, scanErr := scanFrameJobs(rows)
	return frameJobs, scanErr
}
// UpdateFrameProgress records one unit of progress for the frame job named in
// req.FrameJobID: frames_rendered is incremented by one and last_progress_at
// is set to NOW(). When req.CompletedAt is non-NULL the frame job is also
// marked 'Validated' and completed_at is set to that value.
//
// completed_at is only ever written, never cleared: a later progress report
// without CompletedAt keeps the stored timestamp instead of resetting it to
// NULL on an already-validated frame job.
//
// NOTE(review): jobID is accepted but not used; the row is selected by
// req.FrameJobID alone. Confirm whether the update should also be scoped to
// the parent render job.
func (s *Store) UpdateFrameProgress(ctx context.Context, jobID uuid.UUID, req *models.FrameProgressRequest) error {
	_, err := s.pool.Exec(ctx, `
UPDATE render.frame_jobs SET
frames_rendered = frames_rendered + 1,
last_progress_at = NOW(),
status = CASE WHEN $1 IS NOT NULL THEN 'Validated'::frame_job_status ELSE status END,
completed_at = COALESCE($1, completed_at),
updated_at = NOW()
WHERE id = $2`,
		req.CompletedAt, req.FrameJobID)
	return err
}
// ── Snapshots ────────────────────────────────────────────────────────────────
// GetCachedSnapshot looks for a reusable snapshot: same project, scene key,
// frame number and inputs hash, with status 'Done' and an expires_at still in
// the future. A cache miss is not an error; it returns (nil, nil).
func (s *Store) GetCachedSnapshot(ctx context.Context, projectID uuid.UUID, sceneKey string, frame int, hash string) (*models.Snapshot, error) {
	const query = `
SELECT id, tenant_id, user_id, saved_project_id, scene_key, frame_number,
inputs_hash, status, render_node_id, image_url, thumbnail_url,
width, height, size_bytes, requested_at, completed_at, duration_ms,
expires_at, error_message, created_at
FROM render.snapshots
WHERE saved_project_id = $1 AND scene_key = $2 AND frame_number = $3
AND inputs_hash = $4 AND status = 'Done' AND expires_at > NOW()
LIMIT 1`

	rows, queryErr := s.pool.Query(ctx, query, projectID, sceneKey, frame, hash)
	if queryErr != nil {
		return nil, queryErr
	}
	defer rows.Close()

	hits, scanErr := scanSnapshots(rows)
	if scanErr != nil {
		return nil, scanErr
	}
	if len(hits) == 0 {
		return nil, nil // cache miss
	}
	return hits[0], nil
}
// CreateSnapshot inserts a snapshot request in the 'Pending' status for the
// given project/scene/frame and inputs hash, then returns the stored row as
// read back by GetSnapshotByID.
func (s *Store) CreateSnapshot(ctx context.Context, userID, tenantID uuid.UUID, req *models.SnapshotCreateRequest, hash string) (*models.Snapshot, error) {
	const insert = `
INSERT INTO render.snapshots
(tenant_id, user_id, saved_project_id, scene_key, frame_number, inputs_hash, status)
VALUES ($1, $2, $3, $4, $5, $6, 'Pending')
RETURNING id`

	var newID uuid.UUID
	row := s.pool.QueryRow(ctx, insert,
		tenantID, userID, req.SavedProjectID, req.SceneKey, req.FrameNumber, hash,
	)
	if err := row.Scan(&newID); err != nil {
		return nil, err
	}
	return s.GetSnapshotByID(ctx, newID)
}
// GetSnapshotByID loads a snapshot by primary key, with no owner filter.
// Returns a "snapshot not found" error when no row matches.
func (s *Store) GetSnapshotByID(ctx context.Context, id uuid.UUID) (*models.Snapshot, error) {
	const query = `
SELECT id, tenant_id, user_id, saved_project_id, scene_key, frame_number,
inputs_hash, status, render_node_id, image_url, thumbnail_url,
width, height, size_bytes, requested_at, completed_at, duration_ms,
expires_at, error_message, created_at
FROM render.snapshots WHERE id = $1`

	rows, queryErr := s.pool.Query(ctx, query, id)
	if queryErr != nil {
		return nil, queryErr
	}
	defer rows.Close()

	matches, scanErr := scanSnapshots(rows)
	switch {
	case scanErr != nil:
		return nil, scanErr
	case len(matches) == 0:
		return nil, fmt.Errorf("snapshot not found")
	}
	return matches[0], nil
}
// ── Exports ──────────────────────────────────────────────────────────────────
// ListExports returns one page of the user's non-deleted exports, most recent
// produce_date first, together with the total number of such exports.
//
// page is 1-based; values below 1 are treated as 1 so the computed OFFSET can
// never be negative. A failure of the COUNT query is returned as an error
// rather than silently reported as a total of 0.
func (s *Store) ListExports(ctx context.Context, userID uuid.UUID, page, pageSize int) ([]*models.Export, int64, error) {
	if page < 1 {
		page = 1
	}

	var total int64
	if err := s.pool.QueryRow(ctx,
		`SELECT COUNT(*) FROM render.exports WHERE user_id = $1 AND deleted_at IS NULL`, userID).Scan(&total); err != nil {
		return nil, 0, fmt.Errorf("count exports: %w", err)
	}

	rows, err := s.pool.Query(ctx, `
SELECT id, tenant_id, user_id, saved_project_id, project_id, render_job_id,
image, path, file_extension, file_type::text, render_quality::text,
create_type::text, size_bytes, duration_sec, width, height,
produce_date, auto_delete_date, delete_notified, created_at, deleted_at
FROM render.exports WHERE user_id = $1 AND deleted_at IS NULL
ORDER BY produce_date DESC LIMIT $2 OFFSET $3`,
		userID, pageSize, (page-1)*pageSize)
	if err != nil {
		return nil, 0, err
	}
	defer rows.Close()

	exports, err := scanExports(rows)
	return exports, total, err
}
// GetExportByID loads one export, scoped to its owner and excluding
// soft-deleted rows. A missing, foreign or deleted export all yield the same
// "export not found" error.
func (s *Store) GetExportByID(ctx context.Context, id, userID uuid.UUID) (*models.Export, error) {
	const query = `
SELECT id, tenant_id, user_id, saved_project_id, project_id, render_job_id,
image, path, file_extension, file_type::text, render_quality::text,
create_type::text, size_bytes, duration_sec, width, height,
produce_date, auto_delete_date, delete_notified, created_at, deleted_at
FROM render.exports WHERE id = $1 AND user_id = $2 AND deleted_at IS NULL`

	rows, queryErr := s.pool.Query(ctx, query, id, userID)
	if queryErr != nil {
		return nil, queryErr
	}
	defer rows.Close()

	matches, scanErr := scanExports(rows)
	switch {
	case scanErr != nil:
		return nil, scanErr
	case len(matches) == 0:
		return nil, fmt.Errorf("export not found")
	}
	return matches[0], nil
}
// ListExportFiles returns the files attached to an export, ordered by their
// sort column. No ownership check is performed here. The result is a nil slice
// when the export has no files.
func (s *Store) ListExportFiles(ctx context.Context, exportID uuid.UUID) ([]*models.ExportFile, error) {
	const query = `
SELECT id, export_id, user_id, name, thumbnail, path, size_bytes,
file_type::text, width, height, sort, created_at
FROM render.export_files WHERE export_id = $1 ORDER BY sort`

	rows, err := s.pool.Query(ctx, query, exportID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var files []*models.ExportFile
	for rows.Next() {
		file := new(models.ExportFile)
		dest := []any{
			&file.ID, &file.ExportID, &file.UserID, &file.Name, &file.Thumbnail,
			&file.Path, &file.SizeBytes, &file.FileType, &file.Width, &file.Height, &file.Sort, &file.CreatedAt,
		}
		if scanErr := rows.Scan(dest...); scanErr != nil {
			return nil, scanErr
		}
		files = append(files, file)
	}
	return files, rows.Err()
}
// SoftDeleteExport stamps deleted_at = NOW() on the caller's export. It
// returns an "export not found" error when no row was updated, i.e. the export
// does not exist, belongs to another user, or is already soft-deleted.
func (s *Store) SoftDeleteExport(ctx context.Context, id, userID uuid.UUID) error {
	const stmt = `
UPDATE render.exports SET deleted_at = NOW() WHERE id = $1 AND user_id = $2 AND deleted_at IS NULL`

	tag, execErr := s.pool.Exec(ctx, stmt, id, userID)
	switch {
	case execErr != nil:
		return execErr
	case tag.RowsAffected() == 0:
		return fmt.Errorf("export not found")
	}
	return nil
}
// ExtendExportDeleteDate pushes the auto_delete_date of the caller's
// non-deleted export out by the given number of days and returns the new date.
// When no row matches (unknown, foreign or soft-deleted export) the Scan error
// is returned together with the zero time.
//
// The interval is built with make_interval(days => $1), so $1 is an integer
// parameter. The previous `($1 || ' days')::interval` form made $1 a text
// parameter, which pgx v5 does not encode from a Go int in its default
// (prepared-statement) mode.
func (s *Store) ExtendExportDeleteDate(ctx context.Context, id, userID uuid.UUID, days int) (time.Time, error) {
	var newDate time.Time
	err := s.pool.QueryRow(ctx, `
UPDATE render.exports SET auto_delete_date = auto_delete_date + make_interval(days => $1)
WHERE id = $2 AND user_id = $3 AND deleted_at IS NULL
RETURNING auto_delete_date`, days, id, userID).Scan(&newDate)
	return newDate, err
}
// ── Node Updates ─────────────────────────────────────────────────────────────
// ListNodeUpdates returns every row of render.node_updates, highest
// update_number first. The result is a nil slice when the table is empty.
func (s *Store) ListNodeUpdates(ctx context.Context) ([]*models.NodeUpdate, error) {
	const query = `
SELECT id, update_file_name, update_number, description, target_ae_version::text,
in_update_queue, rolled_out_to_node_ids, last_update_queue_date, create_date, created_at
FROM render.node_updates ORDER BY update_number DESC`

	rows, err := s.pool.Query(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var updates []*models.NodeUpdate
	for rows.Next() {
		update := new(models.NodeUpdate)
		dest := []any{
			&update.ID, &update.UpdateFileName, &update.UpdateNumber, &update.Description,
			&update.TargetAEVersion, &update.InUpdateQueue, &update.RolledOutToNodeIDs,
			&update.LastUpdateQueueDate, &update.CreateDate, &update.CreatedAt,
		}
		if scanErr := rows.Scan(dest...); scanErr != nil {
			return nil, scanErr
		}
		updates = append(updates, update)
	}
	return updates, rows.Err()
}
// QueueUpdateRollout flags the update as queued (in_update_queue = TRUE,
// last_update_queue_date = NOW()) and appends nodeIDs to its
// rolled_out_to_node_ids array. Existing entries are kept; the append does not
// de-duplicate.
func (s *Store) QueueUpdateRollout(ctx context.Context, updateID uuid.UUID, nodeIDs []uuid.UUID) error {
	const stmt = `
UPDATE render.node_updates SET
in_update_queue = TRUE,
last_update_queue_date = NOW(),
rolled_out_to_node_ids = rolled_out_to_node_ids || $1
WHERE id = $2`
	if _, err := s.pool.Exec(ctx, stmt, nodeIDs, updateID); err != nil {
		return err
	}
	return nil
}
// ── Scanners ─────────────────────────────────────────────────────────────────
// scanNodes drains rows into RenderNode values. The destination order below is
// the column order every node SELECT must produce (39 columns, ending with
// last_disk_pct and disk_total_gb). It returns a nil slice for an empty result
// and stops at the first Scan error. The caller remains responsible for
// closing rows.
func scanNodes(rows pgx.Rows) ([]*models.RenderNode, error) {
	var nodes []*models.RenderNode
	for rows.Next() {
		node := new(models.RenderNode)
		dest := []any{
			&node.ID, &node.Name, &node.Region, &node.NodeIP, &node.WorkerPort, &node.PublicEndpoint,
			&node.RamGB, &node.CPUCores, &node.GPUModel, &node.StorageGB,
			&node.CurrentAEVersion, &node.AvailableAEVersions, &node.NodeAgentVersion,
			&node.NodeKind, &node.OwnerUserID, &node.OwnerTenantID,
			&node.Status, &node.CurrentJobID, &node.CurrentFrameJobID, &node.JobStartedAt,
			&node.LastHeartbeatAt, &node.LastCPUPct, &node.LastRAMAvailableMB, &node.AERunning,
			&node.LifetimeTaskCount, &node.LifetimeCrashCount, &node.ConsecutiveFailures,
			&node.Priority, &node.IsActive, &node.AcceptsNewJobs,
			&node.LastMaintenanceAt, &node.NextMaintenanceAt, &node.MaintenanceReason,
			&node.CachedTemplateMD5s, &node.CacheUsedGB, &node.CreatedAt, &node.UpdatedAt,
			&node.LastDiskPct, &node.DiskTotalGB,
		}
		if err := rows.Scan(dest...); err != nil {
			return nil, err
		}
		nodes = append(nodes, node)
	}
	return nodes, rows.Err()
}
// scanJobs drains rows into RenderJob values. The destination order below is
// the column order every render_jobs SELECT in this file produces. It returns
// a nil slice for an empty result and stops at the first Scan error. The
// caller remains responsible for closing rows.
func scanJobs(rows pgx.Rows) ([]*models.RenderJob, error) {
	var jobs []*models.RenderJob
	for rows.Next() {
		job := new(models.RenderJob)
		dest := []any{
			&job.ID, &job.TenantID, &job.UserID, &job.SavedProjectID, &job.OriginalProjectID,
			&job.ProjectName, &job.Title, &job.Name, &job.ExternalJobID, &job.PriorityQueue, &job.PriorityScore,
			&job.Step, &job.RenderProgress, &job.ConvertProgress, &job.ImagePreviewB64,
			&job.PriceType, &job.PaidPriceMinor, &job.DiscountCode, &job.SupportFlatrender,
			&job.Mode, &job.Quality, &job.Resolution, &job.RHeight, &job.FrameRate, &job.Is60FPS,
			&job.DurationSec, &job.ExportDurationSec,
			&job.HasMusic, &job.HasSFX, &job.HasVoiceover, &job.MusicVolume, &job.SFXVolume, &job.VoiceoverVolume,
			&job.RenderNodeCount, &job.CurrentActiveNodes, &job.Region, &job.TellMeWhenDone,
			&job.RetryCount, &job.MaxRetries, &job.RepairAttempts, &job.FailedMessage, &job.FailedAtStep,
			&job.ExportID, &job.TaskStartDate, &job.QueuedAt, &job.StartedAt, &job.CompletedAt,
			&job.CreatedAt, &job.UpdatedAt,
		}
		if err := rows.Scan(dest...); err != nil {
			return nil, err
		}
		jobs = append(jobs, job)
	}
	return jobs, rows.Err()
}
// scanFrameJobs drains rows into FrameJob values, in the column order used by
// ListFrameJobs. It returns a nil slice for an empty result and stops at the
// first Scan error. The caller remains responsible for closing rows.
func scanFrameJobs(rows pgx.Rows) ([]*models.FrameJob, error) {
	var frameJobs []*models.FrameJob
	for rows.Next() {
		fj := new(models.FrameJob)
		dest := []any{
			&fj.ID, &fj.RenderJobID, &fj.NodeID, &fj.StartFrame, &fj.EndFrame, &fj.CollectFrameCount,
			&fj.OrderValue, &fj.FolderName, &fj.ConvertURL, &fj.Status, &fj.FramesRendered,
			&fj.FramesValidated, &fj.Attempt, &fj.LastError, &fj.OutputMP4URL,
			&fj.AssignedAt, &fj.StartedAt, &fj.LastProgressAt, &fj.CompletedAt, &fj.CreatedAt, &fj.UpdatedAt,
		}
		if err := rows.Scan(dest...); err != nil {
			return nil, err
		}
		frameJobs = append(frameJobs, fj)
	}
	return frameJobs, rows.Err()
}
// scanSnapshots drains rows into Snapshot values, in the column order used by
// the snapshot SELECTs in this file. It returns a nil slice for an empty result
// and stops at the first Scan error. The caller remains responsible for
// closing rows.
func scanSnapshots(rows pgx.Rows) ([]*models.Snapshot, error) {
	var snapshots []*models.Snapshot
	for rows.Next() {
		snap := new(models.Snapshot)
		dest := []any{
			&snap.ID, &snap.TenantID, &snap.UserID, &snap.SavedProjectID, &snap.SceneKey, &snap.FrameNumber,
			&snap.InputsHash, &snap.Status, &snap.RenderNodeID, &snap.ImageURL, &snap.ThumbnailURL,
			&snap.Width, &snap.Height, &snap.SizeBytes, &snap.RequestedAt, &snap.CompletedAt,
			&snap.DurationMS, &snap.ExpiresAt, &snap.ErrorMessage, &snap.CreatedAt,
		}
		if err := rows.Scan(dest...); err != nil {
			return nil, err
		}
		snapshots = append(snapshots, snap)
	}
	return snapshots, rows.Err()
}
// scanExports drains rows into Export values, in the column order used by
// ListExports and GetExportByID. It returns a nil slice for an empty result
// and stops at the first Scan error. The caller remains responsible for
// closing rows.
func scanExports(rows pgx.Rows) ([]*models.Export, error) {
	var exports []*models.Export
	for rows.Next() {
		export := new(models.Export)
		dest := []any{
			&export.ID, &export.TenantID, &export.UserID, &export.SavedProjectID, &export.ProjectID, &export.RenderJobID,
			&export.Image, &export.Path, &export.FileExtension, &export.FileType, &export.RenderQuality,
			&export.CreateType, &export.SizeBytes, &export.DurationSec, &export.Width, &export.Height,
			&export.ProduceDate, &export.AutoDeleteDate, &export.DeleteNotified, &export.CreatedAt, &export.DeletedAt,
		}
		if err := rows.Scan(dest...); err != nil {
			return nil, err
		}
		exports = append(exports, export)
	}
	return exports, rows.Err()
}