6dbb14d146
Build backend images / build content-svc (push) Failing after 14s
Build backend images / build file-svc (push) Failing after 22s
Build backend images / build gateway (push) Failing after 1m21s
Build backend images / build identity-svc (push) Failing after 1m43s
Build backend images / build notification-svc (push) Failing after 1m6s
Build backend images / build render-svc (push) Failing after 53s
Build backend images / build studio-svc (push) Failing after 1m5s
- campaigns table (migration 19) + CRUD + send endpoint in notification-svc - audience resolution reads cross-schema from identity.users (all / verified / with_plan); send dispatches via the SMS or Email channel and logs deliveries - endpoints: GET/POST /v1/campaigns, POST /v1/campaigns/:id/send, DELETE - gateway route /v1/campaigns/* → notification - /admin/marketing: create campaign (channel, audience, template/subject/body), list with status + sent counts, send, delete Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
111 lines
3.9 KiB
Go
111 lines
3.9 KiB
Go
package db
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/flatrender/notification-svc/internal/models"
|
|
"github.com/google/uuid"
|
|
"github.com/jackc/pgx/v5"
|
|
)
|
|
|
|
func (s *Store) ListCampaigns(ctx context.Context, tenantID uuid.UUID) ([]*models.Campaign, error) {
|
|
rows, err := s.pool.Query(ctx,
|
|
`SELECT id, tenant_id, name, channel, audience, subject, body_html, template_code,
|
|
status, total_count, sent_count, failed_count, created_at, sent_at
|
|
FROM notification.campaigns WHERE tenant_id = $1 ORDER BY created_at DESC`, tenantID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var out []*models.Campaign
|
|
for rows.Next() {
|
|
c := &models.Campaign{}
|
|
if err := rows.Scan(&c.ID, &c.TenantID, &c.Name, &c.Channel, &c.Audience, &c.Subject,
|
|
&c.BodyHTML, &c.TemplateCode, &c.Status, &c.TotalCount, &c.SentCount, &c.FailedCount,
|
|
&c.CreatedAt, &c.SentAt); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, c)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
func (s *Store) GetCampaign(ctx context.Context, id, tenantID uuid.UUID) (*models.Campaign, error) {
|
|
c := &models.Campaign{}
|
|
err := s.pool.QueryRow(ctx,
|
|
`SELECT id, tenant_id, name, channel, audience, subject, body_html, template_code,
|
|
status, total_count, sent_count, failed_count, created_at, sent_at
|
|
FROM notification.campaigns WHERE id = $1 AND tenant_id = $2`, id, tenantID).
|
|
Scan(&c.ID, &c.TenantID, &c.Name, &c.Channel, &c.Audience, &c.Subject, &c.BodyHTML,
|
|
&c.TemplateCode, &c.Status, &c.TotalCount, &c.SentCount, &c.FailedCount, &c.CreatedAt, &c.SentAt)
|
|
if err == pgx.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
return c, err
|
|
}
|
|
|
|
func (s *Store) CreateCampaign(ctx context.Context, tenantID uuid.UUID, req models.CreateCampaignRequest) (*models.Campaign, error) {
|
|
audience := req.Audience
|
|
if audience == "" {
|
|
audience = "all"
|
|
}
|
|
var id uuid.UUID
|
|
err := s.pool.QueryRow(ctx,
|
|
`INSERT INTO notification.campaigns (tenant_id, name, channel, audience, subject, body_html, template_code)
|
|
VALUES ($1,$2,$3,$4,$5,$6,$7) RETURNING id`,
|
|
tenantID, req.Name, req.Channel, audience, req.Subject, req.BodyHTML, req.TemplateCode).Scan(&id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return s.GetCampaign(ctx, id, tenantID)
|
|
}
|
|
|
|
func (s *Store) DeleteCampaign(ctx context.Context, id, tenantID uuid.UUID) error {
|
|
_, err := s.pool.Exec(ctx, `DELETE FROM notification.campaigns WHERE id=$1 AND tenant_id=$2`, id, tenantID)
|
|
return err
|
|
}
|
|
|
|
func (s *Store) UpdateCampaignResult(ctx context.Context, id uuid.UUID, status string, total, sent, failed int) error {
|
|
_, err := s.pool.Exec(ctx,
|
|
`UPDATE notification.campaigns SET status=$2, total_count=$3, sent_count=$4, failed_count=$5,
|
|
sent_at = CASE WHEN $2 IN ('Sent','Failed') THEN NOW() ELSE sent_at END
|
|
WHERE id=$1`, id, status, total, sent, failed)
|
|
return err
|
|
}
|
|
|
|
// ResolveAudience returns recipient contact strings (email or phone) for a campaign
|
|
// segment, read cross-schema from identity.users (same database).
|
|
func (s *Store) ResolveAudience(ctx context.Context, tenantID uuid.UUID, channel, audience string, limit int) ([]string, error) {
|
|
col := "email"
|
|
notNull := "email IS NOT NULL AND email <> ''"
|
|
if channel == "sms" {
|
|
col = "phone_number"
|
|
notNull = "phone_number IS NOT NULL AND phone_number <> ''"
|
|
}
|
|
where := "tenant_id = $1 AND deleted_at IS NULL AND ban_account = FALSE AND " + notNull
|
|
switch audience {
|
|
case "verified":
|
|
if channel == "sms" {
|
|
where += " AND phone_verified = TRUE"
|
|
} else {
|
|
where += " AND email_verified = TRUE"
|
|
}
|
|
case "with_plan":
|
|
where += " AND EXISTS (SELECT 1 FROM identity.user_plans up WHERE up.user_id = identity.users.id AND (up.expires_at IS NULL OR up.expires_at > NOW()))"
|
|
}
|
|
q := "SELECT " + col + " FROM identity.users WHERE " + where + " LIMIT $2"
|
|
rows, err := s.pool.Query(ctx, q, tenantID, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var out []string
|
|
for rows.Next() {
|
|
var v string
|
|
if err := rows.Scan(&v); err == nil && v != "" {
|
|
out = append(out, v)
|
|
}
|
|
}
|
|
return out, rows.Err()
|
|
}
|