package db import ( "context" "errors" "fmt" "time" "github.com/flatrender/payment-svc/internal/models" "github.com/google/uuid" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" ) var ErrNotFound = errors.New("not found") type Store struct { pool *pgxpool.Pool } func NewStore(pool *pgxpool.Pool) *Store { return &Store{pool: pool} } func (s *Store) Ping(ctx context.Context) error { return s.pool.Ping(ctx) } // ── Client apps ─────────────────────────────────────────────────────────────── const clientCols = `id, tenant_id, name, slug, api_key, secret, zarinpal_merchant_id, zarinpal_sandbox, allowed_return_origins, webhook_url, is_active, created_at, updated_at` func scanClient(row pgx.Row, withSecret bool) (*models.ClientApp, error) { var c models.ClientApp var secret string if err := row.Scan( &c.ID, &c.TenantID, &c.Name, &c.Slug, &c.APIKey, &secret, &c.ZarinPalMerchantID, &c.ZarinPalSandbox, &c.AllowedReturnOrigins, &c.WebhookURL, &c.IsActive, &c.CreatedAt, &c.UpdatedAt, ); err != nil { return nil, err } if withSecret { c.Secret = secret } return &c, nil } func (s *Store) CreateClientApp(ctx context.Context, c *models.ClientApp) (*models.ClientApp, error) { row := s.pool.QueryRow(ctx, ` INSERT INTO payment.client_apps (tenant_id, name, slug, api_key, secret, zarinpal_merchant_id, zarinpal_sandbox, allowed_return_origins, webhook_url, is_active) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10) RETURNING `+clientCols, c.TenantID, c.Name, c.Slug, c.APIKey, c.Secret, c.ZarinPalMerchantID, c.ZarinPalSandbox, c.AllowedReturnOrigins, c.WebhookURL, c.IsActive) return scanClient(row, true) } func (s *Store) ListClientApps(ctx context.Context) ([]*models.ClientApp, error) { rows, err := s.pool.Query(ctx, `SELECT `+clientCols+` FROM payment.client_apps ORDER BY created_at DESC`) if err != nil { return nil, err } defer rows.Close() var out []*models.ClientApp for rows.Next() { c, err := scanClient(rows, false) if err != nil { return nil, err } out = append(out, c) } return out, rows.Err() } func (s *Store) GetClientApp(ctx context.Context, id uuid.UUID) (*models.ClientApp, error) { row := s.pool.QueryRow(ctx, `SELECT `+clientCols+` FROM payment.client_apps WHERE id = $1`, id) c, err := scanClient(row, false) if errors.Is(err, pgx.ErrNoRows) { return nil, ErrNotFound } return c, err } // GetClientByAPIKey returns the client incl. its secret (for auth + signing). func (s *Store) GetClientByAPIKey(ctx context.Context, apiKey string) (*models.ClientApp, error) { row := s.pool.QueryRow(ctx, `SELECT `+clientCols+` FROM payment.client_apps WHERE api_key = $1 AND is_active = TRUE`, apiKey) c, err := scanClient(row, true) if errors.Is(err, pgx.ErrNoRows) { return nil, ErrNotFound } return c, err } func (s *Store) UpdateClientApp(ctx context.Context, id uuid.UUID, c *models.ClientApp) (*models.ClientApp, error) { row := s.pool.QueryRow(ctx, ` UPDATE payment.client_apps SET name = $2, zarinpal_merchant_id = $3, zarinpal_sandbox = $4, allowed_return_origins = $5, webhook_url = $6, is_active = $7 WHERE id = $1 RETURNING `+clientCols, id, c.Name, c.ZarinPalMerchantID, c.ZarinPalSandbox, c.AllowedReturnOrigins, c.WebhookURL, c.IsActive) res, err := scanClient(row, false) if errors.Is(err, pgx.ErrNoRows) { return nil, ErrNotFound } return res, err } func (s *Store) RotateSecret(ctx context.Context, id uuid.UUID, newSecret string) (*models.ClientApp, error) { row := s.pool.QueryRow(ctx, `UPDATE payment.client_apps SET secret = $2 WHERE id = $1 RETURNING `+clientCols, id, newSecret) c, err := scanClient(row, true) if errors.Is(err, pgx.ErrNoRows) { return nil, ErrNotFound } return c, err } func (s *Store) DeleteClientApp(ctx context.Context, id uuid.UUID) error { ct, err := s.pool.Exec(ctx, `DELETE FROM payment.client_apps WHERE id = $1`, id) if err != nil { return err } if ct.RowsAffected() == 0 { return ErrNotFound } return nil } // ── Transactions ────────────────────────────────────────────────────────────── const txnCols = `id, client_app_id, status, gateway, amount_rial, currency, description, client_ref, return_url, metadata, payer_mobile, payer_email, authority, ref_id, card_pan, fee_rial, gateway_response, failure_reason, paid_at, failed_at, expires_at, created_at, updated_at` func scanTxn(row pgx.Row) (*models.Transaction, error) { var t models.Transaction var meta, gwResp []byte if err := row.Scan( &t.ID, &t.ClientAppID, &t.Status, &t.Gateway, &t.AmountRial, &t.Currency, &t.Description, &t.ClientRef, &t.ReturnURL, &meta, &t.PayerMobile, &t.PayerEmail, &t.Authority, &t.RefID, &t.CardPan, &t.FeeRial, &gwResp, &t.FailureReason, &t.PaidAt, &t.FailedAt, &t.ExpiresAt, &t.CreatedAt, &t.UpdatedAt, ); err != nil { return nil, err } t.Metadata = meta t.GatewayResponse = gwResp return &t, nil } func (s *Store) CreateTransaction(ctx context.Context, t *models.Transaction) (*models.Transaction, error) { var meta any if len(t.Metadata) > 0 { meta = []byte(t.Metadata) } row := s.pool.QueryRow(ctx, ` INSERT INTO payment.transactions (client_app_id, status, gateway, amount_rial, currency, description, client_ref, return_url, metadata, payer_mobile, payer_email, expires_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12) RETURNING `+txnCols, t.ClientAppID, t.Status, t.Gateway, t.AmountRial, t.Currency, t.Description, t.ClientRef, t.ReturnURL, meta, t.PayerMobile, t.PayerEmail, t.ExpiresAt) return scanTxn(row) } func (s *Store) SetAuthority(ctx context.Context, id uuid.UUID, authority string) error { _, err := s.pool.Exec(ctx, `UPDATE payment.transactions SET authority = $2, status = $3 WHERE id = $1`, id, authority, models.StatusPending) return err } func (s *Store) GetTransaction(ctx context.Context, id uuid.UUID) (*models.Transaction, error) { row := s.pool.QueryRow(ctx, `SELECT `+txnCols+` FROM payment.transactions WHERE id = $1`, id) t, err := scanTxn(row) if errors.Is(err, pgx.ErrNoRows) { return nil, ErrNotFound } return t, err } func (s *Store) GetTransactionByAuthority(ctx context.Context, authority string) (*models.Transaction, error) { row := s.pool.QueryRow(ctx, `SELECT `+txnCols+` FROM payment.transactions WHERE authority = $1`, authority) t, err := scanTxn(row) if errors.Is(err, pgx.ErrNoRows) { return nil, ErrNotFound } return t, err } // MarkPaid transitions a pending txn → Paid (idempotent: only if not already terminal). func (s *Store) MarkPaid(ctx context.Context, id uuid.UUID, refID, cardPan string, fee int64, gwResp []byte) (*models.Transaction, error) { row := s.pool.QueryRow(ctx, ` UPDATE payment.transactions SET status = $2, ref_id = $3, card_pan = NULLIF($4,''), fee_rial = $5, gateway_response = $6, paid_at = NOW() WHERE id = $1 AND status IN ($7,$8) RETURNING `+txnCols, id, models.StatusPaid, refID, cardPan, fee, gwResp, models.StatusPending, models.StatusCreated) t, err := scanTxn(row) if errors.Is(err, pgx.ErrNoRows) { return nil, ErrNotFound // already terminal or missing } return t, err } func (s *Store) MarkFailed(ctx context.Context, id uuid.UUID, reason string, gwResp []byte) (*models.Transaction, error) { row := s.pool.QueryRow(ctx, ` UPDATE payment.transactions SET status = $2, failure_reason = $3, gateway_response = $4, failed_at = NOW() WHERE id = $1 AND status IN ($5,$6) RETURNING `+txnCols, id, models.StatusFailed, reason, gwResp, models.StatusPending, models.StatusCreated) t, err := scanTxn(row) if errors.Is(err, pgx.ErrNoRows) { return nil, ErrNotFound } return t, err } func (s *Store) ListTransactions(ctx context.Context, clientID *uuid.UUID, status string, page, pageSize int) ([]*models.Transaction, int64, error) { where := "TRUE" args := []any{} i := 1 if clientID != nil { where += fmt.Sprintf(" AND t.client_app_id = $%d", i) args = append(args, *clientID) i++ } if status != "" { where += fmt.Sprintf(" AND t.status = $%d", i) args = append(args, status) i++ } var total int64 _ = s.pool.QueryRow(ctx, "SELECT COUNT(*) FROM payment.transactions t WHERE "+where, args...).Scan(&total) args = append(args, pageSize, (page-1)*pageSize) q := fmt.Sprintf(` SELECT t.id, t.client_app_id, t.status, t.gateway, t.amount_rial, t.currency, t.description, t.client_ref, t.return_url, t.metadata, t.payer_mobile, t.payer_email, t.authority, t.ref_id, t.card_pan, t.fee_rial, t.gateway_response, t.failure_reason, t.paid_at, t.failed_at, t.expires_at, t.created_at, t.updated_at, c.slug FROM payment.transactions t JOIN payment.client_apps c ON c.id = t.client_app_id WHERE %s ORDER BY t.created_at DESC LIMIT $%d OFFSET $%d`, where, i, i+1) rows, err := s.pool.Query(ctx, q, args...) if err != nil { return nil, 0, err } defer rows.Close() var out []*models.Transaction for rows.Next() { var t models.Transaction var meta, gwResp []byte if err := rows.Scan( &t.ID, &t.ClientAppID, &t.Status, &t.Gateway, &t.AmountRial, &t.Currency, &t.Description, &t.ClientRef, &t.ReturnURL, &meta, &t.PayerMobile, &t.PayerEmail, &t.Authority, &t.RefID, &t.CardPan, &t.FeeRial, &gwResp, &t.FailureReason, &t.PaidAt, &t.FailedAt, &t.ExpiresAt, &t.CreatedAt, &t.UpdatedAt, &t.ClientSlug, ); err != nil { return nil, 0, err } t.Metadata = meta t.GatewayResponse = gwResp out = append(out, &t) } return out, total, rows.Err() } // ── Webhook deliveries ──────────────────────────────────────────────────────── type WebhookDelivery struct { ID uuid.UUID TransactionID uuid.UUID URL string Payload []byte Signature string Attempts int } func (s *Store) EnqueueWebhook(ctx context.Context, txnID uuid.UUID, url string, payload []byte, signature string) (uuid.UUID, error) { var id uuid.UUID err := s.pool.QueryRow(ctx, ` INSERT INTO payment.webhook_deliveries (transaction_id, url, payload, signature, next_attempt_at) VALUES ($1,$2,$3,$4, NOW()) RETURNING id`, txnID, url, payload, signature).Scan(&id) return id, err } // ClaimDueWebhooks returns undelivered webhooks whose next_attempt_at has passed. func (s *Store) ClaimDueWebhooks(ctx context.Context, limit int) ([]*WebhookDelivery, error) { rows, err := s.pool.Query(ctx, ` SELECT id, transaction_id, url, payload, signature, attempts FROM payment.webhook_deliveries WHERE delivered = FALSE AND attempts < 8 AND next_attempt_at <= NOW() ORDER BY next_attempt_at ASC LIMIT $1`, limit) if err != nil { return nil, err } defer rows.Close() var out []*WebhookDelivery for rows.Next() { var w WebhookDelivery if err := rows.Scan(&w.ID, &w.TransactionID, &w.URL, &w.Payload, &w.Signature, &w.Attempts); err != nil { return nil, err } out = append(out, &w) } return out, rows.Err() } func (s *Store) MarkWebhookDelivered(ctx context.Context, id uuid.UUID, statusCode int) error { _, err := s.pool.Exec(ctx, ` UPDATE payment.webhook_deliveries SET delivered = TRUE, last_status = $2, attempts = attempts + 1 WHERE id = $1`, id, statusCode) return err } func (s *Store) MarkWebhookFailed(ctx context.Context, id uuid.UUID, statusCode int, errMsg string, backoff time.Duration) error { _, err := s.pool.Exec(ctx, ` UPDATE payment.webhook_deliveries SET attempts = attempts + 1, last_status = $2, last_error = $3, next_attempt_at = NOW() + $4::interval WHERE id = $1`, id, statusCode, errMsg, fmt.Sprintf("%d seconds", int(backoff.Seconds()))) return err }