Files
flatrender/services/payment/internal/db/db.go
T
soroush.asadi 376cdf6a1c
CI/CD / CI · Web (tsc) (push) Successful in 1m10s
CI/CD / Deploy · full stack (push) Failing after 11m4s
feat(payment): route FlatRender plan purchases through the broker
- identity: when FlatPay (broker) is configured, InitiateZarinPalAsync
  routes through pay.flatrender.ir instead of calling ZarinPal directly;
  new HandleBrokerCallbackAsync confirms the payment via the broker
  inquiry API (authoritative, not trusting the redirect) and activates
  the plan. New public endpoint GET /v1/payments/callback/broker
  (already public at the gateway via /callback/*). Env-gated — empty
  FlatPay__ApiKey keeps the legacy direct-ZarinPal path.
- broker: deliver webhooks inline on enqueue (best-effort) in addition
  to the retry loop, so clients credit near-instantly (db.GetWebhook +
  goroutine kick).
- compose + ENV_FILE: FlatPay__* for identity (FLATPAY_FLATRENDER_*).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-16 00:34:45 +03:30

347 lines
12 KiB
Go

package db
import (
"context"
"errors"
"fmt"
"time"
"github.com/flatrender/payment-svc/internal/models"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
var ErrNotFound = errors.New("not found")
type Store struct {
pool *pgxpool.Pool
}
func NewStore(pool *pgxpool.Pool) *Store { return &Store{pool: pool} }
func (s *Store) Ping(ctx context.Context) error { return s.pool.Ping(ctx) }
// ── Client apps ───────────────────────────────────────────────────────────────
const clientCols = `id, tenant_id, name, slug, api_key, secret,
zarinpal_merchant_id, zarinpal_sandbox, allowed_return_origins, webhook_url,
is_active, created_at, updated_at`
func scanClient(row pgx.Row, withSecret bool) (*models.ClientApp, error) {
var c models.ClientApp
var secret string
if err := row.Scan(
&c.ID, &c.TenantID, &c.Name, &c.Slug, &c.APIKey, &secret,
&c.ZarinPalMerchantID, &c.ZarinPalSandbox, &c.AllowedReturnOrigins, &c.WebhookURL,
&c.IsActive, &c.CreatedAt, &c.UpdatedAt,
); err != nil {
return nil, err
}
if withSecret {
c.Secret = secret
}
return &c, nil
}
func (s *Store) CreateClientApp(ctx context.Context, c *models.ClientApp) (*models.ClientApp, error) {
row := s.pool.QueryRow(ctx, `
INSERT INTO payment.client_apps
(tenant_id, name, slug, api_key, secret, zarinpal_merchant_id, zarinpal_sandbox,
allowed_return_origins, webhook_url, is_active)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)
RETURNING `+clientCols,
c.TenantID, c.Name, c.Slug, c.APIKey, c.Secret, c.ZarinPalMerchantID, c.ZarinPalSandbox,
c.AllowedReturnOrigins, c.WebhookURL, c.IsActive)
return scanClient(row, true)
}
func (s *Store) ListClientApps(ctx context.Context) ([]*models.ClientApp, error) {
rows, err := s.pool.Query(ctx, `SELECT `+clientCols+` FROM payment.client_apps ORDER BY created_at DESC`)
if err != nil {
return nil, err
}
defer rows.Close()
var out []*models.ClientApp
for rows.Next() {
c, err := scanClient(rows, false)
if err != nil {
return nil, err
}
out = append(out, c)
}
return out, rows.Err()
}
func (s *Store) GetClientApp(ctx context.Context, id uuid.UUID) (*models.ClientApp, error) {
row := s.pool.QueryRow(ctx, `SELECT `+clientCols+` FROM payment.client_apps WHERE id = $1`, id)
c, err := scanClient(row, false)
if errors.Is(err, pgx.ErrNoRows) {
return nil, ErrNotFound
}
return c, err
}
// GetClientByAPIKey returns the client incl. its secret (for auth + signing).
func (s *Store) GetClientByAPIKey(ctx context.Context, apiKey string) (*models.ClientApp, error) {
row := s.pool.QueryRow(ctx, `SELECT `+clientCols+` FROM payment.client_apps WHERE api_key = $1 AND is_active = TRUE`, apiKey)
c, err := scanClient(row, true)
if errors.Is(err, pgx.ErrNoRows) {
return nil, ErrNotFound
}
return c, err
}
func (s *Store) UpdateClientApp(ctx context.Context, id uuid.UUID, c *models.ClientApp) (*models.ClientApp, error) {
row := s.pool.QueryRow(ctx, `
UPDATE payment.client_apps SET
name = $2, zarinpal_merchant_id = $3, zarinpal_sandbox = $4,
allowed_return_origins = $5, webhook_url = $6, is_active = $7
WHERE id = $1
RETURNING `+clientCols,
id, c.Name, c.ZarinPalMerchantID, c.ZarinPalSandbox,
c.AllowedReturnOrigins, c.WebhookURL, c.IsActive)
res, err := scanClient(row, false)
if errors.Is(err, pgx.ErrNoRows) {
return nil, ErrNotFound
}
return res, err
}
func (s *Store) RotateSecret(ctx context.Context, id uuid.UUID, newSecret string) (*models.ClientApp, error) {
row := s.pool.QueryRow(ctx, `UPDATE payment.client_apps SET secret = $2 WHERE id = $1 RETURNING `+clientCols, id, newSecret)
c, err := scanClient(row, true)
if errors.Is(err, pgx.ErrNoRows) {
return nil, ErrNotFound
}
return c, err
}
func (s *Store) DeleteClientApp(ctx context.Context, id uuid.UUID) error {
ct, err := s.pool.Exec(ctx, `DELETE FROM payment.client_apps WHERE id = $1`, id)
if err != nil {
return err
}
if ct.RowsAffected() == 0 {
return ErrNotFound
}
return nil
}
// ── Transactions ──────────────────────────────────────────────────────────────
const txnCols = `id, client_app_id, status, gateway, amount_rial, currency, description,
client_ref, return_url, metadata, payer_mobile, payer_email,
authority, ref_id, card_pan, fee_rial, gateway_response, failure_reason,
paid_at, failed_at, expires_at, created_at, updated_at`
func scanTxn(row pgx.Row) (*models.Transaction, error) {
var t models.Transaction
var meta, gwResp []byte
if err := row.Scan(
&t.ID, &t.ClientAppID, &t.Status, &t.Gateway, &t.AmountRial, &t.Currency, &t.Description,
&t.ClientRef, &t.ReturnURL, &meta, &t.PayerMobile, &t.PayerEmail,
&t.Authority, &t.RefID, &t.CardPan, &t.FeeRial, &gwResp, &t.FailureReason,
&t.PaidAt, &t.FailedAt, &t.ExpiresAt, &t.CreatedAt, &t.UpdatedAt,
); err != nil {
return nil, err
}
t.Metadata = meta
t.GatewayResponse = gwResp
return &t, nil
}
func (s *Store) CreateTransaction(ctx context.Context, t *models.Transaction) (*models.Transaction, error) {
var meta any
if len(t.Metadata) > 0 {
meta = []byte(t.Metadata)
}
row := s.pool.QueryRow(ctx, `
INSERT INTO payment.transactions
(client_app_id, status, gateway, amount_rial, currency, description,
client_ref, return_url, metadata, payer_mobile, payer_email, expires_at)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)
RETURNING `+txnCols,
t.ClientAppID, t.Status, t.Gateway, t.AmountRial, t.Currency, t.Description,
t.ClientRef, t.ReturnURL, meta, t.PayerMobile, t.PayerEmail, t.ExpiresAt)
return scanTxn(row)
}
func (s *Store) SetAuthority(ctx context.Context, id uuid.UUID, authority string) error {
_, err := s.pool.Exec(ctx, `UPDATE payment.transactions SET authority = $2, status = $3 WHERE id = $1`,
id, authority, models.StatusPending)
return err
}
func (s *Store) GetTransaction(ctx context.Context, id uuid.UUID) (*models.Transaction, error) {
row := s.pool.QueryRow(ctx, `SELECT `+txnCols+` FROM payment.transactions WHERE id = $1`, id)
t, err := scanTxn(row)
if errors.Is(err, pgx.ErrNoRows) {
return nil, ErrNotFound
}
return t, err
}
func (s *Store) GetTransactionByAuthority(ctx context.Context, authority string) (*models.Transaction, error) {
row := s.pool.QueryRow(ctx, `SELECT `+txnCols+` FROM payment.transactions WHERE authority = $1`, authority)
t, err := scanTxn(row)
if errors.Is(err, pgx.ErrNoRows) {
return nil, ErrNotFound
}
return t, err
}
// MarkPaid transitions a pending txn → Paid (idempotent: only if not already terminal).
func (s *Store) MarkPaid(ctx context.Context, id uuid.UUID, refID, cardPan string, fee int64, gwResp []byte) (*models.Transaction, error) {
row := s.pool.QueryRow(ctx, `
UPDATE payment.transactions
SET status = $2, ref_id = $3, card_pan = NULLIF($4,''), fee_rial = $5,
gateway_response = $6, paid_at = NOW()
WHERE id = $1 AND status IN ($7,$8)
RETURNING `+txnCols,
id, models.StatusPaid, refID, cardPan, fee, gwResp, models.StatusPending, models.StatusCreated)
t, err := scanTxn(row)
if errors.Is(err, pgx.ErrNoRows) {
return nil, ErrNotFound // already terminal or missing
}
return t, err
}
func (s *Store) MarkFailed(ctx context.Context, id uuid.UUID, reason string, gwResp []byte) (*models.Transaction, error) {
row := s.pool.QueryRow(ctx, `
UPDATE payment.transactions
SET status = $2, failure_reason = $3, gateway_response = $4, failed_at = NOW()
WHERE id = $1 AND status IN ($5,$6)
RETURNING `+txnCols,
id, models.StatusFailed, reason, gwResp, models.StatusPending, models.StatusCreated)
t, err := scanTxn(row)
if errors.Is(err, pgx.ErrNoRows) {
return nil, ErrNotFound
}
return t, err
}
func (s *Store) ListTransactions(ctx context.Context, clientID *uuid.UUID, status string, page, pageSize int) ([]*models.Transaction, int64, error) {
where := "TRUE"
args := []any{}
i := 1
if clientID != nil {
where += fmt.Sprintf(" AND t.client_app_id = $%d", i)
args = append(args, *clientID)
i++
}
if status != "" {
where += fmt.Sprintf(" AND t.status = $%d", i)
args = append(args, status)
i++
}
var total int64
_ = s.pool.QueryRow(ctx, "SELECT COUNT(*) FROM payment.transactions t WHERE "+where, args...).Scan(&total)
args = append(args, pageSize, (page-1)*pageSize)
q := fmt.Sprintf(`
SELECT t.id, t.client_app_id, t.status, t.gateway, t.amount_rial, t.currency, t.description,
t.client_ref, t.return_url, t.metadata, t.payer_mobile, t.payer_email,
t.authority, t.ref_id, t.card_pan, t.fee_rial, t.gateway_response, t.failure_reason,
t.paid_at, t.failed_at, t.expires_at, t.created_at, t.updated_at, c.slug
FROM payment.transactions t
JOIN payment.client_apps c ON c.id = t.client_app_id
WHERE %s ORDER BY t.created_at DESC LIMIT $%d OFFSET $%d`, where, i, i+1)
rows, err := s.pool.Query(ctx, q, args...)
if err != nil {
return nil, 0, err
}
defer rows.Close()
var out []*models.Transaction
for rows.Next() {
var t models.Transaction
var meta, gwResp []byte
if err := rows.Scan(
&t.ID, &t.ClientAppID, &t.Status, &t.Gateway, &t.AmountRial, &t.Currency, &t.Description,
&t.ClientRef, &t.ReturnURL, &meta, &t.PayerMobile, &t.PayerEmail,
&t.Authority, &t.RefID, &t.CardPan, &t.FeeRial, &gwResp, &t.FailureReason,
&t.PaidAt, &t.FailedAt, &t.ExpiresAt, &t.CreatedAt, &t.UpdatedAt, &t.ClientSlug,
); err != nil {
return nil, 0, err
}
t.Metadata = meta
t.GatewayResponse = gwResp
out = append(out, &t)
}
return out, total, rows.Err()
}
// ── Webhook deliveries ────────────────────────────────────────────────────────
type WebhookDelivery struct {
ID uuid.UUID
TransactionID uuid.UUID
URL string
Payload []byte
Signature string
Attempts int
}
func (s *Store) EnqueueWebhook(ctx context.Context, txnID uuid.UUID, url string, payload []byte, signature string) (uuid.UUID, error) {
var id uuid.UUID
err := s.pool.QueryRow(ctx, `
INSERT INTO payment.webhook_deliveries (transaction_id, url, payload, signature, next_attempt_at)
VALUES ($1,$2,$3,$4, NOW()) RETURNING id`,
txnID, url, payload, signature).Scan(&id)
return id, err
}
// GetWebhook loads a single delivery row (used for the inline immediate attempt).
func (s *Store) GetWebhook(ctx context.Context, id uuid.UUID) (*WebhookDelivery, error) {
var w WebhookDelivery
err := s.pool.QueryRow(ctx, `
SELECT id, transaction_id, url, payload, signature, attempts
FROM payment.webhook_deliveries
WHERE id = $1 AND delivered = FALSE`, id).Scan(
&w.ID, &w.TransactionID, &w.URL, &w.Payload, &w.Signature, &w.Attempts)
if errors.Is(err, pgx.ErrNoRows) {
return nil, ErrNotFound
}
return &w, err
}
// ClaimDueWebhooks returns undelivered webhooks whose next_attempt_at has passed.
func (s *Store) ClaimDueWebhooks(ctx context.Context, limit int) ([]*WebhookDelivery, error) {
rows, err := s.pool.Query(ctx, `
SELECT id, transaction_id, url, payload, signature, attempts
FROM payment.webhook_deliveries
WHERE delivered = FALSE AND attempts < 8 AND next_attempt_at <= NOW()
ORDER BY next_attempt_at ASC LIMIT $1`, limit)
if err != nil {
return nil, err
}
defer rows.Close()
var out []*WebhookDelivery
for rows.Next() {
var w WebhookDelivery
if err := rows.Scan(&w.ID, &w.TransactionID, &w.URL, &w.Payload, &w.Signature, &w.Attempts); err != nil {
return nil, err
}
out = append(out, &w)
}
return out, rows.Err()
}
func (s *Store) MarkWebhookDelivered(ctx context.Context, id uuid.UUID, statusCode int) error {
_, err := s.pool.Exec(ctx, `
UPDATE payment.webhook_deliveries
SET delivered = TRUE, last_status = $2, attempts = attempts + 1
WHERE id = $1`, id, statusCode)
return err
}
func (s *Store) MarkWebhookFailed(ctx context.Context, id uuid.UUID, statusCode int, errMsg string, backoff time.Duration) error {
_, err := s.pool.Exec(ctx, `
UPDATE payment.webhook_deliveries
SET attempts = attempts + 1, last_status = $2, last_error = $3,
next_attempt_at = NOW() + $4::interval
WHERE id = $1`, id, statusCode, errMsg, fmt.Sprintf("%d seconds", int(backoff.Seconds())))
return err
}