feat(api/offline): idempotency-key middleware for safe write retries

Backend half of offline Phase 1. Lets the offline outbox replay a write after a
lost response without executing it twice (e.g. an order whose POST reached the
server but whose reply never came back).

- IdempotencyRecord entity + table (unique index on (Scope, Key)); migration
  AddIdempotencyRecords. Standalone POCO — no tenant/soft-delete filters.
- IdempotencyMiddleware (after TenantMiddleware, before plan-limit/controllers):
  opt-in via `Idempotency-Key` header on POST/PUT/PATCH/DELETE.
    * Completed key → replays stored status+body with `Idempotent-Replay: true`.
    * In-progress key → 409 IDEMPOTENCY_IN_PROGRESS; the unique index serializes
      racing first requests; stale (>60s) reservations are recovered after a crash.
    * Only <500 responses are cached; 5xx is released so the client can retry.
  Bookkeeping runs in isolated DI scopes so it never contaminates the controller's
  unit of work. Keys are scoped per café — no cross-tenant collisions.
- 5 middleware tests (replay/execute-once, distinct key, pass-through, tenant
  isolation, 5xx-not-cached). Full suite 86 passing.

Next in Phase 1: generalize the POS order queue into a generic client outbox that
sends these keys and remaps client→server ids.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
soroush.asadi
2026-06-02 18:03:57 +03:30
parent 132f0921e0
commit f4583f5169
9 changed files with 3868 additions and 0 deletions
@@ -215,6 +215,9 @@ public static class ServiceCollectionExtensions
app.UseMeeziSecurity(); app.UseMeeziSecurity();
app.UseAuthentication(); app.UseAuthentication();
app.UseMiddleware<Middleware.TenantMiddleware>(); app.UseMiddleware<Middleware.TenantMiddleware>();
// After tenant context (keys are scoped per café), before plan-limit + controllers
// so a replayed write short-circuits without re-consuming limits or re-executing.
app.UseMiddleware<Middleware.IdempotencyMiddleware>();
app.UseMiddleware<Middleware.PlanLimitMiddleware>(); app.UseMiddleware<Middleware.PlanLimitMiddleware>();
app.UseAuthorization(); app.UseAuthorization();
@@ -0,0 +1,188 @@
using System.Text;
using Microsoft.EntityFrameworkCore;
using Meezi.Core.Entities;
using Meezi.Core.Enums;
using Meezi.Core.Interfaces;
using Meezi.Infrastructure.Data;
namespace Meezi.API.Middleware;
/// <summary>
/// Makes mutating requests safe to retry. A client (e.g. the offline outbox)
/// attaches an <c>Idempotency-Key</c> header; if the same key is seen again, the
/// original response is replayed instead of executing the write twice.
///
/// Bookkeeping runs in isolated DI scopes so it never mixes with the controller's
/// own DbContext unit of work. Opt-in via header → non-idempotent and binary/file
/// endpoints are unaffected unless the client explicitly sends a key.
/// </summary>
public class IdempotencyMiddleware
{
private const string HeaderName = "Idempotency-Key";
private const int MaxKeyLength = 200;
private const int MaxStoredBodyBytes = 256 * 1024;
/// <summary>An InProgress record older than this is assumed crashed mid-flight and re-run.</summary>
private static readonly TimeSpan StaleInProgress = TimeSpan.FromSeconds(60);
private readonly RequestDelegate _next;
private readonly ILogger<IdempotencyMiddleware> _logger;
public IdempotencyMiddleware(RequestDelegate next, ILogger<IdempotencyMiddleware> logger)
{
_next = next;
_logger = logger;
}
public async Task InvokeAsync(HttpContext context, ITenantContext tenant, IServiceScopeFactory scopeFactory)
{
var method = context.Request.Method;
var isMutating = HttpMethods.IsPost(method) || HttpMethods.IsPut(method)
|| HttpMethods.IsPatch(method) || HttpMethods.IsDelete(method);
if (!isMutating || !context.Request.Headers.TryGetValue(HeaderName, out var headerValues))
{
await _next(context);
return;
}
var key = headerValues.ToString();
if (string.IsNullOrWhiteSpace(key) || key.Length > MaxKeyLength)
{
// Unusable key — behave as if it wasn't sent rather than reject the write.
await _next(context);
return;
}
var scope = string.IsNullOrEmpty(tenant.CafeId) ? "global" : tenant.CafeId;
var path = context.Request.Path.Value ?? string.Empty;
// 1) Look for an existing record for this (tenant, key).
await using (var lookupScope = scopeFactory.CreateAsyncScope())
{
var db = lookupScope.ServiceProvider.GetRequiredService<AppDbContext>();
var existing = await db.IdempotencyRecords.AsNoTracking()
.FirstOrDefaultAsync(r => r.Scope == scope && r.Key == key, context.RequestAborted);
if (existing is not null)
{
if (existing.Status == IdempotencyStatus.Completed)
{
await ReplayAsync(context, existing);
return;
}
if (DateTime.UtcNow - existing.CreatedAt < StaleInProgress)
{
await WriteConflictAsync(context); // genuine concurrent duplicate
return;
}
// Stale reservation (process likely crashed mid-flight) — drop and re-run.
_logger.LogWarning("Recovering stale idempotency reservation {Key} for scope {Scope}", key, scope);
var stale = await db.IdempotencyRecords
.FirstOrDefaultAsync(r => r.Id == existing.Id, context.RequestAborted);
if (stale is not null)
{
db.IdempotencyRecords.Remove(stale);
await db.SaveChangesAsync(context.RequestAborted);
}
}
}
// 2) Reserve the key. The unique (Scope, Key) index serializes racing first requests.
var record = new IdempotencyRecord
{
Scope = scope,
Key = key,
Method = method,
Path = path,
Status = IdempotencyStatus.InProgress,
};
try
{
await using var reserveScope = scopeFactory.CreateAsyncScope();
var db = reserveScope.ServiceProvider.GetRequiredService<AppDbContext>();
db.IdempotencyRecords.Add(record);
await db.SaveChangesAsync(context.RequestAborted);
}
catch (DbUpdateException)
{
await WriteConflictAsync(context); // another request won the reservation race
return;
}
// 3) Run the real request, capturing its response.
var originalBody = context.Response.Body;
await using var buffer = new MemoryStream();
context.Response.Body = buffer;
try
{
await _next(context);
}
catch
{
context.Response.Body = originalBody;
await DeleteAsync(scopeFactory, record.Id);
throw;
}
var statusCode = context.Response.StatusCode;
buffer.Position = 0;
var bytes = buffer.ToArray();
context.Response.Body = originalBody;
if (bytes.Length > 0)
await originalBody.WriteAsync(bytes, context.RequestAborted);
// 4) Persist the result so retries replay it — except 5xx, which is transient and
// released so the client can retry the same key.
if (statusCode is >= 200 and < 500)
{
var storedBody = bytes.Length is > 0 and <= MaxStoredBodyBytes
? Encoding.UTF8.GetString(bytes)
: null;
await CompleteAsync(scopeFactory, record.Id, statusCode, storedBody);
}
else
{
await DeleteAsync(scopeFactory, record.Id);
}
}
private static async Task ReplayAsync(HttpContext context, IdempotencyRecord record)
{
context.Response.StatusCode = record.ResponseStatusCode;
context.Response.ContentType = "application/json; charset=utf-8";
context.Response.Headers["Idempotent-Replay"] = "true";
if (!string.IsNullOrEmpty(record.ResponseBody))
await context.Response.WriteAsync(record.ResponseBody);
}
private static async Task WriteConflictAsync(HttpContext context)
{
context.Response.StatusCode = StatusCodes.Status409Conflict;
context.Response.ContentType = "application/json; charset=utf-8";
await context.Response.WriteAsync(
"{\"success\":false,\"data\":null,\"error\":{\"code\":\"IDEMPOTENCY_IN_PROGRESS\",\"message\":\"A request with this key is still being processed.\"}}");
}
private static async Task CompleteAsync(IServiceScopeFactory f, string id, int status, string? body)
{
await using var s = f.CreateAsyncScope();
var db = s.ServiceProvider.GetRequiredService<AppDbContext>();
var rec = await db.IdempotencyRecords.FirstOrDefaultAsync(r => r.Id == id);
if (rec is null) return;
rec.Status = IdempotencyStatus.Completed;
rec.ResponseStatusCode = status;
rec.ResponseBody = body;
rec.CompletedAt = DateTime.UtcNow;
await db.SaveChangesAsync();
}
private static async Task DeleteAsync(IServiceScopeFactory f, string id)
{
await using var s = f.CreateAsyncScope();
var db = s.ServiceProvider.GetRequiredService<AppDbContext>();
var rec = await db.IdempotencyRecords.FirstOrDefaultAsync(r => r.Id == id);
if (rec is null) return;
db.IdempotencyRecords.Remove(rec);
await db.SaveChangesAsync();
}
}
@@ -0,0 +1,31 @@
using Meezi.Core.Enums;
namespace Meezi.Core.Entities;
/// <summary>
/// Records a client-supplied Idempotency-Key so a retried write (e.g. an order
/// replayed from the offline outbox after a lost response) returns the original
/// result instead of executing twice. Standalone POCO — deliberately not a
/// TenantEntity, to avoid soft-delete/tenant query filters.
/// </summary>
public class IdempotencyRecord
{
public string Id { get; set; } = Guid.NewGuid().ToString("N");
/// <summary>Tenant scope (CafeId), or "global" for non-tenant requests.</summary>
public string Scope { get; set; } = "global";
/// <summary>The client-supplied Idempotency-Key header value.</summary>
public string Key { get; set; } = string.Empty;
public string Method { get; set; } = string.Empty;
public string Path { get; set; } = string.Empty;
public IdempotencyStatus Status { get; set; } = IdempotencyStatus.InProgress;
public int ResponseStatusCode { get; set; }
public string? ResponseBody { get; set; }
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
public DateTime? CompletedAt { get; set; }
}
@@ -0,0 +1,9 @@
namespace Meezi.Core.Enums;
public enum IdempotencyStatus
{
/// <summary>Reserved; the original request is still executing.</summary>
InProgress = 0,
/// <summary>Finished; the stored response is replayed on duplicate keys.</summary>
Completed = 1
}
@@ -82,10 +82,25 @@ public class AppDbContext : DbContext
// Immutable audit trail of sensitive POS / management actions. // Immutable audit trail of sensitive POS / management actions.
public DbSet<AuditLog> AuditLogs => Set<AuditLog>(); public DbSet<AuditLog> AuditLogs => Set<AuditLog>();
// Idempotency keys for safe retry of offline-replayed writes.
public DbSet<IdempotencyRecord> IdempotencyRecords => Set<IdempotencyRecord>();
protected override void OnModelCreating(ModelBuilder modelBuilder) protected override void OnModelCreating(ModelBuilder modelBuilder)
{ {
base.OnModelCreating(modelBuilder); base.OnModelCreating(modelBuilder);
modelBuilder.Entity<IdempotencyRecord>(e =>
{
e.HasKey(x => x.Id);
// One result per (tenant, key). The unique index also serializes
// concurrent first-time requests carrying the same key.
e.HasIndex(x => new { x.Scope, x.Key }).IsUnique();
e.Property(x => x.Scope).HasMaxLength(64).IsRequired();
e.Property(x => x.Key).HasMaxLength(200).IsRequired();
e.Property(x => x.Method).HasMaxLength(10).IsRequired();
e.Property(x => x.Path).HasMaxLength(512).IsRequired();
});
modelBuilder.Entity<PushDevice>(e => modelBuilder.Entity<PushDevice>(e =>
{ {
e.HasKey(x => x.Id); e.HasKey(x => x.Id);
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,48 @@
using System;
using Microsoft.EntityFrameworkCore.Migrations;
#nullable disable
namespace Meezi.Infrastructure.Data.Migrations
{
/// <inheritdoc />
public partial class AddIdempotencyRecords : Migration
{
/// <inheritdoc />
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.CreateTable(
name: "IdempotencyRecords",
columns: table => new
{
Id = table.Column<string>(type: "text", nullable: false),
Scope = table.Column<string>(type: "character varying(64)", maxLength: 64, nullable: false),
Key = table.Column<string>(type: "character varying(200)", maxLength: 200, nullable: false),
Method = table.Column<string>(type: "character varying(10)", maxLength: 10, nullable: false),
Path = table.Column<string>(type: "character varying(512)", maxLength: 512, nullable: false),
Status = table.Column<int>(type: "integer", nullable: false),
ResponseStatusCode = table.Column<int>(type: "integer", nullable: false),
ResponseBody = table.Column<string>(type: "text", nullable: true),
CreatedAt = table.Column<DateTime>(type: "timestamp with time zone", nullable: false),
CompletedAt = table.Column<DateTime>(type: "timestamp with time zone", nullable: true)
},
constraints: table =>
{
table.PrimaryKey("PK_IdempotencyRecords", x => x.Id);
});
migrationBuilder.CreateIndex(
name: "IX_IdempotencyRecords_Scope_Key",
table: "IdempotencyRecords",
columns: new[] { "Scope", "Key" },
unique: true);
}
/// <inheritdoc />
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "IdempotencyRecords");
}
}
}
@@ -1129,6 +1129,54 @@ namespace Meezi.Infrastructure.Data.Migrations
b.ToTable("Expenses"); b.ToTable("Expenses");
}); });
modelBuilder.Entity("Meezi.Core.Entities.IdempotencyRecord", b =>
{
b.Property<string>("Id")
.HasColumnType("text");
b.Property<DateTime?>("CompletedAt")
.HasColumnType("timestamp with time zone");
b.Property<DateTime>("CreatedAt")
.HasColumnType("timestamp with time zone");
b.Property<string>("Key")
.IsRequired()
.HasMaxLength(200)
.HasColumnType("character varying(200)");
b.Property<string>("Method")
.IsRequired()
.HasMaxLength(10)
.HasColumnType("character varying(10)");
b.Property<string>("Path")
.IsRequired()
.HasMaxLength(512)
.HasColumnType("character varying(512)");
b.Property<string>("ResponseBody")
.HasColumnType("text");
b.Property<int>("ResponseStatusCode")
.HasColumnType("integer");
b.Property<string>("Scope")
.IsRequired()
.HasMaxLength(64)
.HasColumnType("character varying(64)");
b.Property<int>("Status")
.HasColumnType("integer");
b.HasKey("Id");
b.HasIndex("Scope", "Key")
.IsUnique();
b.ToTable("IdempotencyRecords");
});
modelBuilder.Entity("Meezi.Core.Entities.Ingredient", b => modelBuilder.Entity("Meezi.Core.Entities.Ingredient", b =>
{ {
b.Property<string>("Id") b.Property<string>("Id")
@@ -0,0 +1,162 @@
using Microsoft.AspNetCore.Http;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Meezi.API.Middleware;
using Meezi.Core.Enums;
using Meezi.Core.Interfaces;
using Meezi.Infrastructure.Data;
using Xunit;
namespace Meezi.API.Tests;
public class IdempotencyMiddlewareTests
{
private sealed class TestTenant(string? cafeId) : ITenantContext
{
public string? UserId => "user-1";
public string? CafeId => cafeId;
public EmployeeRole? Role => EmployeeRole.Owner;
public PlanTier? PlanTier => Core.Enums.PlanTier.Pro;
public string? Language => "fa";
public string? BranchId => null;
public bool IsSystemAdmin => false;
public bool IsAuthenticated => true;
}
/// <summary>A scope factory whose scopes share one in-memory database, mirroring how the
/// middleware opens isolated DI scopes against the same store in production.</summary>
private static IServiceScopeFactory BuildScopeFactory()
{
var dbName = Guid.NewGuid().ToString();
var services = new ServiceCollection();
services.AddDbContext<AppDbContext>(o => o.UseInMemoryDatabase(dbName));
services.AddLogging();
return services.BuildServiceProvider().GetRequiredService<IServiceScopeFactory>();
}
private static DefaultHttpContext NewPost(string? key)
{
var ctx = new DefaultHttpContext();
ctx.Request.Method = "POST";
ctx.Request.Path = "/api/test";
if (key is not null) ctx.Request.Headers["Idempotency-Key"] = key;
ctx.Response.Body = new MemoryStream();
return ctx;
}
private static string ReadBody(HttpContext ctx)
{
ctx.Response.Body.Position = 0;
return new StreamReader(ctx.Response.Body).ReadToEnd();
}
[Fact]
public async Task SameKey_ExecutesOnce_AndReplaysStoredResponse()
{
var scopeFactory = BuildScopeFactory();
var tenant = new TestTenant("cafe-1");
var calls = 0;
RequestDelegate next = async ctx =>
{
calls++;
ctx.Response.StatusCode = 200;
await ctx.Response.WriteAsync($"{{\"v\":\"{Guid.NewGuid():N}\"}}");
};
var mw = new IdempotencyMiddleware(next, NullLogger<IdempotencyMiddleware>.Instance);
var c1 = NewPost("KEY-1");
await mw.InvokeAsync(c1, tenant, scopeFactory);
var body1 = ReadBody(c1);
var c2 = NewPost("KEY-1");
await mw.InvokeAsync(c2, tenant, scopeFactory);
var body2 = ReadBody(c2);
Assert.Equal(1, calls); // executed exactly once
Assert.Equal(body1, body2); // second call replays the stored body verbatim
Assert.Equal(200, c2.Response.StatusCode);
Assert.Equal("true", c2.Response.Headers["Idempotent-Replay"].ToString());
}
[Fact]
public async Task DifferentKey_ExecutesAgain()
{
var scopeFactory = BuildScopeFactory();
var tenant = new TestTenant("cafe-1");
var calls = 0;
RequestDelegate next = async ctx =>
{
calls++;
ctx.Response.StatusCode = 200;
await ctx.Response.WriteAsync("{\"ok\":true}");
};
var mw = new IdempotencyMiddleware(next, NullLogger<IdempotencyMiddleware>.Instance);
await mw.InvokeAsync(NewPost("A"), tenant, scopeFactory);
await mw.InvokeAsync(NewPost("B"), tenant, scopeFactory);
Assert.Equal(2, calls);
}
[Fact]
public async Task NoKey_PassesThrough_NoIdempotency()
{
var scopeFactory = BuildScopeFactory();
var tenant = new TestTenant("cafe-1");
var calls = 0;
RequestDelegate next = ctx =>
{
calls++;
ctx.Response.StatusCode = 200;
return Task.CompletedTask;
};
var mw = new IdempotencyMiddleware(next, NullLogger<IdempotencyMiddleware>.Instance);
await mw.InvokeAsync(NewPost(null), tenant, scopeFactory);
await mw.InvokeAsync(NewPost(null), tenant, scopeFactory);
Assert.Equal(2, calls);
}
[Fact]
public async Task SameKey_DifferentTenant_IsNotReplayed()
{
var scopeFactory = BuildScopeFactory();
var calls = 0;
RequestDelegate next = async ctx =>
{
calls++;
ctx.Response.StatusCode = 200;
await ctx.Response.WriteAsync("{\"ok\":true}");
};
var mw = new IdempotencyMiddleware(next, NullLogger<IdempotencyMiddleware>.Instance);
await mw.InvokeAsync(NewPost("SHARED"), new TestTenant("cafe-A"), scopeFactory);
await mw.InvokeAsync(NewPost("SHARED"), new TestTenant("cafe-B"), scopeFactory);
Assert.Equal(2, calls); // keys are scoped per café — no cross-tenant collision
}
[Fact]
public async Task ServerError_IsNotCached_SoRetryReexecutes()
{
var scopeFactory = BuildScopeFactory();
var tenant = new TestTenant("cafe-1");
var calls = 0;
RequestDelegate next = async ctx =>
{
calls++;
ctx.Response.StatusCode = 500;
await ctx.Response.WriteAsync("{\"error\":\"boom\"}");
};
var mw = new IdempotencyMiddleware(next, NullLogger<IdempotencyMiddleware>.Instance);
await mw.InvokeAsync(NewPost("KEY-5XX"), tenant, scopeFactory);
var c2 = NewPost("KEY-5XX");
await mw.InvokeAsync(c2, tenant, scopeFactory);
Assert.Equal(2, calls); // 5xx is transient → reservation released, retry runs again
Assert.NotEqual("true", c2.Response.Headers["Idempotent-Replay"].ToString());
}
}