feat(notifications+admin): marketing campaigns
Build backend images / build content-svc (push) Failing after 14s
Build backend images / build file-svc (push) Failing after 22s
Build backend images / build gateway (push) Failing after 1m21s
Build backend images / build identity-svc (push) Failing after 1m43s
Build backend images / build notification-svc (push) Failing after 1m6s
Build backend images / build render-svc (push) Failing after 53s
Build backend images / build studio-svc (push) Failing after 1m5s
Build backend images / build content-svc (push) Failing after 14s
Build backend images / build file-svc (push) Failing after 22s
Build backend images / build gateway (push) Failing after 1m21s
Build backend images / build identity-svc (push) Failing after 1m43s
Build backend images / build notification-svc (push) Failing after 1m6s
Build backend images / build render-svc (push) Failing after 53s
Build backend images / build studio-svc (push) Failing after 1m5s
- campaigns table (migration 19) + CRUD + send endpoint in notification-svc - audience resolution reads cross-schema from identity.users (all / verified / with_plan); send dispatches via the SMS or Email channel and logs deliveries - endpoints: GET/POST /v1/campaigns, POST /v1/campaigns/:id/send, DELETE - gateway route /v1/campaigns/* → notification - /admin/marketing: create campaign (channel, audience, template/subject/body), list with status + sent counts, send, delete Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -40,6 +40,7 @@ func main() {
|
||||
prefH := handlers.NewPreferenceHandler(store)
|
||||
tplH := handlers.NewTemplateHandler(store)
|
||||
chH := handlers.NewChannelHandler(store)
|
||||
campH := handlers.NewCampaignHandler(store)
|
||||
|
||||
r := gin.Default()
|
||||
|
||||
@@ -93,6 +94,12 @@ func main() {
|
||||
v1.POST("/sms/send", auth, admin, chH.SendSMS)
|
||||
v1.POST("/email/send", auth, admin, chH.SendEmail)
|
||||
|
||||
// ── Marketing campaigns (admin) ───────────────────────────────────────────
|
||||
v1.GET("/campaigns", auth, admin, campH.List)
|
||||
v1.POST("/campaigns", auth, admin, campH.Create)
|
||||
v1.POST("/campaigns/:id/send", auth, admin, campH.Send)
|
||||
v1.DELETE("/campaigns/:id", auth, admin, campH.Delete)
|
||||
|
||||
// ── Internal: create notification (service-to-service) ───────────────────
|
||||
v1.POST("/internal/notifications", serviceAuth, notifH.CreateInternal)
|
||||
|
||||
|
||||
@@ -0,0 +1,110 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/flatrender/notification-svc/internal/models"
|
||||
"github.com/google/uuid"
|
||||
"github.com/jackc/pgx/v5"
|
||||
)
|
||||
|
||||
func (s *Store) ListCampaigns(ctx context.Context, tenantID uuid.UUID) ([]*models.Campaign, error) {
|
||||
rows, err := s.pool.Query(ctx,
|
||||
`SELECT id, tenant_id, name, channel, audience, subject, body_html, template_code,
|
||||
status, total_count, sent_count, failed_count, created_at, sent_at
|
||||
FROM notification.campaigns WHERE tenant_id = $1 ORDER BY created_at DESC`, tenantID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var out []*models.Campaign
|
||||
for rows.Next() {
|
||||
c := &models.Campaign{}
|
||||
if err := rows.Scan(&c.ID, &c.TenantID, &c.Name, &c.Channel, &c.Audience, &c.Subject,
|
||||
&c.BodyHTML, &c.TemplateCode, &c.Status, &c.TotalCount, &c.SentCount, &c.FailedCount,
|
||||
&c.CreatedAt, &c.SentAt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, c)
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
func (s *Store) GetCampaign(ctx context.Context, id, tenantID uuid.UUID) (*models.Campaign, error) {
|
||||
c := &models.Campaign{}
|
||||
err := s.pool.QueryRow(ctx,
|
||||
`SELECT id, tenant_id, name, channel, audience, subject, body_html, template_code,
|
||||
status, total_count, sent_count, failed_count, created_at, sent_at
|
||||
FROM notification.campaigns WHERE id = $1 AND tenant_id = $2`, id, tenantID).
|
||||
Scan(&c.ID, &c.TenantID, &c.Name, &c.Channel, &c.Audience, &c.Subject, &c.BodyHTML,
|
||||
&c.TemplateCode, &c.Status, &c.TotalCount, &c.SentCount, &c.FailedCount, &c.CreatedAt, &c.SentAt)
|
||||
if err == pgx.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
return c, err
|
||||
}
|
||||
|
||||
func (s *Store) CreateCampaign(ctx context.Context, tenantID uuid.UUID, req models.CreateCampaignRequest) (*models.Campaign, error) {
|
||||
audience := req.Audience
|
||||
if audience == "" {
|
||||
audience = "all"
|
||||
}
|
||||
var id uuid.UUID
|
||||
err := s.pool.QueryRow(ctx,
|
||||
`INSERT INTO notification.campaigns (tenant_id, name, channel, audience, subject, body_html, template_code)
|
||||
VALUES ($1,$2,$3,$4,$5,$6,$7) RETURNING id`,
|
||||
tenantID, req.Name, req.Channel, audience, req.Subject, req.BodyHTML, req.TemplateCode).Scan(&id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s.GetCampaign(ctx, id, tenantID)
|
||||
}
|
||||
|
||||
func (s *Store) DeleteCampaign(ctx context.Context, id, tenantID uuid.UUID) error {
|
||||
_, err := s.pool.Exec(ctx, `DELETE FROM notification.campaigns WHERE id=$1 AND tenant_id=$2`, id, tenantID)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Store) UpdateCampaignResult(ctx context.Context, id uuid.UUID, status string, total, sent, failed int) error {
|
||||
_, err := s.pool.Exec(ctx,
|
||||
`UPDATE notification.campaigns SET status=$2, total_count=$3, sent_count=$4, failed_count=$5,
|
||||
sent_at = CASE WHEN $2 IN ('Sent','Failed') THEN NOW() ELSE sent_at END
|
||||
WHERE id=$1`, id, status, total, sent, failed)
|
||||
return err
|
||||
}
|
||||
|
||||
// ResolveAudience returns recipient contact strings (email or phone) for a campaign
|
||||
// segment, read cross-schema from identity.users (same database).
|
||||
func (s *Store) ResolveAudience(ctx context.Context, tenantID uuid.UUID, channel, audience string, limit int) ([]string, error) {
|
||||
col := "email"
|
||||
notNull := "email IS NOT NULL AND email <> ''"
|
||||
if channel == "sms" {
|
||||
col = "phone_number"
|
||||
notNull = "phone_number IS NOT NULL AND phone_number <> ''"
|
||||
}
|
||||
where := "tenant_id = $1 AND deleted_at IS NULL AND ban_account = FALSE AND " + notNull
|
||||
switch audience {
|
||||
case "verified":
|
||||
if channel == "sms" {
|
||||
where += " AND phone_verified = TRUE"
|
||||
} else {
|
||||
where += " AND email_verified = TRUE"
|
||||
}
|
||||
case "with_plan":
|
||||
where += " AND EXISTS (SELECT 1 FROM identity.user_plans up WHERE up.user_id = identity.users.id AND (up.expires_at IS NULL OR up.expires_at > NOW()))"
|
||||
}
|
||||
q := "SELECT " + col + " FROM identity.users WHERE " + where + " LIMIT $2"
|
||||
rows, err := s.pool.Query(ctx, q, tenantID, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var out []string
|
||||
for rows.Next() {
|
||||
var v string
|
||||
if err := rows.Scan(&v); err == nil && v != "" {
|
||||
out = append(out, v)
|
||||
}
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
@@ -0,0 +1,162 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/flatrender/notification-svc/internal/db"
|
||||
"github.com/flatrender/notification-svc/internal/middleware"
|
||||
"github.com/flatrender/notification-svc/internal/models"
|
||||
"github.com/flatrender/notification-svc/internal/sender"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// Max recipients per campaign send (V1 sends synchronously).
|
||||
const campaignRecipientCap = 1000
|
||||
|
||||
type CampaignHandler struct{ store *db.Store }
|
||||
|
||||
func NewCampaignHandler(s *db.Store) *CampaignHandler { return &CampaignHandler{store: s} }
|
||||
|
||||
func (h *CampaignHandler) tenant(c *gin.Context) uuid.UUID {
|
||||
v, _ := c.Get(middleware.CtxTenantID)
|
||||
id, _ := v.(uuid.UUID)
|
||||
return id
|
||||
}
|
||||
func (h *CampaignHandler) uid(c *gin.Context) uuid.UUID {
|
||||
v, _ := c.Get(middleware.CtxUserID)
|
||||
id, _ := v.(uuid.UUID)
|
||||
return id
|
||||
}
|
||||
|
||||
// GET /v1/campaigns
|
||||
func (h *CampaignHandler) List(c *gin.Context) {
|
||||
list, err := h.store.ListCampaigns(c.Request.Context(), h.tenant(c))
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, models.APIError{Code: "db_error", Message: err.Error()})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"data": list})
|
||||
}
|
||||
|
||||
// POST /v1/campaigns
|
||||
func (h *CampaignHandler) Create(c *gin.Context) {
|
||||
var req models.CreateCampaignRequest
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: err.Error()})
|
||||
return
|
||||
}
|
||||
if req.Channel != "sms" && req.Channel != "email" {
|
||||
c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: "channel must be sms or email"})
|
||||
return
|
||||
}
|
||||
camp, err := h.store.CreateCampaign(c.Request.Context(), h.tenant(c), req)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, models.APIError{Code: "db_error", Message: err.Error()})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, camp)
|
||||
}
|
||||
|
||||
// DELETE /v1/campaigns/:id
|
||||
func (h *CampaignHandler) Delete(c *gin.Context) {
|
||||
id, err := uuid.Parse(c.Param("id"))
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: "invalid id"})
|
||||
return
|
||||
}
|
||||
if err := h.store.DeleteCampaign(c.Request.Context(), id, h.tenant(c)); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, models.APIError{Code: "db_error", Message: err.Error()})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"ok": true})
|
||||
}
|
||||
|
||||
// POST /v1/campaigns/:id/send
|
||||
func (h *CampaignHandler) Send(c *gin.Context) {
|
||||
ctx := c.Request.Context()
|
||||
id, err := uuid.Parse(c.Param("id"))
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, models.APIError{Code: "bad_request", Message: "invalid id"})
|
||||
return
|
||||
}
|
||||
tenant := h.tenant(c)
|
||||
camp, _ := h.store.GetCampaign(ctx, id, tenant)
|
||||
if camp == nil {
|
||||
c.JSON(http.StatusNotFound, models.APIError{Code: "not_found", Message: "campaign not found"})
|
||||
return
|
||||
}
|
||||
cfg, _ := h.store.GetChannelConfig(ctx, tenant, camp.Channel)
|
||||
if cfg == nil || !cfg.Enabled {
|
||||
c.JSON(http.StatusBadRequest, models.APIError{Code: "not_configured", Message: camp.Channel + " channel is not configured/enabled"})
|
||||
return
|
||||
}
|
||||
|
||||
// Resolve subject/body (from template for email, else campaign fields).
|
||||
subject, body := "", ""
|
||||
if camp.Subject != nil {
|
||||
subject = *camp.Subject
|
||||
}
|
||||
if camp.BodyHTML != nil {
|
||||
body = *camp.BodyHTML
|
||||
}
|
||||
if camp.Channel == "email" && camp.TemplateCode != nil && *camp.TemplateCode != "" {
|
||||
if tpl, _ := h.store.GetEmailTemplate(ctx, *camp.TemplateCode, "fa"); tpl != nil {
|
||||
if tpl.Subject != nil {
|
||||
subject = *tpl.Subject
|
||||
}
|
||||
if tpl.BodyHTML != nil {
|
||||
body = *tpl.BodyHTML
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
recipients, err := h.store.ResolveAudience(ctx, tenant, camp.Channel, camp.Audience, campaignRecipientCap)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, models.APIError{Code: "db_error", Message: err.Error()})
|
||||
return
|
||||
}
|
||||
total := len(recipients)
|
||||
_ = h.store.UpdateCampaignResult(ctx, id, "Sending", total, 0, 0)
|
||||
|
||||
var scfg sender.SMTPConfig
|
||||
if camp.Channel == "email" {
|
||||
scfg = sender.SMTPConfig{
|
||||
Host: str(cfg.Settings["host"]), Port: intv(cfg.Settings["port"]),
|
||||
Username: str(cfg.Settings["username"]), Password: str(cfg.Settings["password"]),
|
||||
FromEmail: str(cfg.Settings["from_email"]), FromName: str(cfg.Settings["from_name"]),
|
||||
UseTLS: boolv(cfg.Settings["use_tls"]),
|
||||
}
|
||||
}
|
||||
|
||||
sent, failed := 0, 0
|
||||
provider := map[string]string{"sms": "kavenegar", "email": "smtp"}[camp.Channel]
|
||||
chLabel := map[string]string{"sms": "SMS", "email": "Email"}[camp.Channel]
|
||||
for _, r := range recipients {
|
||||
var sendErr error
|
||||
if camp.Channel == "email" {
|
||||
sendErr = sender.SendEmail(scfg, r, subject, body)
|
||||
} else {
|
||||
_, sendErr = sender.SendSMS(str(cfg.Settings["api_key"]), str(cfg.Settings["line_number"]), r, body)
|
||||
}
|
||||
status := "Sent"
|
||||
var em *string
|
||||
if sendErr != nil {
|
||||
failed++
|
||||
status = "Failed"
|
||||
e := sendErr.Error()
|
||||
em = &e
|
||||
} else {
|
||||
sent++
|
||||
}
|
||||
subj := &subject
|
||||
_ = h.store.LogDelivery(ctx, tenant, h.uid(c), chLabel, r, subj, &provider, nil, &status, em)
|
||||
}
|
||||
|
||||
final := "Sent"
|
||||
if total > 0 && sent == 0 {
|
||||
final = "Failed"
|
||||
}
|
||||
_ = h.store.UpdateCampaignResult(ctx, id, final, total, sent, failed)
|
||||
c.JSON(http.StatusOK, gin.H{"status": final, "total": total, "sent": sent, "failed": failed})
|
||||
}
|
||||
@@ -223,3 +223,31 @@ type SendEmailRequest struct {
|
||||
Locale string `json:"locale"`
|
||||
Variables map[string]string `json:"variables"`
|
||||
}
|
||||
|
||||
// ── Marketing campaigns ──────────────────────────────────────────────────────
|
||||
|
||||
type Campaign struct {
|
||||
ID uuid.UUID `json:"id"`
|
||||
TenantID uuid.UUID `json:"tenant_id"`
|
||||
Name string `json:"name"`
|
||||
Channel string `json:"channel"` // sms | email
|
||||
Audience string `json:"audience"` // all | verified | with_plan
|
||||
Subject *string `json:"subject,omitempty"`
|
||||
BodyHTML *string `json:"body_html,omitempty"`
|
||||
TemplateCode *string `json:"template_code,omitempty"`
|
||||
Status string `json:"status"`
|
||||
TotalCount int `json:"total_count"`
|
||||
SentCount int `json:"sent_count"`
|
||||
FailedCount int `json:"failed_count"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
SentAt *time.Time `json:"sent_at,omitempty"`
|
||||
}
|
||||
|
||||
type CreateCampaignRequest struct {
|
||||
Name string `json:"name" binding:"required"`
|
||||
Channel string `json:"channel" binding:"required"` // sms | email
|
||||
Audience string `json:"audience"`
|
||||
Subject *string `json:"subject"`
|
||||
BodyHTML *string `json:"body_html"`
|
||||
TemplateCode *string `json:"template_code"`
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user