// Package db provides the PostgreSQL persistence layer for the file service.
// It covers the file_mgr schema: user folders, user files, storage quotas,
// upload sessions and MinIO bucket lookups.
package db

import (
	"context"
	"fmt"
	"math"
	"strings"
	"time"

	"github.com/flatrender/file-svc/internal/models"
	"github.com/google/uuid"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// Store exposes the file service's queries on top of a pgx connection pool.
type Store struct {
	pool *pgxpool.Pool
}

// New parses connStr, caps the pool at 20 connections (minimum 2) and creates
// the pool. Configuration and pool-creation failures are returned wrapped.
func New(connStr string) (*Store, error) {
	cfg, err := pgxpool.ParseConfig(connStr)
	if err != nil {
		return nil, fmt.Errorf("db config: %w", err)
	}
	cfg.MaxConns = 20
	cfg.MinConns = 2

	pool, err := pgxpool.NewWithConfig(context.Background(), cfg)
	if err != nil {
		return nil, fmt.Errorf("db connect: %w", err)
	}
	return &Store{pool: pool}, nil
}

// Close closes the underlying connection pool.
func (s *Store) Close() {
	s.pool.Close()
}

// Ping reports whether the pool can reach the database.
func (s *Store) Ping(ctx context.Context) error {
	return s.pool.Ping(ctx)
}

// ── Folders ───────────────────────────────────────────────────────────────────

// userFolderColumns is the column list shared by every query that returns a
// full models.UserFolder; its order must match scanUserFolder.
const userFolderColumns = `id, tenant_id, user_id, name, folder_type, parent_folder_id,
	file_count, total_size_bytes, sort, is_shared, share_token, created_at, updated_at`

// scanUserFolder reads one row selected with userFolderColumns into f.
func scanUserFolder(row pgx.Row, f *models.UserFolder) error {
	return row.Scan(
		&f.ID, &f.TenantID, &f.UserID, &f.Name, &f.FolderType, &f.ParentFolderID,
		&f.FileCount, &f.TotalSizeBytes, &f.Sort, &f.IsShared, &f.ShareToken,
		&f.CreatedAt, &f.UpdatedAt,
	)
}

// GetFolders returns the non-deleted folders owned by userID that sit directly
// under parentID, ordered by sort and then name. A nil parentID selects the
// folders that have no parent. On error the returned slice is nil.
func (s *Store) GetFolders(ctx context.Context, userID uuid.UUID, parentID *uuid.UUID) ([]models.UserFolder, error) {
	// "= NULL" never matches in SQL, so the root level needs its own predicate.
	parentCond := "parent_folder_id IS NULL"
	args := []any{userID}
	if parentID != nil {
		parentCond = "parent_folder_id = $2"
		args = append(args, parentID)
	}

	query := `SELECT ` + userFolderColumns + `
		FROM file_mgr.user_folders
		WHERE user_id = $1 AND ` + parentCond + ` AND deleted_at IS NULL
		ORDER BY sort, name`

	rows, err := s.pool.Query(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var folders []models.UserFolder
	for rows.Next() {
		var f models.UserFolder
		if err := scanUserFolder(rows, &f); err != nil {
			return nil, err
		}
		folders = append(folders, f)
	}
	// Do not hand back a half-read result set alongside an error.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return folders, nil
}

// CreateFolder inserts a folder of type 'User' for the given tenant and user,
// using req.Name and req.ParentFolderID, and returns the stored row.
func (s *Store) CreateFolder(ctx context.Context, tenantID, userID uuid.UUID, req models.CreateFolderRequest) (*models.UserFolder, error) {
	var f models.UserFolder
	row := s.pool.QueryRow(ctx,
		`INSERT INTO file_mgr.user_folders (tenant_id, user_id, name, folder_type, parent_folder_id)
		VALUES ($1, $2, $3, 'User', $4)
		RETURNING `+userFolderColumns,
		tenantID, userID, req.Name, req.ParentFolderID,
	)
	if err := scanUserFolder(row, &f); err != nil {
		return nil, err
	}
	return &f, nil
}

// DeleteFolder soft-deletes the folder by setting deleted_at. It returns
// pgx.ErrNoRows when no non-deleted folder with that id belongs to userID.
func (s *Store) DeleteFolder(ctx context.Context, id, userID uuid.UUID) error {
	ct, err := s.pool.Exec(ctx,
		`UPDATE file_mgr.user_folders
		SET deleted_at = NOW()
		WHERE id = $1 AND user_id = $2 AND deleted_at IS NULL`,
		id, userID)
	if err != nil {
		return err
	}
	if ct.RowsAffected() == 0 {
		return pgx.ErrNoRows
	}
	return nil
}

// ── Files ─────────────────────────────────────────────────────────────────────

// likeEscaper neutralises the LIKE/ILIKE metacharacters in user-supplied text.
// Backslash is PostgreSQL's default escape character for pattern matching, so
// it must be escaped as well.
var likeEscaper = strings.NewReplacer(`\`, `\\`, `%`, `\%`, `_`, `\_`)

// ListFiles returns one page of the user's non-deleted files, newest first,
// together with the total number of files matching the filters.
//
// req.FolderID, req.FileType and req.Search are optional filters; Search is a
// case-insensitive substring match on the file name, with wildcard characters
// in the term matched literally. A req.Page below 1 is treated as page 1.
// On error the returned slice is nil and the total is 0.
func (s *Store) ListFiles(ctx context.Context, userID uuid.UUID, req models.FileListRequest) ([]models.UserFile, int64, error) {
	// PostgreSQL rejects a negative OFFSET, so clamp to the first page.
	page := req.Page
	if page < 1 {
		page = 1
	}
	offset := (page - 1) * req.PageSize

	// baseQ and args are shared by the COUNT query and the page query, so the
	// placeholder numbering has to be built up once and reused.
	baseQ := `FROM file_mgr.user_files WHERE user_id = $1 AND deleted_at IS NULL`
	args := []any{userID}
	argN := 2

	if req.FolderID != nil {
		baseQ += fmt.Sprintf(" AND user_folder_id = $%d", argN)
		args = append(args, req.FolderID)
		argN++
	}
	if req.FileType != nil {
		baseQ += fmt.Sprintf(" AND file_type = $%d", argN)
		args = append(args, req.FileType)
		argN++
	}
	if req.Search != nil {
		baseQ += fmt.Sprintf(" AND name ILIKE $%d", argN)
		args = append(args, "%"+likeEscaper.Replace(*req.Search)+"%")
		argN++
	}

	var total int64
	if err := s.pool.QueryRow(ctx, "SELECT COUNT(*) "+baseQ, args...).Scan(&total); err != nil {
		return nil, 0, err
	}

	args = append(args, req.PageSize, offset)
	rows, err := s.pool.Query(ctx,
		`SELECT id, tenant_id, user_id, user_folder_id, name, original_filename,
			file_extension, mime_type, file_type, minio_bucket, minio_key, cdn_url,
			file_address, size_bytes, md5_hash, thumbnail_url, upload_status,
			upload_progress, source, last_used_at, use_count, is_public,
			created_at, updated_at `+
			baseQ+
			fmt.Sprintf(` ORDER BY created_at DESC LIMIT $%d OFFSET $%d`, argN, argN+1),
		args...)
	if err != nil {
		return nil, 0, err
	}
	defer rows.Close()

	var files []models.UserFile
	for rows.Next() {
		var f models.UserFile
		if err := rows.Scan(
			&f.ID, &f.TenantID, &f.UserID, &f.UserFolderID, &f.Name, &f.OriginalFilename,
			&f.FileExtension, &f.MimeType, &f.FileType, &f.MinioBucket, &f.MinioKey, &f.CdnURL,
			&f.FileAddress, &f.SizeBytes, &f.Md5Hash, &f.ThumbnailURL, &f.UploadStatus,
			&f.UploadProgress, &f.Source, &f.LastUsedAt, &f.UseCount, &f.IsPublic,
			&f.CreatedAt, &f.UpdatedAt,
		); err != nil {
			return nil, 0, err
		}
		files = append(files, f)
	}
	// Do not hand back a half-read page alongside an error.
	if err := rows.Err(); err != nil {
		return nil, 0, err
	}
	return files, total, nil
}

// GetFile loads the full record of a non-deleted file owned by userID. The
// error from the row scan is returned as is (pgx.ErrNoRows when nothing
// matches).
func (s *Store) GetFile(ctx context.Context, id, userID uuid.UUID) (*models.UserFile, error) {
	var f models.UserFile
	err := s.pool.QueryRow(ctx,
		`SELECT id, tenant_id, user_id, user_folder_id, name, original_filename,
			file_extension, mime_type, file_type, minio_bucket, minio_key, cdn_url,
			file_address, size_bytes, md5_hash, sha256_hash, duration_sec, width,
			height, fps, bitrate_kbps, codec, has_audio, has_video, thumbnail_url,
			waveform_data, upload_status, upload_progress, source, export_id,
			parent_file_id, last_used_at, use_count, is_public, share_token,
			metadata, created_at, updated_at
		FROM file_mgr.user_files
		WHERE id = $1 AND user_id = $2 AND deleted_at IS NULL`,
		id, userID,
	).Scan(
		&f.ID, &f.TenantID, &f.UserID, &f.UserFolderID, &f.Name, &f.OriginalFilename,
		&f.FileExtension, &f.MimeType, &f.FileType, &f.MinioBucket, &f.MinioKey, &f.CdnURL,
		&f.FileAddress, &f.SizeBytes, &f.Md5Hash, &f.Sha256Hash, &f.DurationSec, &f.Width,
		&f.Height, &f.Fps, &f.BitrateKbps, &f.Codec, &f.HasAudio, &f.HasVideo, &f.ThumbnailURL,
		&f.WaveformData, &f.UploadStatus, &f.UploadProgress, &f.Source, &f.ExportID,
		&f.ParentFileID, &f.LastUsedAt, &f.UseCount, &f.IsPublic, &f.ShareToken,
		&f.Metadata, &f.CreatedAt, &f.UpdatedAt,
	)
	if err != nil {
		return nil, err
	}
	return &f, nil
}

// CreateFileRecord inserts f with upload_status 'Pending' and writes the
// generated id, created_at and updated_at back into f.
func (s *Store) CreateFileRecord(ctx context.Context, f *models.UserFile) error {
	return s.pool.QueryRow(ctx,
		`INSERT INTO file_mgr.user_files
			(tenant_id, user_id, user_folder_id, name, original_filename, file_extension,
			 mime_type, file_type, minio_bucket, minio_key, file_address, size_bytes,
			 upload_status, source)
		VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,'Pending',$13)
		RETURNING id, created_at, updated_at`,
		f.TenantID, f.UserID, f.UserFolderID, f.Name, f.OriginalFilename, f.FileExtension,
		f.MimeType, f.FileType, f.MinioBucket, f.MinioKey, f.FileAddress, f.SizeBytes,
		f.Source,
	).Scan(&f.ID, &f.CreatedAt, &f.UpdatedAt)
}

// MarkFileReady sets the file's upload_status to 'Ready', its progress to 100
// and stores cdnURL. The update is keyed on id only, and no error is reported
// when no row matches.
func (s *Store) MarkFileReady(ctx context.Context, id uuid.UUID, cdnURL *string) error {
	_, err := s.pool.Exec(ctx,
		`UPDATE file_mgr.user_files
		SET upload_status = 'Ready', upload_progress = 100, cdn_url = $2, updated_at = NOW()
		WHERE id = $1`,
		id, cdnURL)
	return err
}

// DeleteFile soft-deletes a non-deleted file owned by userID. The returned
// UserFile has only ID, MinioBucket, MinioKey, SizeBytes and UserFolderID
// populated. The error from the row scan is returned as is (pgx.ErrNoRows when
// nothing matches).
func (s *Store) DeleteFile(ctx context.Context, id, userID uuid.UUID) (*models.UserFile, error) {
	var f models.UserFile
	err := s.pool.QueryRow(ctx,
		`UPDATE file_mgr.user_files
		SET deleted_at = NOW()
		WHERE id = $1 AND user_id = $2 AND deleted_at IS NULL
		RETURNING id, minio_bucket, minio_key, size_bytes, user_folder_id`,
		id, userID,
	).Scan(&f.ID, &f.MinioBucket, &f.MinioKey, &f.SizeBytes, &f.UserFolderID)
	if err != nil {
		return nil, err
	}
	return &f, nil
}

// ── Storage Quota ─────────────────────────────────────────────────────────────

// GetQuota loads the storage quota row for userID. The error from the row scan
// is returned as is (pgx.ErrNoRows when the user has no quota row).
func (s *Store) GetQuota(ctx context.Context, userID uuid.UUID) (*models.StorageQuota, error) {
	var q models.StorageQuota
	err := s.pool.QueryRow(ctx,
		`SELECT user_id, tenant_id, plan_quota_bytes, bonus_quota_bytes, used_bytes,
			video_count, image_count, audio_count, video_bytes, image_bytes, audio_bytes,
			last_90pct_notified_at, last_100pct_notified_at, updated_at
		FROM file_mgr.storage_quotas
		WHERE user_id = $1`,
		userID,
	).Scan(
		&q.UserID, &q.TenantID, &q.PlanQuotaBytes, &q.BonusQuotaBytes, &q.UsedBytes,
		&q.VideoCount, &q.ImageCount, &q.AudioCount, &q.VideoBytes, &q.ImageBytes, &q.AudioBytes,
		&q.Last90PctNotifiedAt, &q.Last100PctNotifiedAt, &q.UpdatedAt,
	)
	if err != nil {
		return nil, err
	}
	return &q, nil
}

// EnsureQuota inserts a quota row for the user if none exists yet; an existing
// row is left untouched.
func (s *Store) EnsureQuota(ctx context.Context, userID, tenantID uuid.UUID) error {
	_, err := s.pool.Exec(ctx,
		`INSERT INTO file_mgr.storage_quotas (user_id, tenant_id)
		VALUES ($1, $2)
		ON CONFLICT (user_id) DO NOTHING`,
		userID, tenantID)
	return err
}

// AddUsedBytes adds delta to the user's used_bytes in a single UPDATE. No
// error is reported when the user has no quota row.
func (s *Store) AddUsedBytes(ctx context.Context, userID uuid.UUID, delta int64) error {
	_, err := s.pool.Exec(ctx,
		`UPDATE file_mgr.storage_quotas
		SET used_bytes = used_bytes + $2, updated_at = NOW()
		WHERE user_id = $1`,
		userID, delta)
	return err
}

// ── Upload Sessions ───────────────────────────────────────────────────────────

// CreateUploadSession inserts sess with status 'Uploading' and writes the
// generated id, expires_at, created_at and updated_at back into sess.
func (s *Store) CreateUploadSession(ctx context.Context, sess *models.UploadSession) error {
	return s.pool.QueryRow(ctx,
		`INSERT INTO file_mgr.upload_sessions
			(tenant_id, user_id, minio_bucket, minio_key, minio_upload_id, filename,
			 mime_type, total_size_bytes, chunk_size_bytes, target_folder_id, status)
		VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,'Uploading')
		RETURNING id, expires_at, created_at, updated_at`,
		sess.TenantID, sess.UserID, sess.MinioBucket, sess.MinioKey, sess.MinioUploadID,
		sess.Filename, sess.MimeType, sess.TotalSizeBytes, sess.ChunkSizeBytes,
		sess.TargetFolderID,
	).Scan(&sess.ID, &sess.ExpiresAt, &sess.CreatedAt, &sess.UpdatedAt)
}

// GetUploadSession loads an upload session by id. The lookup is not scoped to
// a user. The error from the row scan is returned as is (pgx.ErrNoRows when
// nothing matches).
func (s *Store) GetUploadSession(ctx context.Context, id uuid.UUID) (*models.UploadSession, error) {
	var sess models.UploadSession
	err := s.pool.QueryRow(ctx,
		`SELECT id, tenant_id, user_id, minio_bucket, minio_key, minio_upload_id,
			filename, mime_type, total_size_bytes, chunks_received, bytes_received,
			chunk_size_bytes, target_folder_id, target_file_id, status, error_message,
			expires_at, completed_at, created_at, updated_at
		FROM file_mgr.upload_sessions
		WHERE id = $1`,
		id,
	).Scan(
		&sess.ID, &sess.TenantID, &sess.UserID, &sess.MinioBucket, &sess.MinioKey,
		&sess.MinioUploadID, &sess.Filename, &sess.MimeType, &sess.TotalSizeBytes,
		&sess.ChunksReceived, &sess.BytesReceived, &sess.ChunkSizeBytes,
		&sess.TargetFolderID, &sess.TargetFileID, &sess.Status, &sess.ErrorMessage,
		&sess.ExpiresAt, &sess.CompletedAt, &sess.CreatedAt, &sess.UpdatedAt,
	)
	if err != nil {
		return nil, err
	}
	return &sess, nil
}

// CompleteUploadSession sets the session's status to 'Ready', links it to
// fileID and stamps completed_at and updated_at with the application's current
// time. No error is reported when no row matches.
func (s *Store) CompleteUploadSession(ctx context.Context, id, fileID uuid.UUID) error {
	now := time.Now()
	_, err := s.pool.Exec(ctx,
		`UPDATE file_mgr.upload_sessions
		SET status = 'Ready', target_file_id = $2, completed_at = $3, updated_at = $3
		WHERE id = $1`,
		id, fileID, now)
	return err
}

// ── MinIO Buckets ─────────────────────────────────────────────────────────────

// GetBucketByPurpose returns one active bucket registered for purpose. The
// query has no ORDER BY, so which bucket is returned when several match is up
// to the database. The error from the row scan is returned as is
// (pgx.ErrNoRows when nothing matches).
func (s *Store) GetBucketByPurpose(ctx context.Context, purpose string) (*models.MinioBucket, error) {
	var b models.MinioBucket
	err := s.pool.QueryRow(ctx,
		`SELECT id, name, region, endpoint, purpose, is_public, cdn_base_url, is_active, created_at
		FROM file_mgr.minio_buckets
		WHERE purpose = $1 AND is_active = TRUE
		LIMIT 1`,
		purpose,
	).Scan(&b.ID, &b.Name, &b.Region, &b.Endpoint, &b.Purpose, &b.IsPublic,
		&b.CdnBaseURL, &b.IsActive, &b.CreatedAt)
	if err != nil {
		return nil, err
	}
	return &b, nil
}

// ── Helpers ───────────────────────────────────────────────────────────────────

// TotalPages returns how many pages of pageSize items are needed to hold total
// items, rounding up. It returns 0 when pageSize or total is not positive.
func TotalPages(total int64, pageSize int) int {
	if pageSize <= 0 || total <= 0 {
		return 0
	}
	// Integer ceiling division: exact for every int64, unlike a float64
	// round-trip, and written so the intermediate sum cannot overflow.
	size := int64(pageSize)
	pages := total / size
	if total%size != 0 {
		pages++
	}
	// Saturate instead of wrapping where int is narrower than int64.
	if pages > math.MaxInt {
		return math.MaxInt
	}
	return int(pages)
}