4f04f6bf75
Render engine - Add Remotion (code-based) as a 2nd render engine alongside After Effects. node-agent dispatches on Job.Engine; RunRemotion maps bindings -> --props, renders native then ffmpeg-scales to the quality tier (aspect-preserving). - content.projects.render_engine + render_remotion_comp (migration 32); render-svc claim resolves engine and routes (skips .aep for Remotion). - Admin TemplatesAdmin gains an engine picker + Remotion composition id field. Template pack (services/remotion) - 16 branded, Persian (Vazirmatn), color- and text-editable templates, each in 3 aspects (16:9 / 1:1 / 9:16): LogoMotion, Opener, InstaPromo, YouTubeIntro, Slideshow, HappyBirthday, SalePromo, QuoteCard, EventInvite, Countdown, GlitterReveal (editable logo image), NowruzGreeting (animated characters), and 4 cinematic 3D templates via @remotion/three (Hero3D, Nowruz3D, Birthday3D, Promo3D) with reflections + bloom/DOF/vignette. - scripts/seed_remotion_templates.py seeds containers/projects/scenes/colors. Pricing - Rewrite /pricing to the seconds-based model (charge = length x resolution), data-driven from /v1/plans, Toman, broker checkout. Coming-soon - Persian experimental-build overlay on all pages (launch date + countdown). Fixes - middleware matcher bypasses all static asset paths; catalog mapping passes cover image + preview video so real thumbnails render. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1165 lines
42 KiB
Go
1165 lines
42 KiB
Go
package db
|
|
|
|
import (
	"context"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/flatrender/render-svc/internal/models"
	"github.com/google/uuid"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)
|
|
|
|
type Store struct {
|
|
pool *pgxpool.Pool
|
|
}
|
|
|
|
func NewStore(pool *pgxpool.Pool) *Store {
|
|
return &Store{pool: pool}
|
|
}
|
|
|
|
// ── Nodes ────────────────────────────────────────────────────────────────────
|
|
|
|
func (s *Store) ListNodes(ctx context.Context, region, status string) ([]*models.RenderNode, error) {
|
|
q := `SELECT id, name, region, node_ip::text, worker_port, public_endpoint,
|
|
ram_gb, cpu_cores, gpu_model, storage_gb,
|
|
current_ae_version::text, available_ae_versions, node_agent_version,
|
|
node_kind::text, owner_user_id, owner_tenant_id,
|
|
status::text, current_job_id, current_frame_job_id, job_started_at,
|
|
last_heartbeat_at, last_cpu_pct, last_ram_available_mb, ae_running,
|
|
lifetime_task_count, lifetime_crash_count, consecutive_failures,
|
|
priority, is_active, accepts_new_jobs,
|
|
last_maintenance_at, next_maintenance_at, maintenance_reason,
|
|
cached_template_md5s, cache_used_gb, created_at, updated_at,
|
|
last_disk_pct, disk_total_gb
|
|
FROM render.render_nodes
|
|
WHERE is_active = TRUE`
|
|
args := pgx.NamedArgs{}
|
|
if region != "" {
|
|
q += " AND region = @region"
|
|
args["region"] = region
|
|
}
|
|
if status != "" {
|
|
q += " AND status::text = @status"
|
|
args["status"] = status
|
|
}
|
|
q += " ORDER BY region, priority DESC"
|
|
|
|
rows, err := s.pool.Query(ctx, q, args)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
return scanNodes(rows)
|
|
}
|
|
|
|
func (s *Store) GetNodeByID(ctx context.Context, id uuid.UUID) (*models.RenderNode, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT id, name, region, node_ip::text, worker_port, public_endpoint,
|
|
ram_gb, cpu_cores, gpu_model, storage_gb,
|
|
current_ae_version::text, available_ae_versions, node_agent_version,
|
|
node_kind::text, owner_user_id, owner_tenant_id,
|
|
status::text, current_job_id, current_frame_job_id, job_started_at,
|
|
last_heartbeat_at, last_cpu_pct, last_ram_available_mb, ae_running,
|
|
lifetime_task_count, lifetime_crash_count, consecutive_failures,
|
|
priority, is_active, accepts_new_jobs,
|
|
last_maintenance_at, next_maintenance_at, maintenance_reason,
|
|
cached_template_md5s, cache_used_gb, created_at, updated_at, last_disk_pct, disk_total_gb
|
|
FROM render.render_nodes WHERE id = $1`, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
nodes, err := scanNodes(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(nodes) == 0 {
|
|
return nil, fmt.Errorf("node not found")
|
|
}
|
|
return nodes[0], nil
|
|
}
|
|
|
|
func (s *Store) CreateNode(ctx context.Context, req *models.NodeCreateRequest) (*models.RenderNode, error) {
|
|
kind := "Shared"
|
|
if req.NodeKind != nil {
|
|
kind = *req.NodeKind
|
|
}
|
|
priority := 100
|
|
if req.Priority != nil {
|
|
priority = *req.Priority
|
|
}
|
|
var id uuid.UUID
|
|
err := s.pool.QueryRow(ctx, `
|
|
INSERT INTO render.render_nodes
|
|
(name, region, node_ip, worker_port, current_ae_version, ram_gb, cpu_cores,
|
|
node_kind, owner_user_id, priority, status)
|
|
VALUES ($1, $2, $3::inet, $4, $5::ae_version, $6, $7, $8::node_kind, $9, $10, 'Offline'::node_status)
|
|
RETURNING id`,
|
|
req.Name, req.Region, req.NodeIP, req.WorkerPort, req.CurrentAEVersion,
|
|
req.RamGB, req.CPUCores, kind, req.OwnerUserID, priority,
|
|
).Scan(&id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return s.GetNodeByID(ctx, id)
|
|
}
|
|
|
|
func (s *Store) PatchNode(ctx context.Context, id uuid.UUID, req *models.NodePatchRequest) (*models.RenderNode, error) {
|
|
if req.Priority != nil {
|
|
_, err := s.pool.Exec(ctx, `UPDATE render.render_nodes SET priority = $1, updated_at = NOW() WHERE id = $2`, *req.Priority, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
if req.IsActive != nil {
|
|
_, err := s.pool.Exec(ctx, `UPDATE render.render_nodes SET is_active = $1, updated_at = NOW() WHERE id = $2`, *req.IsActive, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
if req.AcceptsNewJobs != nil {
|
|
_, err := s.pool.Exec(ctx, `UPDATE render.render_nodes SET accepts_new_jobs = $1, updated_at = NOW() WHERE id = $2`, *req.AcceptsNewJobs, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
if req.NodeKind != nil {
|
|
_, err := s.pool.Exec(ctx, `UPDATE render.render_nodes SET node_kind = $1::node_kind, updated_at = NOW() WHERE id = $2`, *req.NodeKind, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
if req.OwnerUserID != nil {
|
|
_, err := s.pool.Exec(ctx, `UPDATE render.render_nodes SET owner_user_id = $1, updated_at = NOW() WHERE id = $2`, *req.OwnerUserID, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
if req.NextMaintenanceAt != nil {
|
|
_, err := s.pool.Exec(ctx, `UPDATE render.render_nodes SET next_maintenance_at = $1, maintenance_reason = $2, updated_at = NOW() WHERE id = $3`, *req.NextMaintenanceAt, req.MaintenanceReason, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return s.GetNodeByID(ctx, id)
|
|
}
|
|
|
|
// DeleteNode permanently removes a render node.
|
|
func (s *Store) DeleteNode(ctx context.Context, id uuid.UUID) error {
|
|
_, err := s.pool.Exec(ctx, `DELETE FROM render.render_nodes WHERE id = $1`, id)
|
|
return err
|
|
}
|
|
|
|
func (s *Store) UpdateNodeHeartbeat(ctx context.Context, nodeID uuid.UUID, req *models.NodeHeartbeatRequest) error {
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE render.render_nodes SET
|
|
status = $1::node_status,
|
|
last_heartbeat_at = NOW(),
|
|
last_cpu_pct = $2,
|
|
last_ram_available_mb = $3,
|
|
last_disk_pct = $4,
|
|
disk_total_gb = COALESCE($5, disk_total_gb),
|
|
ae_running = COALESCE($6, ae_running),
|
|
current_job_id = $7,
|
|
current_frame_job_id = $8,
|
|
updated_at = NOW()
|
|
WHERE id = $9`,
|
|
req.Status, req.CPUPct, req.RAMAvailableMB, req.DiskUsedPct, req.DiskTotalGB,
|
|
req.AERunning, req.CurrentJobID, nil, nodeID,
|
|
)
|
|
return err
|
|
}
|
|
|
|
func (s *Store) UpdateNodeOnline(ctx context.Context, nodeID uuid.UUID, req *models.NodeOnlineRequest) error {
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE render.render_nodes SET
|
|
status = 'Ready'::node_status,
|
|
node_agent_version = $1,
|
|
current_ae_version = $2::ae_version,
|
|
available_ae_versions = $3,
|
|
ram_gb = COALESCE($4, ram_gb),
|
|
cpu_cores = COALESCE($5, cpu_cores),
|
|
cache_used_gb = COALESCE($6, cache_used_gb),
|
|
cached_template_md5s = $7,
|
|
last_heartbeat_at = NOW(),
|
|
updated_at = NOW()
|
|
WHERE id = $8`,
|
|
req.NodeAgentVersion, req.CurrentAEVersion, req.AvailableAEVersions,
|
|
req.RamGB, req.CPUCores, req.CacheUsedGB, req.CachedTemplateMD5s, nodeID,
|
|
)
|
|
return err
|
|
}
|
|
|
|
func (s *Store) ReleaseNode(ctx context.Context, nodeID uuid.UUID) error {
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE render.render_nodes SET
|
|
status = 'Ready'::node_status,
|
|
current_job_id = NULL,
|
|
current_frame_job_id = NULL,
|
|
job_started_at = NULL,
|
|
updated_at = NOW()
|
|
WHERE id = $1`, nodeID)
|
|
return err
|
|
}
|
|
|
|
func (s *Store) ListNodeHealthHistory(ctx context.Context, nodeID uuid.UUID, from, to time.Time) ([]*models.NodeHealthLog, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT id, node_id, recorded_at, status::text, cpu_pct, ram_available_mb,
|
|
ae_running, current_job_id, current_frame, cache_used_gb
|
|
FROM render.node_health_logs
|
|
WHERE node_id = $1 AND recorded_at BETWEEN $2 AND $3
|
|
ORDER BY recorded_at DESC LIMIT 1000`, nodeID, from, to)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var out []*models.NodeHealthLog
|
|
for rows.Next() {
|
|
h := &models.NodeHealthLog{}
|
|
if err := rows.Scan(&h.ID, &h.NodeID, &h.RecordedAt, &h.Status,
|
|
&h.CPUPct, &h.RAMAvailableMB, &h.AERunning, &h.CurrentJobID,
|
|
&h.CurrentFrame, &h.CacheUsedGB); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, h)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
func (s *Store) ListNodeCrashes(ctx context.Context, nodeID uuid.UUID) ([]*models.NodeCrash, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT id, node_id, render_job_id, frame_job_id, crashed_at,
|
|
last_known_frame, crash_signal, error_log, log_file_url,
|
|
auto_recovered, recovery_action, recovered_at, created_at
|
|
FROM render.node_crashes
|
|
WHERE node_id = $1 ORDER BY crashed_at DESC LIMIT 100`, nodeID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var out []*models.NodeCrash
|
|
for rows.Next() {
|
|
c := &models.NodeCrash{}
|
|
if err := rows.Scan(&c.ID, &c.NodeID, &c.RenderJobID, &c.FrameJobID, &c.CrashedAt,
|
|
&c.LastKnownFrame, &c.CrashSignal, &c.ErrorLog, &c.LogFileURL,
|
|
&c.AutoRecovered, &c.RecoveryAction, &c.RecoveredAt, &c.CreatedAt); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, c)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
func (s *Store) InsertCrash(ctx context.Context, nodeID, jobID uuid.UUID, req *models.CrashReportRequest) error {
|
|
_, err := s.pool.Exec(ctx, `
|
|
INSERT INTO render.node_crashes
|
|
(node_id, render_job_id, frame_job_id, last_known_frame, crash_signal, error_log, log_file_url)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7)`,
|
|
nodeID, jobID, req.FrameJobID, req.LastKnownFrame, req.CrashSignal,
|
|
req.ErrorLogTail, req.LogFileURL)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Increment crash counts
|
|
_, err = s.pool.Exec(ctx, `
|
|
UPDATE render.render_nodes SET
|
|
lifetime_crash_count = lifetime_crash_count + 1,
|
|
consecutive_failures = consecutive_failures + 1,
|
|
updated_at = NOW()
|
|
WHERE id = $1`, nodeID)
|
|
return err
|
|
}
|
|
|
|
func (s *Store) UpdateNodeCache(ctx context.Context, nodeID uuid.UUID, req *models.CacheUpdateRequest) error {
|
|
switch req.Action {
|
|
case "Downloaded":
|
|
if req.ProjectID != nil && req.FileSizeBytes != nil {
|
|
_, err := s.pool.Exec(ctx, `
|
|
INSERT INTO render.node_template_cache (node_id, project_id, aep_file_md5, file_size_bytes, local_path)
|
|
VALUES ($1, $2, $3, $4, '')
|
|
ON CONFLICT (node_id, aep_file_md5) DO UPDATE SET
|
|
last_used_at = NOW(), use_count = node_template_cache.use_count + 1`,
|
|
nodeID, *req.ProjectID, req.AEPFileMD5, *req.FileSizeBytes)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if req.CacheUsedGB != nil {
|
|
_, _ = s.pool.Exec(ctx, `UPDATE render.render_nodes SET cache_used_gb = $1, updated_at = NOW() WHERE id = $2`, *req.CacheUsedGB, nodeID)
|
|
}
|
|
case "Evicted":
|
|
_, err := s.pool.Exec(ctx, `DELETE FROM render.node_template_cache WHERE node_id = $1 AND aep_file_md5 = $2`, nodeID, req.AEPFileMD5)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if req.CacheUsedGB != nil {
|
|
_, _ = s.pool.Exec(ctx, `UPDATE render.render_nodes SET cache_used_gb = $1, updated_at = NOW() WHERE id = $2`, *req.CacheUsedGB, nodeID)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ── Render Jobs ───────────────────────────────────────────────────────────────
|
|
|
|
func (s *Store) ListJobs(ctx context.Context, userID uuid.UUID, status string, page, pageSize int) ([]*models.RenderJob, int64, error) {
|
|
q := `SELECT id, tenant_id, user_id, saved_project_id, original_project_id,
|
|
project_name, title, name, external_job_id, priority_queue::text, priority_score,
|
|
step::text, render_progress, convert_progress, image_preview_b64,
|
|
price_type::text, paid_price_minor, discount_code, support_flatrender,
|
|
mode, quality::text, resolution, r_height, frame_rate, is_60_fps,
|
|
duration_sec, export_duration_sec,
|
|
has_music, has_sfx, has_voiceover, music_volume, sfx_volume, voiceover_volume,
|
|
render_node_count, current_active_nodes, region, tell_me_when_done,
|
|
retry_count, max_retries, repair_attempts, failed_message, failed_at_step::text,
|
|
export_id, task_start_date, queued_at, started_at, completed_at, created_at, updated_at
|
|
FROM render.render_jobs WHERE user_id = $1`
|
|
args := []any{userID}
|
|
argIdx := 2
|
|
if status != "" {
|
|
q += fmt.Sprintf(" AND step::text = $%d", argIdx)
|
|
args = append(args, status)
|
|
argIdx++
|
|
}
|
|
|
|
var total int64
|
|
countQ := `SELECT COUNT(*) FROM render.render_jobs WHERE user_id = $1`
|
|
if status != "" {
|
|
countQ += fmt.Sprintf(" AND step::text = $2")
|
|
_ = s.pool.QueryRow(ctx, countQ, args...).Scan(&total)
|
|
} else {
|
|
_ = s.pool.QueryRow(ctx, countQ, userID).Scan(&total)
|
|
}
|
|
|
|
q += fmt.Sprintf(" ORDER BY created_at DESC LIMIT $%d OFFSET $%d", argIdx, argIdx+1)
|
|
args = append(args, pageSize, (page-1)*pageSize)
|
|
|
|
rows, err := s.pool.Query(ctx, q, args...)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
defer rows.Close()
|
|
jobs, err := scanJobs(rows)
|
|
return jobs, total, err
|
|
}
|
|
|
|
func (s *Store) GetJobByID(ctx context.Context, id, userID uuid.UUID) (*models.RenderJob, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT id, tenant_id, user_id, saved_project_id, original_project_id,
|
|
project_name, title, name, external_job_id, priority_queue::text, priority_score,
|
|
step::text, render_progress, convert_progress, image_preview_b64,
|
|
price_type::text, paid_price_minor, discount_code, support_flatrender,
|
|
mode, quality::text, resolution, r_height, frame_rate, is_60_fps,
|
|
duration_sec, export_duration_sec,
|
|
has_music, has_sfx, has_voiceover, music_volume, sfx_volume, voiceover_volume,
|
|
render_node_count, current_active_nodes, region, tell_me_when_done,
|
|
retry_count, max_retries, repair_attempts, failed_message, failed_at_step::text,
|
|
export_id, task_start_date, queued_at, started_at, completed_at, created_at, updated_at
|
|
FROM render.render_jobs WHERE id = $1 AND user_id = $2`, id, userID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
jobs, err := scanJobs(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(jobs) == 0 {
|
|
return nil, fmt.Errorf("job not found")
|
|
}
|
|
return jobs[0], nil
|
|
}
|
|
|
|
// CountActiveJobs returns how many non-terminal render jobs the user currently has.
|
|
// Used to enforce the per-user concurrent-render ceiling.
|
|
func (s *Store) CountActiveJobs(ctx context.Context, userID uuid.UUID) (int, error) {
|
|
var n int
|
|
err := s.pool.QueryRow(ctx, `
|
|
SELECT COUNT(*) FROM render.render_jobs
|
|
WHERE user_id = $1
|
|
AND step NOT IN ('Done'::render_step, 'Failed'::render_step, 'Cancelled'::render_step)`,
|
|
userID).Scan(&n)
|
|
return n, err
|
|
}
|
|
|
|
// ListActiveJobs returns the user's in-flight render jobs (most recent first),
|
|
// lightweight projection for the app-wide mini progress widget.
|
|
func (s *Store) ListActiveJobs(ctx context.Context, userID uuid.UUID) ([]*models.RenderJob, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT id, saved_project_id, name, step, render_progress, image_preview_b64, created_at
|
|
FROM render.render_jobs
|
|
WHERE user_id = $1
|
|
AND step NOT IN ('Done'::render_step, 'Failed'::render_step, 'Cancelled'::render_step)
|
|
ORDER BY created_at DESC`,
|
|
userID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var out []*models.RenderJob
|
|
for rows.Next() {
|
|
j := &models.RenderJob{}
|
|
if err := rows.Scan(&j.ID, &j.SavedProjectID, &j.Name, &j.Step,
|
|
&j.RenderProgress, &j.ImagePreviewB64, &j.CreatedAt); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, j)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
// ResolutionHeight maps a quality-tier label to its output height in pixels.
// Used for r_height (stored) and the node's ffmpeg downscale.
//
// Matching ignores case and surrounding whitespace. Recognised labels are
// "360p", "540p", "720p", "1080p"/"fullhd" and "4k"/"2160p"; any other label
// (including the empty string) yields 1080.
func ResolutionHeight(resolution string) int {
	label := strings.TrimSpace(resolution)
	label = strings.ToLower(label)

	height := 1080 // "1080p", "fullhd" and every unrecognised label
	switch label {
	case "360p":
		height = 360
	case "540p":
		height = 540
	case "720p":
		height = 720
	case "2160p", "4k":
		height = 2160
	}
	return height
}
|
|
|
|
func (s *Store) CreateJob(ctx context.Context, userID, tenantID uuid.UUID, req *models.RenderJobCreateRequest) (*models.RenderJob, error) {
|
|
priceType := "Free"
|
|
if req.PriceType != nil {
|
|
priceType = *req.PriceType
|
|
}
|
|
frameRate := 30
|
|
if req.FrameRate != nil {
|
|
frameRate = *req.FrameRate
|
|
}
|
|
tellMe := true
|
|
if req.TellMeWhenDone != nil {
|
|
tellMe = *req.TellMeWhenDone
|
|
}
|
|
|
|
var id uuid.UUID
|
|
err := s.pool.QueryRow(ctx, `
|
|
INSERT INTO render.render_jobs
|
|
(tenant_id, user_id, saved_project_id, original_project_id,
|
|
priority_queue, step, price_type, quality, resolution, r_height,
|
|
frame_rate, is_60_fps, duration_sec, mode, tell_me_when_done, region)
|
|
VALUES ($1, $2, $3,
|
|
-- original_project_id = the TEMPLATE the saved project was created from
|
|
-- (the render bundle lives at templates/{original_project_id}/). Fall back
|
|
-- to the saved-project id only if the lookup is somehow null.
|
|
COALESCE((SELECT original_project_id FROM studio.saved_projects WHERE id = $3), $3),
|
|
'paid'::render_priority_queue, 'Queued'::render_step, $4::price_kind,
|
|
$5::render_quality, $6, $11, $7, COALESCE($8, FALSE),
|
|
0, 'FIX', $9, $10)
|
|
RETURNING id`,
|
|
tenantID, userID, req.SavedProjectID, priceType,
|
|
req.Quality, req.Resolution, frameRate, req.Is60FPS,
|
|
tellMe, req.PreferredRegion, ResolutionHeight(req.Resolution),
|
|
).Scan(&id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return s.GetJobByID(ctx, id, userID)
|
|
}
|
|
|
|
// CompleteJob marks a render job as Done. Returns the updated job so callers
|
|
// can read user_id / tenant_id for downstream notifications.
|
|
func (s *Store) CompleteJob(ctx context.Context, jobID uuid.UUID, exportID *uuid.UUID) (*models.RenderJob, error) {
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE render.render_jobs SET
|
|
step = 'Done'::render_step,
|
|
render_progress = 100,
|
|
export_id = COALESCE($1, export_id),
|
|
completed_at = NOW(),
|
|
updated_at = NOW()
|
|
WHERE id = $2
|
|
AND step NOT IN ('Done'::render_step, 'Failed'::render_step, 'Cancelled'::render_step)`,
|
|
exportID, jobID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return s.getJobByIDInternal(ctx, jobID)
|
|
}
|
|
|
|
// DevClaimNextQueued atomically picks the oldest Queued job and moves it to
|
|
// Preparing (started_at set). Returns (nil, nil) when the queue is empty.
|
|
//
|
|
// This is the dev/mock path: it claims WITHOUT assigning a render node, so the
|
|
// in-process dev worker can simulate a render end-to-end with no Windows AE node.
|
|
// Never enabled in production (gated by RENDER_DEV_WORKER).
|
|
func (s *Store) DevClaimNextQueued(ctx context.Context) (uuid.UUID, error) {
|
|
tx, err := s.pool.Begin(ctx)
|
|
if err != nil {
|
|
return uuid.Nil, err
|
|
}
|
|
defer func() { _ = tx.Rollback(ctx) }()
|
|
|
|
var jobID uuid.UUID
|
|
err = tx.QueryRow(ctx, `
|
|
SELECT id FROM render.render_jobs
|
|
WHERE step = 'Queued'::render_step
|
|
ORDER BY priority_score DESC, queued_at ASC
|
|
LIMIT 1 FOR UPDATE SKIP LOCKED`).Scan(&jobID)
|
|
if err != nil {
|
|
if err.Error() == "no rows in result set" {
|
|
return uuid.Nil, nil
|
|
}
|
|
return uuid.Nil, err
|
|
}
|
|
|
|
_, err = tx.Exec(ctx, `
|
|
UPDATE render.render_jobs SET
|
|
step = 'Preparing'::render_step,
|
|
started_at = COALESCE(started_at, NOW()),
|
|
updated_at = NOW()
|
|
WHERE id = $1`, jobID)
|
|
if err != nil {
|
|
return uuid.Nil, err
|
|
}
|
|
if err := tx.Commit(ctx); err != nil {
|
|
return uuid.Nil, err
|
|
}
|
|
return jobID, nil
|
|
}
|
|
|
|
// UpdateJobStepProgress sets the step + progress for a job (dev worker + future
|
|
// fine-grained progress). No-op on terminal jobs.
|
|
func (s *Store) UpdateJobStepProgress(ctx context.Context, jobID uuid.UUID, step string, progress int) error {
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE render.render_jobs SET
|
|
step = $1::render_step,
|
|
render_progress = $2,
|
|
updated_at = NOW()
|
|
WHERE id = $3
|
|
AND step NOT IN ('Done'::render_step, 'Failed'::render_step, 'Cancelled'::render_step)`,
|
|
step, progress, jobID)
|
|
return err
|
|
}
|
|
|
|
// FailJob marks a render job as Failed. Returns the updated job.
|
|
func (s *Store) FailJob(ctx context.Context, jobID uuid.UUID, reason, atStep string) (*models.RenderJob, error) {
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE render.render_jobs SET
|
|
step = 'Failed'::render_step,
|
|
failed_message = $1,
|
|
failed_at_step = $2::render_step,
|
|
completed_at = NOW(),
|
|
updated_at = NOW()
|
|
WHERE id = $3
|
|
AND step NOT IN ('Done'::render_step, 'Failed'::render_step, 'Cancelled'::render_step)`,
|
|
reason, atStep, jobID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return s.getJobByIDInternal(ctx, jobID)
|
|
}
|
|
|
|
// getJobByIDInternal retrieves a job by ID without an ownership check.
|
|
// Used only for internal operations (node agent callbacks).
|
|
func (s *Store) getJobByIDInternal(ctx context.Context, id uuid.UUID) (*models.RenderJob, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT id, tenant_id, user_id, saved_project_id, original_project_id,
|
|
project_name, title, name, external_job_id, priority_queue::text, priority_score,
|
|
step::text, render_progress, convert_progress, image_preview_b64,
|
|
price_type::text, paid_price_minor, discount_code, support_flatrender,
|
|
mode, quality::text, resolution, r_height, frame_rate, is_60_fps,
|
|
duration_sec, export_duration_sec,
|
|
has_music, has_sfx, has_voiceover, music_volume, sfx_volume, voiceover_volume,
|
|
render_node_count, current_active_nodes, region, tell_me_when_done,
|
|
retry_count, max_retries, repair_attempts, failed_message, failed_at_step::text,
|
|
export_id, task_start_date, queued_at, started_at, completed_at, created_at, updated_at
|
|
FROM render.render_jobs WHERE id = $1`, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
jobs, err := scanJobs(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(jobs) == 0 {
|
|
return nil, fmt.Errorf("job %s not found", id)
|
|
}
|
|
return jobs[0], nil
|
|
}
|
|
|
|
// ClaimJob atomically picks the highest-priority Queued job (optionally filtered
|
|
// by region) and moves it to Preparing, setting the current_job_id on the node.
|
|
// Returns (nil, nil) when there is nothing to do.
|
|
func (s *Store) ClaimJob(ctx context.Context, nodeID uuid.UUID, region string) (*models.RenderJob, error) {
|
|
tx, err := s.pool.Begin(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() { _ = tx.Rollback(ctx) }()
|
|
|
|
q := `SELECT id FROM render.render_jobs
|
|
WHERE step = 'Queued'::render_step`
|
|
args := []any{}
|
|
argIdx := 1
|
|
if region != "" {
|
|
q += fmt.Sprintf(" AND (region IS NULL OR region = $%d)", argIdx)
|
|
args = append(args, region)
|
|
argIdx++
|
|
}
|
|
q += " ORDER BY priority_score DESC, queued_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED"
|
|
|
|
var jobID uuid.UUID
|
|
if err := tx.QueryRow(ctx, q, args...).Scan(&jobID); err != nil {
|
|
if err.Error() == "no rows in result set" {
|
|
return nil, nil // nothing to do
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
// Advance to Preparing and assign to this node
|
|
_, err = tx.Exec(ctx, `
|
|
UPDATE render.render_jobs SET
|
|
step = 'Preparing'::render_step,
|
|
started_at = COALESCE(started_at, NOW()),
|
|
updated_at = NOW()
|
|
WHERE id = $1`, jobID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
_, err = tx.Exec(ctx, `
|
|
UPDATE render.render_nodes SET
|
|
status = 'Busy'::node_status,
|
|
current_job_id = $1,
|
|
job_started_at = NOW(),
|
|
updated_at = NOW()
|
|
WHERE id = $2`, jobID, nodeID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := tx.Commit(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
return s.getJobByIDInternal(ctx, jobID)
|
|
}
|
|
|
|
// NOTE: the next paragraph documents CreateExportForJob, which is defined
// later in this file; it does not describe the function that follows.
//
// CreateExportForJob allocates a new Export row for a completed render job.
// The export starts with a placeholder path `exports/{export_id}/output.mp4`.
// The node agent uploads the MP4 to that MinIO path, then calls CompleteJob
// with the returned export_id.

|
|
// GetTemplateCompName returns the After Effects composition to render for a
|
|
// template (content.projects.render_aep_comp), e.g. "frfinal". aerender needs
|
|
// this via -comp; without it AE opens the project but renders nothing.
|
|
func (s *Store) GetTemplateCompName(ctx context.Context, originalProjectID uuid.UUID) (string, error) {
|
|
var comp *string
|
|
err := s.pool.QueryRow(ctx,
|
|
`SELECT render_aep_comp FROM content.projects WHERE id = $1`, originalProjectID).Scan(&comp)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if comp == nil {
|
|
return "", nil
|
|
}
|
|
return *comp, nil
|
|
}
|
|
|
|
// TemplateRenderConfig describes how a template should be rendered: which
// engine to use and which composition inside the template to render.
type TemplateRenderConfig struct {
	// Engine is "AfterEffects" or "Remotion".
	Engine string

	// CompName is the composition to render. It is taken from
	// render_aep_comp for After Effects templates and from
	// render_remotion_comp for Remotion templates.
	CompName string
}
|
|
|
|
// GetTemplateRenderConfig resolves the render engine + composition for a template.
|
|
// For Remotion templates the composition id comes from render_remotion_comp; for
|
|
// After Effects it comes from render_aep_comp. Defaults to AfterEffects when the
|
|
// render_engine column is missing/empty (older rows pre-migration).
|
|
func (s *Store) GetTemplateRenderConfig(ctx context.Context, originalProjectID uuid.UUID) (TemplateRenderConfig, error) {
|
|
var engine *string
|
|
var aepComp, remotionComp *string
|
|
err := s.pool.QueryRow(ctx,
|
|
`SELECT render_engine, render_aep_comp, render_remotion_comp
|
|
FROM content.projects WHERE id = $1`, originalProjectID).Scan(&engine, &aepComp, &remotionComp)
|
|
if err != nil {
|
|
return TemplateRenderConfig{}, err
|
|
}
|
|
cfg := TemplateRenderConfig{Engine: "AfterEffects"}
|
|
if engine != nil && *engine != "" {
|
|
cfg.Engine = *engine
|
|
}
|
|
if cfg.Engine == "Remotion" {
|
|
if remotionComp != nil {
|
|
cfg.CompName = *remotionComp
|
|
}
|
|
} else if aepComp != nil {
|
|
cfg.CompName = *aepComp
|
|
}
|
|
return cfg, nil
|
|
}
|
|
|
|
// GetRenderBindings returns the user's edited input values for a saved project so the
|
|
// node can write them into the AE project before rendering (the render binder). Only
|
|
// inputs with a non-empty value are returned (defaults are already in the template).
|
|
func (s *Store) GetRenderBindings(ctx context.Context, savedProjectID uuid.UUID) ([]models.RenderBinding, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT c.key, c.type, COALESCE(c.value, '')
|
|
FROM studio.saved_scene_contents c
|
|
JOIN studio.saved_scenes s ON s.id = c.saved_scene_id
|
|
WHERE s.saved_project_id = $1 AND c.value IS NOT NULL AND c.value <> ''
|
|
UNION
|
|
-- project-wide shared colours (frshare/frd_* layers): bind.jsx writes these
|
|
-- into the frshare comp's text layers so template expressions propagate them.
|
|
SELECT sc.element_key, 'color', sc.value
|
|
FROM studio.saved_shared_colors sc
|
|
WHERE sc.saved_project_id = $1 AND sc.value IS NOT NULL AND sc.value <> ''
|
|
UNION
|
|
-- per-scene colours (frl_c* layers)
|
|
SELECT cc.element_key, 'color', cc.value
|
|
FROM studio.saved_scene_colors cc
|
|
JOIN studio.saved_scenes s2 ON s2.id = cc.saved_scene_id
|
|
WHERE s2.saved_project_id = $1 AND cc.value IS NOT NULL AND cc.value <> ''`,
|
|
savedProjectID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var out []models.RenderBinding
|
|
for rows.Next() {
|
|
var b models.RenderBinding
|
|
if err := rows.Scan(&b.Key, &b.Type, &b.Value); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, b)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
func (s *Store) CreateExportForJob(ctx context.Context, jobID uuid.UUID) (*models.Export, error) {
|
|
// Look up the job to get tenant/user/project context
|
|
job, err := s.getJobByIDInternal(ctx, jobID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("job not found: %w", err)
|
|
}
|
|
|
|
exportID := uuid.New()
|
|
path := fmt.Sprintf("exports/%s/output.mp4", exportID)
|
|
now := time.Now()
|
|
autoDelete := now.AddDate(0, 0, 30) // 30-day retention
|
|
|
|
_, err = s.pool.Exec(ctx, `
|
|
INSERT INTO render.exports
|
|
(id, tenant_id, user_id, saved_project_id, project_id,
|
|
render_job_id, path, file_extension, file_type, render_quality,
|
|
create_type, size_bytes, produce_date, auto_delete_date,
|
|
delete_notified, created_at)
|
|
VALUES
|
|
($1, $2, $3, $4, $5,
|
|
$6, $7, 'mp4', 'Video'::render.export_file_type, $8::render.render_quality,
|
|
'Render'::render.export_create_type, 0, $9, $10,
|
|
false, $9)`,
|
|
exportID, job.TenantID, job.UserID, job.SavedProjectID, job.OriginalProjectID,
|
|
job.ID, path, job.Quality,
|
|
now, autoDelete,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create export: %w", err)
|
|
}
|
|
|
|
return &models.Export{
|
|
ID: exportID,
|
|
TenantID: job.TenantID,
|
|
UserID: job.UserID,
|
|
SavedProjectID: job.SavedProjectID,
|
|
Path: path,
|
|
FileExtension: "mp4",
|
|
FileType: "video",
|
|
RenderQuality: job.Quality,
|
|
CreateType: "render",
|
|
ProduceDate: now,
|
|
AutoDeleteDate: autoDelete,
|
|
CreatedAt: now,
|
|
}, nil
|
|
}
|
|
|
|
// UpdateJobPreview stores a base64-encoded preview frame for a running job.
|
|
// Called by the node agent every N frames to power the live preview UI.
|
|
func (s *Store) UpdateJobPreview(ctx context.Context, jobID uuid.UUID, imageB64 string) error {
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE render.render_jobs
|
|
SET image_preview_b64 = $1, updated_at = NOW()
|
|
WHERE id = $2
|
|
AND step NOT IN ('Done'::render_step, 'Failed'::render_step, 'Cancelled'::render_step)`,
|
|
imageB64, jobID)
|
|
return err
|
|
}
|
|
|
|
// UpdateJobProgress stores the render percentage for a running job and advances the
|
|
// step to 'Rendering' once it actually starts (so the UI shows progress + ETA, not a
|
|
// stuck 'Preparing'). Called by the node agent every few seconds during a render.
|
|
func (s *Store) UpdateJobProgress(ctx context.Context, jobID uuid.UUID, progress int) error {
|
|
if progress < 0 {
|
|
progress = 0
|
|
}
|
|
if progress > 100 {
|
|
progress = 100
|
|
}
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE render.render_jobs
|
|
SET render_progress = $1,
|
|
step = CASE WHEN step IN ('Queued'::render_step, 'Preparing'::render_step) AND $1 > 0
|
|
THEN 'Rendering'::render_step ELSE step END,
|
|
started_at = COALESCE(started_at, NOW()),
|
|
updated_at = NOW()
|
|
WHERE id = $2
|
|
AND step NOT IN ('Done'::render_step, 'Failed'::render_step, 'Cancelled'::render_step)`,
|
|
progress, jobID)
|
|
return err
|
|
}
|
|
|
|
func (s *Store) CancelJob(ctx context.Context, id, userID uuid.UUID) (bool, error) {
|
|
tag, err := s.pool.Exec(ctx, `
|
|
UPDATE render.render_jobs
|
|
SET step = 'Cancelled'::render_step, completed_at = NOW(), updated_at = NOW()
|
|
WHERE id = $1 AND user_id = $2 AND step NOT IN ('Done','Failed','Cancelled')`,
|
|
id, userID)
|
|
return tag.RowsAffected() > 0, err
|
|
}
|
|
|
|
// StopJob cancels any in-progress job regardless of owner (admin action), frees the
|
|
// assigned node, and returns the job's owner id so the caller can refund their charge.
|
|
func (s *Store) StopJob(ctx context.Context, id uuid.UUID) (bool, uuid.UUID, error) {
|
|
var ownerID uuid.UUID
|
|
err := s.pool.QueryRow(ctx, `
|
|
UPDATE render.render_jobs
|
|
SET step = 'Cancelled'::render_step, completed_at = NOW(), updated_at = NOW()
|
|
WHERE id = $1 AND step NOT IN ('Done','Failed','Cancelled')
|
|
RETURNING user_id`, id).Scan(&ownerID)
|
|
if err == pgx.ErrNoRows {
|
|
return false, uuid.Nil, nil
|
|
}
|
|
if err != nil {
|
|
return false, uuid.Nil, err
|
|
}
|
|
// Release any node that was actively working this job.
|
|
_, _ = s.pool.Exec(ctx,
|
|
`UPDATE render.render_nodes SET status = 'Ready'::node_status, current_frame_job_id = NULL, updated_at = NOW()
|
|
WHERE current_frame_job_id IN (SELECT id FROM render.frame_jobs WHERE render_job_id = $1)`, id)
|
|
return true, ownerID, nil
|
|
}
|
|
|
|
func (s *Store) GetJobProgress(ctx context.Context, id, userID uuid.UUID) (*models.RenderJob, error) {
|
|
return s.GetJobByID(ctx, id, userID)
|
|
}
|
|
|
|
func (s *Store) ListFrameJobs(ctx context.Context, renderJobID uuid.UUID) ([]*models.FrameJob, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT id, render_job_id, node_id, start_frame, end_frame, collect_frame_count,
|
|
order_value, folder_name, convert_url, status::text, frames_rendered,
|
|
frames_validated, attempt, last_error, output_mp4_url,
|
|
assigned_at, started_at, last_progress_at, completed_at, created_at, updated_at
|
|
FROM render.frame_jobs WHERE render_job_id = $1 ORDER BY order_value`, renderJobID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
return scanFrameJobs(rows)
|
|
}
|
|
|
|
func (s *Store) UpdateFrameProgress(ctx context.Context, jobID uuid.UUID, req *models.FrameProgressRequest) error {
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE render.frame_jobs SET
|
|
frames_rendered = frames_rendered + 1,
|
|
last_progress_at = NOW(),
|
|
status = CASE WHEN $1 IS NOT NULL THEN 'Validated'::frame_job_status ELSE status END,
|
|
completed_at = $1,
|
|
updated_at = NOW()
|
|
WHERE id = $2`,
|
|
req.CompletedAt, req.FrameJobID)
|
|
return err
|
|
}
|
|
|
|
// ── Snapshots ────────────────────────────────────────────────────────────────
|
|
|
|
func (s *Store) GetCachedSnapshot(ctx context.Context, projectID uuid.UUID, sceneKey string, frame int, hash string) (*models.Snapshot, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT id, tenant_id, user_id, saved_project_id, scene_key, frame_number,
|
|
inputs_hash, status, render_node_id, image_url, thumbnail_url,
|
|
width, height, size_bytes, requested_at, completed_at, duration_ms,
|
|
expires_at, error_message, created_at
|
|
FROM render.snapshots
|
|
WHERE saved_project_id = $1 AND scene_key = $2 AND frame_number = $3
|
|
AND inputs_hash = $4 AND status = 'Done' AND expires_at > NOW()
|
|
LIMIT 1`, projectID, sceneKey, frame, hash)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
snaps, err := scanSnapshots(rows)
|
|
if err != nil || len(snaps) == 0 {
|
|
return nil, err
|
|
}
|
|
return snaps[0], nil
|
|
}
|
|
|
|
func (s *Store) CreateSnapshot(ctx context.Context, userID, tenantID uuid.UUID, req *models.SnapshotCreateRequest, hash string) (*models.Snapshot, error) {
|
|
var id uuid.UUID
|
|
err := s.pool.QueryRow(ctx, `
|
|
INSERT INTO render.snapshots
|
|
(tenant_id, user_id, saved_project_id, scene_key, frame_number, inputs_hash, status)
|
|
VALUES ($1, $2, $3, $4, $5, $6, 'Pending')
|
|
RETURNING id`,
|
|
tenantID, userID, req.SavedProjectID, req.SceneKey, req.FrameNumber, hash,
|
|
).Scan(&id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return s.GetSnapshotByID(ctx, id)
|
|
}
|
|
|
|
func (s *Store) GetSnapshotByID(ctx context.Context, id uuid.UUID) (*models.Snapshot, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT id, tenant_id, user_id, saved_project_id, scene_key, frame_number,
|
|
inputs_hash, status, render_node_id, image_url, thumbnail_url,
|
|
width, height, size_bytes, requested_at, completed_at, duration_ms,
|
|
expires_at, error_message, created_at
|
|
FROM render.snapshots WHERE id = $1`, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
snaps, err := scanSnapshots(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(snaps) == 0 {
|
|
return nil, fmt.Errorf("snapshot not found")
|
|
}
|
|
return snaps[0], nil
|
|
}
|
|
|
|
// ── Exports ──────────────────────────────────────────────────────────────────
|
|
|
|
func (s *Store) ListExports(ctx context.Context, userID uuid.UUID, page, pageSize int) ([]*models.Export, int64, error) {
|
|
var total int64
|
|
_ = s.pool.QueryRow(ctx,
|
|
`SELECT COUNT(*) FROM render.exports WHERE user_id = $1 AND deleted_at IS NULL`, userID).Scan(&total)
|
|
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT id, tenant_id, user_id, saved_project_id, project_id, render_job_id,
|
|
image, path, file_extension, file_type::text, render_quality::text,
|
|
create_type::text, size_bytes, duration_sec, width, height,
|
|
produce_date, auto_delete_date, delete_notified, created_at, deleted_at
|
|
FROM render.exports WHERE user_id = $1 AND deleted_at IS NULL
|
|
ORDER BY produce_date DESC LIMIT $2 OFFSET $3`,
|
|
userID, pageSize, (page-1)*pageSize)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
defer rows.Close()
|
|
exports, err := scanExports(rows)
|
|
return exports, total, err
|
|
}
|
|
|
|
func (s *Store) GetExportByID(ctx context.Context, id, userID uuid.UUID) (*models.Export, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT id, tenant_id, user_id, saved_project_id, project_id, render_job_id,
|
|
image, path, file_extension, file_type::text, render_quality::text,
|
|
create_type::text, size_bytes, duration_sec, width, height,
|
|
produce_date, auto_delete_date, delete_notified, created_at, deleted_at
|
|
FROM render.exports WHERE id = $1 AND user_id = $2 AND deleted_at IS NULL`, id, userID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
exports, err := scanExports(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(exports) == 0 {
|
|
return nil, fmt.Errorf("export not found")
|
|
}
|
|
return exports[0], nil
|
|
}
|
|
|
|
func (s *Store) ListExportFiles(ctx context.Context, exportID uuid.UUID) ([]*models.ExportFile, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT id, export_id, user_id, name, thumbnail, path, size_bytes,
|
|
file_type::text, width, height, sort, created_at
|
|
FROM render.export_files WHERE export_id = $1 ORDER BY sort`, exportID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var out []*models.ExportFile
|
|
for rows.Next() {
|
|
f := &models.ExportFile{}
|
|
if err := rows.Scan(&f.ID, &f.ExportID, &f.UserID, &f.Name, &f.Thumbnail,
|
|
&f.Path, &f.SizeBytes, &f.FileType, &f.Width, &f.Height, &f.Sort, &f.CreatedAt); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, f)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
func (s *Store) SoftDeleteExport(ctx context.Context, id, userID uuid.UUID) error {
|
|
tag, err := s.pool.Exec(ctx, `
|
|
UPDATE render.exports SET deleted_at = NOW() WHERE id = $1 AND user_id = $2 AND deleted_at IS NULL`, id, userID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if tag.RowsAffected() == 0 {
|
|
return fmt.Errorf("export not found")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Store) ExtendExportDeleteDate(ctx context.Context, id, userID uuid.UUID, days int) (time.Time, error) {
|
|
var newDate time.Time
|
|
err := s.pool.QueryRow(ctx, `
|
|
UPDATE render.exports SET auto_delete_date = auto_delete_date + ($1 || ' days')::interval
|
|
WHERE id = $2 AND user_id = $3 AND deleted_at IS NULL
|
|
RETURNING auto_delete_date`, days, id, userID).Scan(&newDate)
|
|
return newDate, err
|
|
}
|
|
|
|
// ── Node Updates ─────────────────────────────────────────────────────────────
|
|
|
|
func (s *Store) ListNodeUpdates(ctx context.Context) ([]*models.NodeUpdate, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT id, update_file_name, update_number, description, target_ae_version::text,
|
|
in_update_queue, rolled_out_to_node_ids, last_update_queue_date, create_date, created_at
|
|
FROM render.node_updates ORDER BY update_number DESC`)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var out []*models.NodeUpdate
|
|
for rows.Next() {
|
|
u := &models.NodeUpdate{}
|
|
if err := rows.Scan(&u.ID, &u.UpdateFileName, &u.UpdateNumber, &u.Description,
|
|
&u.TargetAEVersion, &u.InUpdateQueue, &u.RolledOutToNodeIDs,
|
|
&u.LastUpdateQueueDate, &u.CreateDate, &u.CreatedAt); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, u)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
func (s *Store) QueueUpdateRollout(ctx context.Context, updateID uuid.UUID, nodeIDs []uuid.UUID) error {
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE render.node_updates SET
|
|
in_update_queue = TRUE,
|
|
last_update_queue_date = NOW(),
|
|
rolled_out_to_node_ids = rolled_out_to_node_ids || $1
|
|
WHERE id = $2`, nodeIDs, updateID)
|
|
return err
|
|
}
|
|
|
|
// ── Scanners ─────────────────────────────────────────────────────────────────
|
|
|
|
func scanNodes(rows pgx.Rows) ([]*models.RenderNode, error) {
|
|
var out []*models.RenderNode
|
|
for rows.Next() {
|
|
n := &models.RenderNode{}
|
|
if err := rows.Scan(
|
|
&n.ID, &n.Name, &n.Region, &n.NodeIP, &n.WorkerPort, &n.PublicEndpoint,
|
|
&n.RamGB, &n.CPUCores, &n.GPUModel, &n.StorageGB,
|
|
&n.CurrentAEVersion, &n.AvailableAEVersions, &n.NodeAgentVersion,
|
|
&n.NodeKind, &n.OwnerUserID, &n.OwnerTenantID,
|
|
&n.Status, &n.CurrentJobID, &n.CurrentFrameJobID, &n.JobStartedAt,
|
|
&n.LastHeartbeatAt, &n.LastCPUPct, &n.LastRAMAvailableMB, &n.AERunning,
|
|
&n.LifetimeTaskCount, &n.LifetimeCrashCount, &n.ConsecutiveFailures,
|
|
&n.Priority, &n.IsActive, &n.AcceptsNewJobs,
|
|
&n.LastMaintenanceAt, &n.NextMaintenanceAt, &n.MaintenanceReason,
|
|
&n.CachedTemplateMD5s, &n.CacheUsedGB, &n.CreatedAt, &n.UpdatedAt,
|
|
&n.LastDiskPct, &n.DiskTotalGB,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, n)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
func scanJobs(rows pgx.Rows) ([]*models.RenderJob, error) {
|
|
var out []*models.RenderJob
|
|
for rows.Next() {
|
|
j := &models.RenderJob{}
|
|
if err := rows.Scan(
|
|
&j.ID, &j.TenantID, &j.UserID, &j.SavedProjectID, &j.OriginalProjectID,
|
|
&j.ProjectName, &j.Title, &j.Name, &j.ExternalJobID, &j.PriorityQueue, &j.PriorityScore,
|
|
&j.Step, &j.RenderProgress, &j.ConvertProgress, &j.ImagePreviewB64,
|
|
&j.PriceType, &j.PaidPriceMinor, &j.DiscountCode, &j.SupportFlatrender,
|
|
&j.Mode, &j.Quality, &j.Resolution, &j.RHeight, &j.FrameRate, &j.Is60FPS,
|
|
&j.DurationSec, &j.ExportDurationSec,
|
|
&j.HasMusic, &j.HasSFX, &j.HasVoiceover, &j.MusicVolume, &j.SFXVolume, &j.VoiceoverVolume,
|
|
&j.RenderNodeCount, &j.CurrentActiveNodes, &j.Region, &j.TellMeWhenDone,
|
|
&j.RetryCount, &j.MaxRetries, &j.RepairAttempts, &j.FailedMessage, &j.FailedAtStep,
|
|
&j.ExportID, &j.TaskStartDate, &j.QueuedAt, &j.StartedAt, &j.CompletedAt,
|
|
&j.CreatedAt, &j.UpdatedAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, j)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
func scanFrameJobs(rows pgx.Rows) ([]*models.FrameJob, error) {
|
|
var out []*models.FrameJob
|
|
for rows.Next() {
|
|
f := &models.FrameJob{}
|
|
if err := rows.Scan(
|
|
&f.ID, &f.RenderJobID, &f.NodeID, &f.StartFrame, &f.EndFrame, &f.CollectFrameCount,
|
|
&f.OrderValue, &f.FolderName, &f.ConvertURL, &f.Status, &f.FramesRendered,
|
|
&f.FramesValidated, &f.Attempt, &f.LastError, &f.OutputMP4URL,
|
|
&f.AssignedAt, &f.StartedAt, &f.LastProgressAt, &f.CompletedAt, &f.CreatedAt, &f.UpdatedAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, f)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
func scanSnapshots(rows pgx.Rows) ([]*models.Snapshot, error) {
|
|
var out []*models.Snapshot
|
|
for rows.Next() {
|
|
s := &models.Snapshot{}
|
|
if err := rows.Scan(
|
|
&s.ID, &s.TenantID, &s.UserID, &s.SavedProjectID, &s.SceneKey, &s.FrameNumber,
|
|
&s.InputsHash, &s.Status, &s.RenderNodeID, &s.ImageURL, &s.ThumbnailURL,
|
|
&s.Width, &s.Height, &s.SizeBytes, &s.RequestedAt, &s.CompletedAt,
|
|
&s.DurationMS, &s.ExpiresAt, &s.ErrorMessage, &s.CreatedAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, s)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
func scanExports(rows pgx.Rows) ([]*models.Export, error) {
|
|
var out []*models.Export
|
|
for rows.Next() {
|
|
e := &models.Export{}
|
|
if err := rows.Scan(
|
|
&e.ID, &e.TenantID, &e.UserID, &e.SavedProjectID, &e.ProjectID, &e.RenderJobID,
|
|
&e.Image, &e.Path, &e.FileExtension, &e.FileType, &e.RenderQuality,
|
|
&e.CreateType, &e.SizeBytes, &e.DurationSec, &e.Width, &e.Height,
|
|
&e.ProduceDate, &e.AutoDeleteDate, &e.DeleteNotified, &e.CreatedAt, &e.DeletedAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, e)
|
|
}
|
|
return out, rows.Err()
|
|
}
|