f4583f5169
Backend half of offline Phase 1. Lets the offline outbox replay a write after a
lost response without executing it twice (e.g. an order whose POST reached the
server but whose reply never came back).
- IdempotencyRecord entity + table (unique index on (Scope, Key)); migration
AddIdempotencyRecords. Standalone POCO — no tenant/soft-delete filters.
- IdempotencyMiddleware (after TenantMiddleware, before plan-limit/controllers):
opt-in via `Idempotency-Key` header on POST/PUT/PATCH/DELETE.
* Completed key → replays stored status+body with `Idempotent-Replay: true`.
* In-progress key → 409 IDEMPOTENCY_IN_PROGRESS; the unique index serializes
racing first requests; stale (>60s) reservations are recovered after a crash.
* Only <500 responses are cached; 5xx is released so the client can retry.
Bookkeeping runs in isolated DI scopes so it never contaminates the controller's
unit of work. Keys are scoped per café — no cross-tenant collisions.
- 5 middleware tests (replay/execute-once, distinct key, pass-through, tenant
isolation, 5xx-not-cached). Full suite 86 passing.
Next in Phase 1: generalize the POS order queue into a generic client outbox that
sends these keys and remaps client→server ids.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
189 lines
7.4 KiB
C#
189 lines
7.4 KiB
C#
using System.Text;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using Meezi.Core.Entities;
|
|
using Meezi.Core.Enums;
|
|
using Meezi.Core.Interfaces;
|
|
using Meezi.Infrastructure.Data;
|
|
|
|
namespace Meezi.API.Middleware;
|
|
|
|
/// <summary>
|
|
/// Makes mutating requests safe to retry. A client (e.g. the offline outbox)
|
|
/// attaches an <c>Idempotency-Key</c> header; if the same key is seen again, the
|
|
/// original response is replayed instead of executing the write twice.
|
|
///
|
|
/// Bookkeeping runs in isolated DI scopes so it never mixes with the controller's
|
|
/// own DbContext unit of work. Opt-in via header → non-idempotent and binary/file
|
|
/// endpoints are unaffected unless the client explicitly sends a key.
|
|
/// </summary>
|
|
public class IdempotencyMiddleware
|
|
{
|
|
private const string HeaderName = "Idempotency-Key";
|
|
private const int MaxKeyLength = 200;
|
|
private const int MaxStoredBodyBytes = 256 * 1024;
|
|
/// <summary>An InProgress record older than this is assumed crashed mid-flight and re-run.</summary>
|
|
private static readonly TimeSpan StaleInProgress = TimeSpan.FromSeconds(60);
|
|
|
|
private readonly RequestDelegate _next;
|
|
private readonly ILogger<IdempotencyMiddleware> _logger;
|
|
|
|
public IdempotencyMiddleware(RequestDelegate next, ILogger<IdempotencyMiddleware> logger)
|
|
{
|
|
_next = next;
|
|
_logger = logger;
|
|
}
|
|
|
|
public async Task InvokeAsync(HttpContext context, ITenantContext tenant, IServiceScopeFactory scopeFactory)
|
|
{
|
|
var method = context.Request.Method;
|
|
var isMutating = HttpMethods.IsPost(method) || HttpMethods.IsPut(method)
|
|
|| HttpMethods.IsPatch(method) || HttpMethods.IsDelete(method);
|
|
|
|
if (!isMutating || !context.Request.Headers.TryGetValue(HeaderName, out var headerValues))
|
|
{
|
|
await _next(context);
|
|
return;
|
|
}
|
|
|
|
var key = headerValues.ToString();
|
|
if (string.IsNullOrWhiteSpace(key) || key.Length > MaxKeyLength)
|
|
{
|
|
// Unusable key — behave as if it wasn't sent rather than reject the write.
|
|
await _next(context);
|
|
return;
|
|
}
|
|
|
|
var scope = string.IsNullOrEmpty(tenant.CafeId) ? "global" : tenant.CafeId;
|
|
var path = context.Request.Path.Value ?? string.Empty;
|
|
|
|
// 1) Look for an existing record for this (tenant, key).
|
|
await using (var lookupScope = scopeFactory.CreateAsyncScope())
|
|
{
|
|
var db = lookupScope.ServiceProvider.GetRequiredService<AppDbContext>();
|
|
var existing = await db.IdempotencyRecords.AsNoTracking()
|
|
.FirstOrDefaultAsync(r => r.Scope == scope && r.Key == key, context.RequestAborted);
|
|
|
|
if (existing is not null)
|
|
{
|
|
if (existing.Status == IdempotencyStatus.Completed)
|
|
{
|
|
await ReplayAsync(context, existing);
|
|
return;
|
|
}
|
|
if (DateTime.UtcNow - existing.CreatedAt < StaleInProgress)
|
|
{
|
|
await WriteConflictAsync(context); // genuine concurrent duplicate
|
|
return;
|
|
}
|
|
// Stale reservation (process likely crashed mid-flight) — drop and re-run.
|
|
_logger.LogWarning("Recovering stale idempotency reservation {Key} for scope {Scope}", key, scope);
|
|
var stale = await db.IdempotencyRecords
|
|
.FirstOrDefaultAsync(r => r.Id == existing.Id, context.RequestAborted);
|
|
if (stale is not null)
|
|
{
|
|
db.IdempotencyRecords.Remove(stale);
|
|
await db.SaveChangesAsync(context.RequestAborted);
|
|
}
|
|
}
|
|
}
|
|
|
|
// 2) Reserve the key. The unique (Scope, Key) index serializes racing first requests.
|
|
var record = new IdempotencyRecord
|
|
{
|
|
Scope = scope,
|
|
Key = key,
|
|
Method = method,
|
|
Path = path,
|
|
Status = IdempotencyStatus.InProgress,
|
|
};
|
|
try
|
|
{
|
|
await using var reserveScope = scopeFactory.CreateAsyncScope();
|
|
var db = reserveScope.ServiceProvider.GetRequiredService<AppDbContext>();
|
|
db.IdempotencyRecords.Add(record);
|
|
await db.SaveChangesAsync(context.RequestAborted);
|
|
}
|
|
catch (DbUpdateException)
|
|
{
|
|
await WriteConflictAsync(context); // another request won the reservation race
|
|
return;
|
|
}
|
|
|
|
// 3) Run the real request, capturing its response.
|
|
var originalBody = context.Response.Body;
|
|
await using var buffer = new MemoryStream();
|
|
context.Response.Body = buffer;
|
|
try
|
|
{
|
|
await _next(context);
|
|
}
|
|
catch
|
|
{
|
|
context.Response.Body = originalBody;
|
|
await DeleteAsync(scopeFactory, record.Id);
|
|
throw;
|
|
}
|
|
|
|
var statusCode = context.Response.StatusCode;
|
|
buffer.Position = 0;
|
|
var bytes = buffer.ToArray();
|
|
context.Response.Body = originalBody;
|
|
if (bytes.Length > 0)
|
|
await originalBody.WriteAsync(bytes, context.RequestAborted);
|
|
|
|
// 4) Persist the result so retries replay it — except 5xx, which is transient and
|
|
// released so the client can retry the same key.
|
|
if (statusCode is >= 200 and < 500)
|
|
{
|
|
var storedBody = bytes.Length is > 0 and <= MaxStoredBodyBytes
|
|
? Encoding.UTF8.GetString(bytes)
|
|
: null;
|
|
await CompleteAsync(scopeFactory, record.Id, statusCode, storedBody);
|
|
}
|
|
else
|
|
{
|
|
await DeleteAsync(scopeFactory, record.Id);
|
|
}
|
|
}
|
|
|
|
private static async Task ReplayAsync(HttpContext context, IdempotencyRecord record)
|
|
{
|
|
context.Response.StatusCode = record.ResponseStatusCode;
|
|
context.Response.ContentType = "application/json; charset=utf-8";
|
|
context.Response.Headers["Idempotent-Replay"] = "true";
|
|
if (!string.IsNullOrEmpty(record.ResponseBody))
|
|
await context.Response.WriteAsync(record.ResponseBody);
|
|
}
|
|
|
|
private static async Task WriteConflictAsync(HttpContext context)
|
|
{
|
|
context.Response.StatusCode = StatusCodes.Status409Conflict;
|
|
context.Response.ContentType = "application/json; charset=utf-8";
|
|
await context.Response.WriteAsync(
|
|
"{\"success\":false,\"data\":null,\"error\":{\"code\":\"IDEMPOTENCY_IN_PROGRESS\",\"message\":\"A request with this key is still being processed.\"}}");
|
|
}
|
|
|
|
private static async Task CompleteAsync(IServiceScopeFactory f, string id, int status, string? body)
|
|
{
|
|
await using var s = f.CreateAsyncScope();
|
|
var db = s.ServiceProvider.GetRequiredService<AppDbContext>();
|
|
var rec = await db.IdempotencyRecords.FirstOrDefaultAsync(r => r.Id == id);
|
|
if (rec is null) return;
|
|
rec.Status = IdempotencyStatus.Completed;
|
|
rec.ResponseStatusCode = status;
|
|
rec.ResponseBody = body;
|
|
rec.CompletedAt = DateTime.UtcNow;
|
|
await db.SaveChangesAsync();
|
|
}
|
|
|
|
private static async Task DeleteAsync(IServiceScopeFactory f, string id)
|
|
{
|
|
await using var s = f.CreateAsyncScope();
|
|
var db = s.ServiceProvider.GetRequiredService<AppDbContext>();
|
|
var rec = await db.IdempotencyRecords.FirstOrDefaultAsync(r => r.Id == id);
|
|
if (rec is null) return;
|
|
db.IdempotencyRecords.Remove(rec);
|
|
await db.SaveChangesAsync();
|
|
}
|
|
}
|