package db import ( "context" "fmt" "strings" "time" "github.com/flatrender/render-svc/internal/models" "github.com/google/uuid" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" ) type Store struct { pool *pgxpool.Pool } func NewStore(pool *pgxpool.Pool) *Store { return &Store{pool: pool} } // ── Nodes ──────────────────────────────────────────────────────────────────── func (s *Store) ListNodes(ctx context.Context, region, status string) ([]*models.RenderNode, error) { q := `SELECT id, name, region, node_ip::text, worker_port, public_endpoint, ram_gb, cpu_cores, gpu_model, storage_gb, current_ae_version::text, available_ae_versions, node_agent_version, node_kind::text, owner_user_id, owner_tenant_id, status::text, current_job_id, current_frame_job_id, job_started_at, last_heartbeat_at, last_cpu_pct, last_ram_available_mb, ae_running, lifetime_task_count, lifetime_crash_count, consecutive_failures, priority, is_active, accepts_new_jobs, last_maintenance_at, next_maintenance_at, maintenance_reason, cached_template_md5s, cache_used_gb, created_at, updated_at, last_disk_pct, disk_total_gb FROM render.render_nodes WHERE is_active = TRUE` args := pgx.NamedArgs{} if region != "" { q += " AND region = @region" args["region"] = region } if status != "" { q += " AND status::text = @status" args["status"] = status } q += " ORDER BY region, priority DESC" rows, err := s.pool.Query(ctx, q, args) if err != nil { return nil, err } defer rows.Close() return scanNodes(rows) } func (s *Store) GetNodeByID(ctx context.Context, id uuid.UUID) (*models.RenderNode, error) { rows, err := s.pool.Query(ctx, ` SELECT id, name, region, node_ip::text, worker_port, public_endpoint, ram_gb, cpu_cores, gpu_model, storage_gb, current_ae_version::text, available_ae_versions, node_agent_version, node_kind::text, owner_user_id, owner_tenant_id, status::text, current_job_id, current_frame_job_id, job_started_at, last_heartbeat_at, last_cpu_pct, last_ram_available_mb, ae_running, lifetime_task_count, lifetime_crash_count, consecutive_failures, priority, is_active, accepts_new_jobs, last_maintenance_at, next_maintenance_at, maintenance_reason, cached_template_md5s, cache_used_gb, created_at, updated_at, last_disk_pct, disk_total_gb FROM render.render_nodes WHERE id = $1`, id) if err != nil { return nil, err } defer rows.Close() nodes, err := scanNodes(rows) if err != nil { return nil, err } if len(nodes) == 0 { return nil, fmt.Errorf("node not found") } return nodes[0], nil } func (s *Store) CreateNode(ctx context.Context, req *models.NodeCreateRequest) (*models.RenderNode, error) { kind := "Shared" if req.NodeKind != nil { kind = *req.NodeKind } priority := 100 if req.Priority != nil { priority = *req.Priority } var id uuid.UUID err := s.pool.QueryRow(ctx, ` INSERT INTO render.render_nodes (name, region, node_ip, worker_port, current_ae_version, ram_gb, cpu_cores, node_kind, owner_user_id, priority, status) VALUES ($1, $2, $3::inet, $4, $5::ae_version, $6, $7, $8::node_kind, $9, $10, 'Offline'::node_status) RETURNING id`, req.Name, req.Region, req.NodeIP, req.WorkerPort, req.CurrentAEVersion, req.RamGB, req.CPUCores, kind, req.OwnerUserID, priority, ).Scan(&id) if err != nil { return nil, err } return s.GetNodeByID(ctx, id) } func (s *Store) PatchNode(ctx context.Context, id uuid.UUID, req *models.NodePatchRequest) (*models.RenderNode, error) { if req.Priority != nil { _, err := s.pool.Exec(ctx, `UPDATE render.render_nodes SET priority = $1, updated_at = NOW() WHERE id = $2`, *req.Priority, id) if err != nil { return nil, err } } if req.IsActive != nil { _, err := s.pool.Exec(ctx, `UPDATE render.render_nodes SET is_active = $1, updated_at = NOW() WHERE id = $2`, *req.IsActive, id) if err != nil { return nil, err } } if req.AcceptsNewJobs != nil { _, err := s.pool.Exec(ctx, `UPDATE render.render_nodes SET accepts_new_jobs = $1, updated_at = NOW() WHERE id = $2`, *req.AcceptsNewJobs, id) if err != nil { return nil, err } } if req.NodeKind != nil { _, err := s.pool.Exec(ctx, `UPDATE render.render_nodes SET node_kind = $1::node_kind, updated_at = NOW() WHERE id = $2`, *req.NodeKind, id) if err != nil { return nil, err } } if req.OwnerUserID != nil { _, err := s.pool.Exec(ctx, `UPDATE render.render_nodes SET owner_user_id = $1, updated_at = NOW() WHERE id = $2`, *req.OwnerUserID, id) if err != nil { return nil, err } } if req.NextMaintenanceAt != nil { _, err := s.pool.Exec(ctx, `UPDATE render.render_nodes SET next_maintenance_at = $1, maintenance_reason = $2, updated_at = NOW() WHERE id = $3`, *req.NextMaintenanceAt, req.MaintenanceReason, id) if err != nil { return nil, err } } return s.GetNodeByID(ctx, id) } // DeleteNode permanently removes a render node. func (s *Store) DeleteNode(ctx context.Context, id uuid.UUID) error { _, err := s.pool.Exec(ctx, `DELETE FROM render.render_nodes WHERE id = $1`, id) return err } func (s *Store) UpdateNodeHeartbeat(ctx context.Context, nodeID uuid.UUID, req *models.NodeHeartbeatRequest) error { _, err := s.pool.Exec(ctx, ` UPDATE render.render_nodes SET status = $1::node_status, last_heartbeat_at = NOW(), last_cpu_pct = $2, last_ram_available_mb = $3, last_disk_pct = $4, disk_total_gb = COALESCE($5, disk_total_gb), ae_running = COALESCE($6, ae_running), current_job_id = $7, current_frame_job_id = $8, updated_at = NOW() WHERE id = $9`, req.Status, req.CPUPct, req.RAMAvailableMB, req.DiskUsedPct, req.DiskTotalGB, req.AERunning, req.CurrentJobID, nil, nodeID, ) return err } func (s *Store) UpdateNodeOnline(ctx context.Context, nodeID uuid.UUID, req *models.NodeOnlineRequest) error { _, err := s.pool.Exec(ctx, ` UPDATE render.render_nodes SET status = 'Ready'::node_status, node_agent_version = $1, current_ae_version = $2::ae_version, available_ae_versions = $3, ram_gb = COALESCE($4, ram_gb), cpu_cores = COALESCE($5, cpu_cores), cache_used_gb = COALESCE($6, cache_used_gb), cached_template_md5s = $7, last_heartbeat_at = NOW(), updated_at = NOW() WHERE id = $8`, req.NodeAgentVersion, req.CurrentAEVersion, req.AvailableAEVersions, req.RamGB, req.CPUCores, req.CacheUsedGB, req.CachedTemplateMD5s, nodeID, ) return err } func (s *Store) ReleaseNode(ctx context.Context, nodeID uuid.UUID) error { _, err := s.pool.Exec(ctx, ` UPDATE render.render_nodes SET status = 'Ready'::node_status, current_job_id = NULL, current_frame_job_id = NULL, job_started_at = NULL, updated_at = NOW() WHERE id = $1`, nodeID) return err } func (s *Store) ListNodeHealthHistory(ctx context.Context, nodeID uuid.UUID, from, to time.Time) ([]*models.NodeHealthLog, error) { rows, err := s.pool.Query(ctx, ` SELECT id, node_id, recorded_at, status::text, cpu_pct, ram_available_mb, ae_running, current_job_id, current_frame, cache_used_gb FROM render.node_health_logs WHERE node_id = $1 AND recorded_at BETWEEN $2 AND $3 ORDER BY recorded_at DESC LIMIT 1000`, nodeID, from, to) if err != nil { return nil, err } defer rows.Close() var out []*models.NodeHealthLog for rows.Next() { h := &models.NodeHealthLog{} if err := rows.Scan(&h.ID, &h.NodeID, &h.RecordedAt, &h.Status, &h.CPUPct, &h.RAMAvailableMB, &h.AERunning, &h.CurrentJobID, &h.CurrentFrame, &h.CacheUsedGB); err != nil { return nil, err } out = append(out, h) } return out, rows.Err() } func (s *Store) ListNodeCrashes(ctx context.Context, nodeID uuid.UUID) ([]*models.NodeCrash, error) { rows, err := s.pool.Query(ctx, ` SELECT id, node_id, render_job_id, frame_job_id, crashed_at, last_known_frame, crash_signal, error_log, log_file_url, auto_recovered, recovery_action, recovered_at, created_at FROM render.node_crashes WHERE node_id = $1 ORDER BY crashed_at DESC LIMIT 100`, nodeID) if err != nil { return nil, err } defer rows.Close() var out []*models.NodeCrash for rows.Next() { c := &models.NodeCrash{} if err := rows.Scan(&c.ID, &c.NodeID, &c.RenderJobID, &c.FrameJobID, &c.CrashedAt, &c.LastKnownFrame, &c.CrashSignal, &c.ErrorLog, &c.LogFileURL, &c.AutoRecovered, &c.RecoveryAction, &c.RecoveredAt, &c.CreatedAt); err != nil { return nil, err } out = append(out, c) } return out, rows.Err() } func (s *Store) InsertCrash(ctx context.Context, nodeID, jobID uuid.UUID, req *models.CrashReportRequest) error { _, err := s.pool.Exec(ctx, ` INSERT INTO render.node_crashes (node_id, render_job_id, frame_job_id, last_known_frame, crash_signal, error_log, log_file_url) VALUES ($1, $2, $3, $4, $5, $6, $7)`, nodeID, jobID, req.FrameJobID, req.LastKnownFrame, req.CrashSignal, req.ErrorLogTail, req.LogFileURL) if err != nil { return err } // Increment crash counts _, err = s.pool.Exec(ctx, ` UPDATE render.render_nodes SET lifetime_crash_count = lifetime_crash_count + 1, consecutive_failures = consecutive_failures + 1, updated_at = NOW() WHERE id = $1`, nodeID) return err } func (s *Store) UpdateNodeCache(ctx context.Context, nodeID uuid.UUID, req *models.CacheUpdateRequest) error { switch req.Action { case "Downloaded": if req.ProjectID != nil && req.FileSizeBytes != nil { _, err := s.pool.Exec(ctx, ` INSERT INTO render.node_template_cache (node_id, project_id, aep_file_md5, file_size_bytes, local_path) VALUES ($1, $2, $3, $4, '') ON CONFLICT (node_id, aep_file_md5) DO UPDATE SET last_used_at = NOW(), use_count = node_template_cache.use_count + 1`, nodeID, *req.ProjectID, req.AEPFileMD5, *req.FileSizeBytes) if err != nil { return err } } if req.CacheUsedGB != nil { _, _ = s.pool.Exec(ctx, `UPDATE render.render_nodes SET cache_used_gb = $1, updated_at = NOW() WHERE id = $2`, *req.CacheUsedGB, nodeID) } case "Evicted": _, err := s.pool.Exec(ctx, `DELETE FROM render.node_template_cache WHERE node_id = $1 AND aep_file_md5 = $2`, nodeID, req.AEPFileMD5) if err != nil { return err } if req.CacheUsedGB != nil { _, _ = s.pool.Exec(ctx, `UPDATE render.render_nodes SET cache_used_gb = $1, updated_at = NOW() WHERE id = $2`, *req.CacheUsedGB, nodeID) } } return nil } // ── Render Jobs ─────────────────────────────────────────────────────────────── func (s *Store) ListJobs(ctx context.Context, userID uuid.UUID, status string, page, pageSize int) ([]*models.RenderJob, int64, error) { q := `SELECT id, tenant_id, user_id, saved_project_id, original_project_id, project_name, title, name, external_job_id, priority_queue::text, priority_score, step::text, render_progress, convert_progress, image_preview_b64, price_type::text, paid_price_minor, discount_code, support_flatrender, mode, quality::text, resolution, r_height, frame_rate, is_60_fps, duration_sec, export_duration_sec, has_music, has_sfx, has_voiceover, music_volume, sfx_volume, voiceover_volume, render_node_count, current_active_nodes, region, tell_me_when_done, retry_count, max_retries, repair_attempts, failed_message, failed_at_step::text, export_id, task_start_date, queued_at, started_at, completed_at, created_at, updated_at FROM render.render_jobs WHERE user_id = $1` args := []any{userID} argIdx := 2 if status != "" { q += fmt.Sprintf(" AND step::text = $%d", argIdx) args = append(args, status) argIdx++ } var total int64 countQ := `SELECT COUNT(*) FROM render.render_jobs WHERE user_id = $1` if status != "" { countQ += fmt.Sprintf(" AND step::text = $2") _ = s.pool.QueryRow(ctx, countQ, args...).Scan(&total) } else { _ = s.pool.QueryRow(ctx, countQ, userID).Scan(&total) } q += fmt.Sprintf(" ORDER BY created_at DESC LIMIT $%d OFFSET $%d", argIdx, argIdx+1) args = append(args, pageSize, (page-1)*pageSize) rows, err := s.pool.Query(ctx, q, args...) if err != nil { return nil, 0, err } defer rows.Close() jobs, err := scanJobs(rows) return jobs, total, err } func (s *Store) GetJobByID(ctx context.Context, id, userID uuid.UUID) (*models.RenderJob, error) { rows, err := s.pool.Query(ctx, ` SELECT id, tenant_id, user_id, saved_project_id, original_project_id, project_name, title, name, external_job_id, priority_queue::text, priority_score, step::text, render_progress, convert_progress, image_preview_b64, price_type::text, paid_price_minor, discount_code, support_flatrender, mode, quality::text, resolution, r_height, frame_rate, is_60_fps, duration_sec, export_duration_sec, has_music, has_sfx, has_voiceover, music_volume, sfx_volume, voiceover_volume, render_node_count, current_active_nodes, region, tell_me_when_done, retry_count, max_retries, repair_attempts, failed_message, failed_at_step::text, export_id, task_start_date, queued_at, started_at, completed_at, created_at, updated_at FROM render.render_jobs WHERE id = $1 AND user_id = $2`, id, userID) if err != nil { return nil, err } defer rows.Close() jobs, err := scanJobs(rows) if err != nil { return nil, err } if len(jobs) == 0 { return nil, fmt.Errorf("job not found") } return jobs[0], nil } // CountActiveJobs returns how many non-terminal render jobs the user currently has. // Used to enforce the per-user concurrent-render ceiling. func (s *Store) CountActiveJobs(ctx context.Context, userID uuid.UUID) (int, error) { var n int err := s.pool.QueryRow(ctx, ` SELECT COUNT(*) FROM render.render_jobs WHERE user_id = $1 AND step NOT IN ('Done'::render_step, 'Failed'::render_step, 'Cancelled'::render_step)`, userID).Scan(&n) return n, err } // ListActiveJobs returns the user's in-flight render jobs (most recent first), // lightweight projection for the app-wide mini progress widget. func (s *Store) ListActiveJobs(ctx context.Context, userID uuid.UUID) ([]*models.RenderJob, error) { rows, err := s.pool.Query(ctx, ` SELECT id, saved_project_id, name, step, render_progress, image_preview_b64, created_at FROM render.render_jobs WHERE user_id = $1 AND step NOT IN ('Done'::render_step, 'Failed'::render_step, 'Cancelled'::render_step) ORDER BY created_at DESC`, userID) if err != nil { return nil, err } defer rows.Close() var out []*models.RenderJob for rows.Next() { j := &models.RenderJob{} if err := rows.Scan(&j.ID, &j.SavedProjectID, &j.Name, &j.Step, &j.RenderProgress, &j.ImagePreviewB64, &j.CreatedAt); err != nil { return nil, err } out = append(out, j) } return out, rows.Err() } // ResolutionHeight maps a quality-tier label to its output height in pixels. // Used for r_height (stored) and the node's ffmpeg downscale. func ResolutionHeight(resolution string) int { switch strings.ToLower(strings.TrimSpace(resolution)) { case "360p": return 360 case "540p": return 540 case "720p": return 720 case "1080p", "fullhd": return 1080 case "4k", "2160p": return 2160 default: return 1080 } } func (s *Store) CreateJob(ctx context.Context, userID, tenantID uuid.UUID, req *models.RenderJobCreateRequest) (*models.RenderJob, error) { priceType := "Free" if req.PriceType != nil { priceType = *req.PriceType } frameRate := 30 if req.FrameRate != nil { frameRate = *req.FrameRate } tellMe := true if req.TellMeWhenDone != nil { tellMe = *req.TellMeWhenDone } var id uuid.UUID err := s.pool.QueryRow(ctx, ` INSERT INTO render.render_jobs (tenant_id, user_id, saved_project_id, original_project_id, priority_queue, step, price_type, quality, resolution, r_height, frame_rate, is_60_fps, duration_sec, mode, tell_me_when_done, region) VALUES ($1, $2, $3, -- original_project_id = the TEMPLATE the saved project was created from -- (the render bundle lives at templates/{original_project_id}/). Fall back -- to the saved-project id only if the lookup is somehow null. COALESCE((SELECT original_project_id FROM studio.saved_projects WHERE id = $3), $3), 'paid'::render_priority_queue, 'Queued'::render_step, $4::price_kind, $5::render_quality, $6, $11, $7, COALESCE($8, FALSE), 0, 'FIX', $9, $10) RETURNING id`, tenantID, userID, req.SavedProjectID, priceType, req.Quality, req.Resolution, frameRate, req.Is60FPS, tellMe, req.PreferredRegion, ResolutionHeight(req.Resolution), ).Scan(&id) if err != nil { return nil, err } return s.GetJobByID(ctx, id, userID) } // CompleteJob marks a render job as Done. Returns the updated job so callers // can read user_id / tenant_id for downstream notifications. func (s *Store) CompleteJob(ctx context.Context, jobID uuid.UUID, exportID *uuid.UUID) (*models.RenderJob, error) { _, err := s.pool.Exec(ctx, ` UPDATE render.render_jobs SET step = 'Done'::render_step, render_progress = 100, export_id = COALESCE($1, export_id), completed_at = NOW(), updated_at = NOW() WHERE id = $2 AND step NOT IN ('Done'::render_step, 'Failed'::render_step, 'Cancelled'::render_step)`, exportID, jobID) if err != nil { return nil, err } return s.getJobByIDInternal(ctx, jobID) } // DevClaimNextQueued atomically picks the oldest Queued job and moves it to // Preparing (started_at set). Returns (nil, nil) when the queue is empty. // // This is the dev/mock path: it claims WITHOUT assigning a render node, so the // in-process dev worker can simulate a render end-to-end with no Windows AE node. // Never enabled in production (gated by RENDER_DEV_WORKER). func (s *Store) DevClaimNextQueued(ctx context.Context) (uuid.UUID, error) { tx, err := s.pool.Begin(ctx) if err != nil { return uuid.Nil, err } defer func() { _ = tx.Rollback(ctx) }() var jobID uuid.UUID err = tx.QueryRow(ctx, ` SELECT id FROM render.render_jobs WHERE step = 'Queued'::render_step ORDER BY priority_score DESC, queued_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED`).Scan(&jobID) if err != nil { if err.Error() == "no rows in result set" { return uuid.Nil, nil } return uuid.Nil, err } _, err = tx.Exec(ctx, ` UPDATE render.render_jobs SET step = 'Preparing'::render_step, started_at = COALESCE(started_at, NOW()), updated_at = NOW() WHERE id = $1`, jobID) if err != nil { return uuid.Nil, err } if err := tx.Commit(ctx); err != nil { return uuid.Nil, err } return jobID, nil } // UpdateJobStepProgress sets the step + progress for a job (dev worker + future // fine-grained progress). No-op on terminal jobs. func (s *Store) UpdateJobStepProgress(ctx context.Context, jobID uuid.UUID, step string, progress int) error { _, err := s.pool.Exec(ctx, ` UPDATE render.render_jobs SET step = $1::render_step, render_progress = $2, updated_at = NOW() WHERE id = $3 AND step NOT IN ('Done'::render_step, 'Failed'::render_step, 'Cancelled'::render_step)`, step, progress, jobID) return err } // FailJob marks a render job as Failed. Returns the updated job. func (s *Store) FailJob(ctx context.Context, jobID uuid.UUID, reason, atStep string) (*models.RenderJob, error) { _, err := s.pool.Exec(ctx, ` UPDATE render.render_jobs SET step = 'Failed'::render_step, failed_message = $1, failed_at_step = $2::render_step, completed_at = NOW(), updated_at = NOW() WHERE id = $3 AND step NOT IN ('Done'::render_step, 'Failed'::render_step, 'Cancelled'::render_step)`, reason, atStep, jobID) if err != nil { return nil, err } return s.getJobByIDInternal(ctx, jobID) } // getJobByIDInternal retrieves a job by ID without an ownership check. // Used only for internal operations (node agent callbacks). func (s *Store) getJobByIDInternal(ctx context.Context, id uuid.UUID) (*models.RenderJob, error) { rows, err := s.pool.Query(ctx, ` SELECT id, tenant_id, user_id, saved_project_id, original_project_id, project_name, title, name, external_job_id, priority_queue::text, priority_score, step::text, render_progress, convert_progress, image_preview_b64, price_type::text, paid_price_minor, discount_code, support_flatrender, mode, quality::text, resolution, r_height, frame_rate, is_60_fps, duration_sec, export_duration_sec, has_music, has_sfx, has_voiceover, music_volume, sfx_volume, voiceover_volume, render_node_count, current_active_nodes, region, tell_me_when_done, retry_count, max_retries, repair_attempts, failed_message, failed_at_step::text, export_id, task_start_date, queued_at, started_at, completed_at, created_at, updated_at FROM render.render_jobs WHERE id = $1`, id) if err != nil { return nil, err } defer rows.Close() jobs, err := scanJobs(rows) if err != nil { return nil, err } if len(jobs) == 0 { return nil, fmt.Errorf("job %s not found", id) } return jobs[0], nil } // ClaimJob atomically picks the highest-priority Queued job (optionally filtered // by region) and moves it to Preparing, setting the current_job_id on the node. // Returns (nil, nil) when there is nothing to do. func (s *Store) ClaimJob(ctx context.Context, nodeID uuid.UUID, region string) (*models.RenderJob, error) { tx, err := s.pool.Begin(ctx) if err != nil { return nil, err } defer func() { _ = tx.Rollback(ctx) }() q := `SELECT id FROM render.render_jobs WHERE step = 'Queued'::render_step` args := []any{} argIdx := 1 if region != "" { q += fmt.Sprintf(" AND (region IS NULL OR region = $%d)", argIdx) args = append(args, region) argIdx++ } q += " ORDER BY priority_score DESC, queued_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED" var jobID uuid.UUID if err := tx.QueryRow(ctx, q, args...).Scan(&jobID); err != nil { if err.Error() == "no rows in result set" { return nil, nil // nothing to do } return nil, err } // Advance to Preparing and assign to this node _, err = tx.Exec(ctx, ` UPDATE render.render_jobs SET step = 'Preparing'::render_step, started_at = COALESCE(started_at, NOW()), updated_at = NOW() WHERE id = $1`, jobID) if err != nil { return nil, err } _, err = tx.Exec(ctx, ` UPDATE render.render_nodes SET status = 'Busy'::node_status, current_job_id = $1, job_started_at = NOW(), updated_at = NOW() WHERE id = $2`, jobID, nodeID) if err != nil { return nil, err } if err := tx.Commit(ctx); err != nil { return nil, err } return s.getJobByIDInternal(ctx, jobID) } // CreateExportForJob allocates a new Export row for a completed render job. // The export starts with a placeholder path `exports/{export_id}/output.mp4`. // The node agent uploads the MP4 to that MinIO path, then calls CompleteJob // with the returned export_id. // GetTemplateCompName returns the After Effects composition to render for a // template (content.projects.render_aep_comp), e.g. "frfinal". aerender needs // this via -comp; without it AE opens the project but renders nothing. func (s *Store) GetTemplateCompName(ctx context.Context, originalProjectID uuid.UUID) (string, error) { var comp *string err := s.pool.QueryRow(ctx, `SELECT render_aep_comp FROM content.projects WHERE id = $1`, originalProjectID).Scan(&comp) if err != nil { return "", err } if comp == nil { return "", nil } return *comp, nil } // GetRenderBindings returns the user's edited input values for a saved project so the // node can write them into the AE project before rendering (the render binder). Only // inputs with a non-empty value are returned (defaults are already in the template). func (s *Store) GetRenderBindings(ctx context.Context, savedProjectID uuid.UUID) ([]models.RenderBinding, error) { rows, err := s.pool.Query(ctx, ` SELECT c.key, c.type, COALESCE(c.value, '') FROM studio.saved_scene_contents c JOIN studio.saved_scenes s ON s.id = c.saved_scene_id WHERE s.saved_project_id = $1 AND c.value IS NOT NULL AND c.value <> ''`, savedProjectID) if err != nil { return nil, err } defer rows.Close() var out []models.RenderBinding for rows.Next() { var b models.RenderBinding if err := rows.Scan(&b.Key, &b.Type, &b.Value); err != nil { return nil, err } out = append(out, b) } return out, rows.Err() } func (s *Store) CreateExportForJob(ctx context.Context, jobID uuid.UUID) (*models.Export, error) { // Look up the job to get tenant/user/project context job, err := s.getJobByIDInternal(ctx, jobID) if err != nil { return nil, fmt.Errorf("job not found: %w", err) } exportID := uuid.New() path := fmt.Sprintf("exports/%s/output.mp4", exportID) now := time.Now() autoDelete := now.AddDate(0, 0, 30) // 30-day retention _, err = s.pool.Exec(ctx, ` INSERT INTO render.exports (id, tenant_id, user_id, saved_project_id, project_id, render_job_id, path, file_extension, file_type, render_quality, create_type, size_bytes, produce_date, auto_delete_date, delete_notified, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7, 'mp4', 'Video'::render.export_file_type, $8::render.render_quality, 'Render'::render.export_create_type, 0, $9, $10, false, $9)`, exportID, job.TenantID, job.UserID, job.SavedProjectID, job.OriginalProjectID, job.ID, path, job.Quality, now, autoDelete, ) if err != nil { return nil, fmt.Errorf("create export: %w", err) } return &models.Export{ ID: exportID, TenantID: job.TenantID, UserID: job.UserID, SavedProjectID: job.SavedProjectID, Path: path, FileExtension: "mp4", FileType: "video", RenderQuality: job.Quality, CreateType: "render", ProduceDate: now, AutoDeleteDate: autoDelete, CreatedAt: now, }, nil } // UpdateJobPreview stores a base64-encoded preview frame for a running job. // Called by the node agent every N frames to power the live preview UI. func (s *Store) UpdateJobPreview(ctx context.Context, jobID uuid.UUID, imageB64 string) error { _, err := s.pool.Exec(ctx, ` UPDATE render.render_jobs SET image_preview_b64 = $1, updated_at = NOW() WHERE id = $2 AND step NOT IN ('Done'::render_step, 'Failed'::render_step, 'Cancelled'::render_step)`, imageB64, jobID) return err } // UpdateJobProgress stores the render percentage for a running job and advances the // step to 'Rendering' once it actually starts (so the UI shows progress + ETA, not a // stuck 'Preparing'). Called by the node agent every few seconds during a render. func (s *Store) UpdateJobProgress(ctx context.Context, jobID uuid.UUID, progress int) error { if progress < 0 { progress = 0 } if progress > 100 { progress = 100 } _, err := s.pool.Exec(ctx, ` UPDATE render.render_jobs SET render_progress = $1, step = CASE WHEN step IN ('Queued'::render_step, 'Preparing'::render_step) AND $1 > 0 THEN 'Rendering'::render_step ELSE step END, started_at = COALESCE(started_at, NOW()), updated_at = NOW() WHERE id = $2 AND step NOT IN ('Done'::render_step, 'Failed'::render_step, 'Cancelled'::render_step)`, progress, jobID) return err } func (s *Store) CancelJob(ctx context.Context, id, userID uuid.UUID) (bool, error) { tag, err := s.pool.Exec(ctx, ` UPDATE render.render_jobs SET step = 'Cancelled'::render_step, completed_at = NOW(), updated_at = NOW() WHERE id = $1 AND user_id = $2 AND step NOT IN ('Done','Failed','Cancelled')`, id, userID) return tag.RowsAffected() > 0, err } // StopJob cancels any in-progress job regardless of owner (admin action), frees the // assigned node, and returns the job's owner id so the caller can refund their charge. func (s *Store) StopJob(ctx context.Context, id uuid.UUID) (bool, uuid.UUID, error) { var ownerID uuid.UUID err := s.pool.QueryRow(ctx, ` UPDATE render.render_jobs SET step = 'Cancelled'::render_step, completed_at = NOW(), updated_at = NOW() WHERE id = $1 AND step NOT IN ('Done','Failed','Cancelled') RETURNING user_id`, id).Scan(&ownerID) if err == pgx.ErrNoRows { return false, uuid.Nil, nil } if err != nil { return false, uuid.Nil, err } // Release any node that was actively working this job. _, _ = s.pool.Exec(ctx, `UPDATE render.render_nodes SET status = 'Ready'::node_status, current_frame_job_id = NULL, updated_at = NOW() WHERE current_frame_job_id IN (SELECT id FROM render.frame_jobs WHERE render_job_id = $1)`, id) return true, ownerID, nil } func (s *Store) GetJobProgress(ctx context.Context, id, userID uuid.UUID) (*models.RenderJob, error) { return s.GetJobByID(ctx, id, userID) } func (s *Store) ListFrameJobs(ctx context.Context, renderJobID uuid.UUID) ([]*models.FrameJob, error) { rows, err := s.pool.Query(ctx, ` SELECT id, render_job_id, node_id, start_frame, end_frame, collect_frame_count, order_value, folder_name, convert_url, status::text, frames_rendered, frames_validated, attempt, last_error, output_mp4_url, assigned_at, started_at, last_progress_at, completed_at, created_at, updated_at FROM render.frame_jobs WHERE render_job_id = $1 ORDER BY order_value`, renderJobID) if err != nil { return nil, err } defer rows.Close() return scanFrameJobs(rows) } func (s *Store) UpdateFrameProgress(ctx context.Context, jobID uuid.UUID, req *models.FrameProgressRequest) error { _, err := s.pool.Exec(ctx, ` UPDATE render.frame_jobs SET frames_rendered = frames_rendered + 1, last_progress_at = NOW(), status = CASE WHEN $1 IS NOT NULL THEN 'Validated'::frame_job_status ELSE status END, completed_at = $1, updated_at = NOW() WHERE id = $2`, req.CompletedAt, req.FrameJobID) return err } // ── Snapshots ──────────────────────────────────────────────────────────────── func (s *Store) GetCachedSnapshot(ctx context.Context, projectID uuid.UUID, sceneKey string, frame int, hash string) (*models.Snapshot, error) { rows, err := s.pool.Query(ctx, ` SELECT id, tenant_id, user_id, saved_project_id, scene_key, frame_number, inputs_hash, status, render_node_id, image_url, thumbnail_url, width, height, size_bytes, requested_at, completed_at, duration_ms, expires_at, error_message, created_at FROM render.snapshots WHERE saved_project_id = $1 AND scene_key = $2 AND frame_number = $3 AND inputs_hash = $4 AND status = 'Done' AND expires_at > NOW() LIMIT 1`, projectID, sceneKey, frame, hash) if err != nil { return nil, err } defer rows.Close() snaps, err := scanSnapshots(rows) if err != nil || len(snaps) == 0 { return nil, err } return snaps[0], nil } func (s *Store) CreateSnapshot(ctx context.Context, userID, tenantID uuid.UUID, req *models.SnapshotCreateRequest, hash string) (*models.Snapshot, error) { var id uuid.UUID err := s.pool.QueryRow(ctx, ` INSERT INTO render.snapshots (tenant_id, user_id, saved_project_id, scene_key, frame_number, inputs_hash, status) VALUES ($1, $2, $3, $4, $5, $6, 'Pending') RETURNING id`, tenantID, userID, req.SavedProjectID, req.SceneKey, req.FrameNumber, hash, ).Scan(&id) if err != nil { return nil, err } return s.GetSnapshotByID(ctx, id) } func (s *Store) GetSnapshotByID(ctx context.Context, id uuid.UUID) (*models.Snapshot, error) { rows, err := s.pool.Query(ctx, ` SELECT id, tenant_id, user_id, saved_project_id, scene_key, frame_number, inputs_hash, status, render_node_id, image_url, thumbnail_url, width, height, size_bytes, requested_at, completed_at, duration_ms, expires_at, error_message, created_at FROM render.snapshots WHERE id = $1`, id) if err != nil { return nil, err } defer rows.Close() snaps, err := scanSnapshots(rows) if err != nil { return nil, err } if len(snaps) == 0 { return nil, fmt.Errorf("snapshot not found") } return snaps[0], nil } // ── Exports ────────────────────────────────────────────────────────────────── func (s *Store) ListExports(ctx context.Context, userID uuid.UUID, page, pageSize int) ([]*models.Export, int64, error) { var total int64 _ = s.pool.QueryRow(ctx, `SELECT COUNT(*) FROM render.exports WHERE user_id = $1 AND deleted_at IS NULL`, userID).Scan(&total) rows, err := s.pool.Query(ctx, ` SELECT id, tenant_id, user_id, saved_project_id, project_id, render_job_id, image, path, file_extension, file_type::text, render_quality::text, create_type::text, size_bytes, duration_sec, width, height, produce_date, auto_delete_date, delete_notified, created_at, deleted_at FROM render.exports WHERE user_id = $1 AND deleted_at IS NULL ORDER BY produce_date DESC LIMIT $2 OFFSET $3`, userID, pageSize, (page-1)*pageSize) if err != nil { return nil, 0, err } defer rows.Close() exports, err := scanExports(rows) return exports, total, err } func (s *Store) GetExportByID(ctx context.Context, id, userID uuid.UUID) (*models.Export, error) { rows, err := s.pool.Query(ctx, ` SELECT id, tenant_id, user_id, saved_project_id, project_id, render_job_id, image, path, file_extension, file_type::text, render_quality::text, create_type::text, size_bytes, duration_sec, width, height, produce_date, auto_delete_date, delete_notified, created_at, deleted_at FROM render.exports WHERE id = $1 AND user_id = $2 AND deleted_at IS NULL`, id, userID) if err != nil { return nil, err } defer rows.Close() exports, err := scanExports(rows) if err != nil { return nil, err } if len(exports) == 0 { return nil, fmt.Errorf("export not found") } return exports[0], nil } func (s *Store) ListExportFiles(ctx context.Context, exportID uuid.UUID) ([]*models.ExportFile, error) { rows, err := s.pool.Query(ctx, ` SELECT id, export_id, user_id, name, thumbnail, path, size_bytes, file_type::text, width, height, sort, created_at FROM render.export_files WHERE export_id = $1 ORDER BY sort`, exportID) if err != nil { return nil, err } defer rows.Close() var out []*models.ExportFile for rows.Next() { f := &models.ExportFile{} if err := rows.Scan(&f.ID, &f.ExportID, &f.UserID, &f.Name, &f.Thumbnail, &f.Path, &f.SizeBytes, &f.FileType, &f.Width, &f.Height, &f.Sort, &f.CreatedAt); err != nil { return nil, err } out = append(out, f) } return out, rows.Err() } func (s *Store) SoftDeleteExport(ctx context.Context, id, userID uuid.UUID) error { tag, err := s.pool.Exec(ctx, ` UPDATE render.exports SET deleted_at = NOW() WHERE id = $1 AND user_id = $2 AND deleted_at IS NULL`, id, userID) if err != nil { return err } if tag.RowsAffected() == 0 { return fmt.Errorf("export not found") } return nil } func (s *Store) ExtendExportDeleteDate(ctx context.Context, id, userID uuid.UUID, days int) (time.Time, error) { var newDate time.Time err := s.pool.QueryRow(ctx, ` UPDATE render.exports SET auto_delete_date = auto_delete_date + ($1 || ' days')::interval WHERE id = $2 AND user_id = $3 AND deleted_at IS NULL RETURNING auto_delete_date`, days, id, userID).Scan(&newDate) return newDate, err } // ── Node Updates ───────────────────────────────────────────────────────────── func (s *Store) ListNodeUpdates(ctx context.Context) ([]*models.NodeUpdate, error) { rows, err := s.pool.Query(ctx, ` SELECT id, update_file_name, update_number, description, target_ae_version::text, in_update_queue, rolled_out_to_node_ids, last_update_queue_date, create_date, created_at FROM render.node_updates ORDER BY update_number DESC`) if err != nil { return nil, err } defer rows.Close() var out []*models.NodeUpdate for rows.Next() { u := &models.NodeUpdate{} if err := rows.Scan(&u.ID, &u.UpdateFileName, &u.UpdateNumber, &u.Description, &u.TargetAEVersion, &u.InUpdateQueue, &u.RolledOutToNodeIDs, &u.LastUpdateQueueDate, &u.CreateDate, &u.CreatedAt); err != nil { return nil, err } out = append(out, u) } return out, rows.Err() } func (s *Store) QueueUpdateRollout(ctx context.Context, updateID uuid.UUID, nodeIDs []uuid.UUID) error { _, err := s.pool.Exec(ctx, ` UPDATE render.node_updates SET in_update_queue = TRUE, last_update_queue_date = NOW(), rolled_out_to_node_ids = rolled_out_to_node_ids || $1 WHERE id = $2`, nodeIDs, updateID) return err } // ── Scanners ───────────────────────────────────────────────────────────────── func scanNodes(rows pgx.Rows) ([]*models.RenderNode, error) { var out []*models.RenderNode for rows.Next() { n := &models.RenderNode{} if err := rows.Scan( &n.ID, &n.Name, &n.Region, &n.NodeIP, &n.WorkerPort, &n.PublicEndpoint, &n.RamGB, &n.CPUCores, &n.GPUModel, &n.StorageGB, &n.CurrentAEVersion, &n.AvailableAEVersions, &n.NodeAgentVersion, &n.NodeKind, &n.OwnerUserID, &n.OwnerTenantID, &n.Status, &n.CurrentJobID, &n.CurrentFrameJobID, &n.JobStartedAt, &n.LastHeartbeatAt, &n.LastCPUPct, &n.LastRAMAvailableMB, &n.AERunning, &n.LifetimeTaskCount, &n.LifetimeCrashCount, &n.ConsecutiveFailures, &n.Priority, &n.IsActive, &n.AcceptsNewJobs, &n.LastMaintenanceAt, &n.NextMaintenanceAt, &n.MaintenanceReason, &n.CachedTemplateMD5s, &n.CacheUsedGB, &n.CreatedAt, &n.UpdatedAt, &n.LastDiskPct, &n.DiskTotalGB, ); err != nil { return nil, err } out = append(out, n) } return out, rows.Err() } func scanJobs(rows pgx.Rows) ([]*models.RenderJob, error) { var out []*models.RenderJob for rows.Next() { j := &models.RenderJob{} if err := rows.Scan( &j.ID, &j.TenantID, &j.UserID, &j.SavedProjectID, &j.OriginalProjectID, &j.ProjectName, &j.Title, &j.Name, &j.ExternalJobID, &j.PriorityQueue, &j.PriorityScore, &j.Step, &j.RenderProgress, &j.ConvertProgress, &j.ImagePreviewB64, &j.PriceType, &j.PaidPriceMinor, &j.DiscountCode, &j.SupportFlatrender, &j.Mode, &j.Quality, &j.Resolution, &j.RHeight, &j.FrameRate, &j.Is60FPS, &j.DurationSec, &j.ExportDurationSec, &j.HasMusic, &j.HasSFX, &j.HasVoiceover, &j.MusicVolume, &j.SFXVolume, &j.VoiceoverVolume, &j.RenderNodeCount, &j.CurrentActiveNodes, &j.Region, &j.TellMeWhenDone, &j.RetryCount, &j.MaxRetries, &j.RepairAttempts, &j.FailedMessage, &j.FailedAtStep, &j.ExportID, &j.TaskStartDate, &j.QueuedAt, &j.StartedAt, &j.CompletedAt, &j.CreatedAt, &j.UpdatedAt, ); err != nil { return nil, err } out = append(out, j) } return out, rows.Err() } func scanFrameJobs(rows pgx.Rows) ([]*models.FrameJob, error) { var out []*models.FrameJob for rows.Next() { f := &models.FrameJob{} if err := rows.Scan( &f.ID, &f.RenderJobID, &f.NodeID, &f.StartFrame, &f.EndFrame, &f.CollectFrameCount, &f.OrderValue, &f.FolderName, &f.ConvertURL, &f.Status, &f.FramesRendered, &f.FramesValidated, &f.Attempt, &f.LastError, &f.OutputMP4URL, &f.AssignedAt, &f.StartedAt, &f.LastProgressAt, &f.CompletedAt, &f.CreatedAt, &f.UpdatedAt, ); err != nil { return nil, err } out = append(out, f) } return out, rows.Err() } func scanSnapshots(rows pgx.Rows) ([]*models.Snapshot, error) { var out []*models.Snapshot for rows.Next() { s := &models.Snapshot{} if err := rows.Scan( &s.ID, &s.TenantID, &s.UserID, &s.SavedProjectID, &s.SceneKey, &s.FrameNumber, &s.InputsHash, &s.Status, &s.RenderNodeID, &s.ImageURL, &s.ThumbnailURL, &s.Width, &s.Height, &s.SizeBytes, &s.RequestedAt, &s.CompletedAt, &s.DurationMS, &s.ExpiresAt, &s.ErrorMessage, &s.CreatedAt, ); err != nil { return nil, err } out = append(out, s) } return out, rows.Err() } func scanExports(rows pgx.Rows) ([]*models.Export, error) { var out []*models.Export for rows.Next() { e := &models.Export{} if err := rows.Scan( &e.ID, &e.TenantID, &e.UserID, &e.SavedProjectID, &e.ProjectID, &e.RenderJobID, &e.Image, &e.Path, &e.FileExtension, &e.FileType, &e.RenderQuality, &e.CreateType, &e.SizeBytes, &e.DurationSec, &e.Width, &e.Height, &e.ProduceDate, &e.AutoDeleteDate, &e.DeleteNotified, &e.CreatedAt, &e.DeletedAt, ); err != nil { return nil, err } out = append(out, e) } return out, rows.Err() }