From 09eaf360a34405261e1e636766ccc655e7827d35 Mon Sep 17 00:00:00 2001 From: "soroush.asadi" Date: Wed, 10 Jun 2026 01:16:37 +0330 Subject: [PATCH] M4: agent-run job queue + worker drain (Increment 1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SharedKernel: IWorkerModule seam (RegisterWorker runs in the worker host only). Bootstrap: AddTeamUpWorkerServices; the worker host now wires it. Assembler module (schema "assembler", InitialAssembler migration): - Job (Pending→Processing→Done/Failed) + AgentRun (Queued→Running→Completed/Failed) entities. - JobQueue: enqueue + ClaimNextAsync using `FOR UPDATE SKIP LOCKED` in a transaction. - AgentRunExecutor (Increment-1 placeholder — real assemble/model/parse lands in Increment 2). - JobProcessor BackgroundService drains the queue on the worker host (web off the model path). - POST /api/assembler/runs enqueues a run; GET /api/assembler/runs/{id} reads it. Verified: build green; ArchitectureTests 8/8 (Assembler references only SharedKernel); IntegrationTests 28/28 incl. enqueue→claim(SKIP LOCKED)→process→Completed. Co-Authored-By: Claude Opus 4.8 --- .../TeamUpModuleExtensions.cs | 17 +++ src/Hosts/TeamUp.Worker/Program.cs | 1 + .../AssemblerModule.cs | 37 +++-- .../Domain/AgentRun.cs | 72 +++++++++ .../TeamUp.Modules.Assembler/Domain/Job.cs | 61 ++++++++ .../Endpoints/AssemblerDtos.cs | 15 ++ .../Endpoints/AssemblerEndpoints.cs | 47 ++++++ .../Persistence/AssemblerDbContext.cs | 39 +++++ .../Persistence/AssemblerDbContextFactory.cs | 21 +++ ...0260609214035_InitialAssembler.Designer.cs | 138 ++++++++++++++++++ .../20260609214035_InitialAssembler.cs | 95 ++++++++++++ .../AssemblerDbContextModelSnapshot.cs | 135 +++++++++++++++++ .../Queue/JobQueue.cs | 40 +++++ .../Runtime/AgentRunExecutor.cs | 48 ++++++ .../TeamUp.Modules.Assembler.csproj | 19 ++- .../Worker/JobProcessor.cs | 50 +++++++ .../Modularity/IWorkerModule.cs | 14 ++ .../AgentRunQueueTests.cs | 74 ++++++++++ 18 files changed, 906 insertions(+), 17 deletions(-) create mode 100644 src/Modules/TeamUp.Modules.Assembler/Domain/AgentRun.cs create mode 100644 src/Modules/TeamUp.Modules.Assembler/Domain/Job.cs create mode 100644 src/Modules/TeamUp.Modules.Assembler/Endpoints/AssemblerDtos.cs create mode 100644 src/Modules/TeamUp.Modules.Assembler/Endpoints/AssemblerEndpoints.cs create mode 100644 src/Modules/TeamUp.Modules.Assembler/Persistence/AssemblerDbContext.cs create mode 100644 src/Modules/TeamUp.Modules.Assembler/Persistence/AssemblerDbContextFactory.cs create mode 100644 src/Modules/TeamUp.Modules.Assembler/Persistence/Migrations/20260609214035_InitialAssembler.Designer.cs create mode 100644 src/Modules/TeamUp.Modules.Assembler/Persistence/Migrations/20260609214035_InitialAssembler.cs create mode 100644 src/Modules/TeamUp.Modules.Assembler/Persistence/Migrations/AssemblerDbContextModelSnapshot.cs create mode 100644 src/Modules/TeamUp.Modules.Assembler/Queue/JobQueue.cs create mode 100644 src/Modules/TeamUp.Modules.Assembler/Runtime/AgentRunExecutor.cs create mode 100644 src/Modules/TeamUp.Modules.Assembler/Worker/JobProcessor.cs create mode 100644 src/Shared/TeamUp.SharedKernel/Modularity/IWorkerModule.cs create mode 100644 tests/TeamUp.IntegrationTests/AgentRunQueueTests.cs diff --git a/src/Bootstrap/TeamUp.Bootstrap/TeamUpModuleExtensions.cs b/src/Bootstrap/TeamUp.Bootstrap/TeamUpModuleExtensions.cs index e07f39e..1038fad 100644 --- a/src/Bootstrap/TeamUp.Bootstrap/TeamUpModuleExtensions.cs +++ b/src/Bootstrap/TeamUp.Bootstrap/TeamUpModuleExtensions.cs @@ -1,6 +1,7 @@ using Microsoft.AspNetCore.Routing; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; +using TeamUp.SharedKernel.Modularity; namespace TeamUp.Bootstrap; @@ -29,4 +30,20 @@ public static class TeamUpModuleExtensions return endpoints; } + + /// Runs RegisterWorker for modules with background services. WORKER host only. + public static IServiceCollection AddTeamUpWorkerServices( + this IServiceCollection services, + IConfiguration configuration) + { + foreach (var module in ModuleCatalog.All) + { + if (module is IWorkerModule workerModule) + { + workerModule.RegisterWorker(services, configuration); + } + } + + return services; + } } diff --git a/src/Hosts/TeamUp.Worker/Program.cs b/src/Hosts/TeamUp.Worker/Program.cs index cfc994d..2d2ef01 100644 --- a/src/Hosts/TeamUp.Worker/Program.cs +++ b/src/Hosts/TeamUp.Worker/Program.cs @@ -13,6 +13,7 @@ builder.Services.AddSerilog((services, configuration) => configuration builder.Services.AddTeamUpObservability(builder.Configuration, serviceName: "teamup-worker"); builder.Services.AddTeamUpPersistence(builder.Configuration); builder.Services.AddTeamUpModules(builder.Configuration); +builder.Services.AddTeamUpWorkerServices(builder.Configuration); // hosted services: the agent-run job drainer builder.Services.AddHostedService(); var host = builder.Build(); diff --git a/src/Modules/TeamUp.Modules.Assembler/AssemblerModule.cs b/src/Modules/TeamUp.Modules.Assembler/AssemblerModule.cs index 2cb64ba..676affe 100644 --- a/src/Modules/TeamUp.Modules.Assembler/AssemblerModule.cs +++ b/src/Modules/TeamUp.Modules.Assembler/AssemblerModule.cs @@ -1,27 +1,40 @@ -using Microsoft.AspNetCore.Builder; -using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Routing; +using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using TeamUp.Modules.Assembler.Endpoints; +using TeamUp.Modules.Assembler.Persistence; +using TeamUp.Modules.Assembler.Queue; +using TeamUp.Modules.Assembler.Runtime; +using TeamUp.Modules.Assembler.Worker; using TeamUp.SharedKernel.Modularity; +using TeamUp.SharedKernel.Persistence; namespace TeamUp.Modules.Assembler; -/// Context assembly, the model call, output parsing, prompt caching — runs in the worker (M4). -public sealed class AssemblerModule : IModule +/// +/// Context assembly, the model call, output parsing — the agent runtime. The job queue + AgentRun +/// state live here; the drain runs in the worker host (RegisterWorker), the trigger on the web host. +/// +public sealed class AssemblerModule : IModule, IWorkerModule { public string Name => "assembler"; public void Register(IServiceCollection services, IConfiguration configuration) { - // Skeleton: no services yet. M4 introduces the jobs table (FOR UPDATE SKIP LOCKED), - // the AgentRun context, and the assembler pipeline (registered for the worker host). + var connectionString = configuration.GetConnectionString("Postgres") + ?? throw new InvalidOperationException("Missing connection string 'ConnectionStrings:Postgres'."); + + services.AddDbContext(options => options.UseNpgsql(connectionString)); + services.AddScoped(sp => sp.GetRequiredService()); + services.AddScoped(); + services.AddScoped(); + services.TryAddSingleton(TimeProvider.System); } - public void MapEndpoints(IEndpointRouteBuilder endpoints) - { - endpoints.MapGroup($"/api/{Name}") - .WithTags("Assembler") - .MapGet("/ping", () => TypedResults.Ok(new ModulePing(Name))); - } + public void RegisterWorker(IServiceCollection services, IConfiguration configuration) => + services.AddHostedService(); + + public void MapEndpoints(IEndpointRouteBuilder endpoints) => AssemblerEndpoints.Map(endpoints); } diff --git a/src/Modules/TeamUp.Modules.Assembler/Domain/AgentRun.cs b/src/Modules/TeamUp.Modules.Assembler/Domain/AgentRun.cs new file mode 100644 index 0000000..29bad89 --- /dev/null +++ b/src/Modules/TeamUp.Modules.Assembler/Domain/AgentRun.cs @@ -0,0 +1,72 @@ +using TeamUp.SharedKernel.Domain; + +namespace TeamUp.Modules.Assembler.Domain; + +internal enum AgentRunStatus +{ + Queued, + Running, + Completed, + Failed, +} + +/// +/// One execution of an AI seat against a task: the assembled prompt, the raw model output, the +/// parsed action + risk tag, and the reasoning/assembly trace. Nothing executes off this in M4 — +/// the action gate (M5) decides whether the parsed action runs or waits in review. +/// +internal sealed class AgentRun : Entity +{ + public Guid SeatId { get; private set; } + public Guid WorkItemId { get; private set; } + public Guid? AgentId { get; private set; } + public AgentRunStatus Status { get; private set; } + public string? Prompt { get; private set; } + public string? Output { get; private set; } + public string? ActionType { get; private set; } + public string? ActionRisk { get; private set; } + public string? ResultJson { get; private set; } + public string? Trace { get; private set; } + public string? Error { get; private set; } + public long? LatencyMs { get; private set; } + public DateTimeOffset CreatedAtUtc { get; private set; } + public DateTimeOffset? CompletedAtUtc { get; private set; } + + private AgentRun() + { + } + + public AgentRun(Guid seatId, Guid workItemId, DateTimeOffset createdAtUtc) + { + SeatId = seatId; + WorkItemId = workItemId; + Status = AgentRunStatus.Queued; + CreatedAtUtc = createdAtUtc; + } + + public void Start(Guid? agentId, string prompt, string? trace) + { + Status = AgentRunStatus.Running; + AgentId = agentId; + Prompt = prompt; + Trace = trace; + } + + public void Complete(string output, string actionType, string actionRisk, string? resultJson, long latencyMs, DateTimeOffset nowUtc) + { + Status = AgentRunStatus.Completed; + Output = output; + ActionType = actionType; + ActionRisk = actionRisk; + ResultJson = resultJson; + LatencyMs = latencyMs; + CompletedAtUtc = nowUtc; + } + + public void Fail(string error, DateTimeOffset nowUtc) + { + Status = AgentRunStatus.Failed; + Error = error; + CompletedAtUtc = nowUtc; + } +} diff --git a/src/Modules/TeamUp.Modules.Assembler/Domain/Job.cs b/src/Modules/TeamUp.Modules.Assembler/Domain/Job.cs new file mode 100644 index 0000000..8258b6d --- /dev/null +++ b/src/Modules/TeamUp.Modules.Assembler/Domain/Job.cs @@ -0,0 +1,61 @@ +using TeamUp.SharedKernel.Domain; + +namespace TeamUp.Modules.Assembler.Domain; + +internal enum JobStatus +{ + Pending, + Processing, + Done, + Failed, +} + +/// +/// A unit of background work, drained from Postgres with FOR UPDATE SKIP LOCKED by the worker. +/// The run lifecycle is domain state (kept explicit) rather than an opaque message-bus payload. +/// +internal sealed class Job : Entity +{ + public string Type { get; private set; } = null!; + public string Payload { get; private set; } = null!; + public JobStatus Status { get; private set; } + public int Attempts { get; private set; } + public string? LockedBy { get; private set; } + public DateTimeOffset? LockedAtUtc { get; private set; } + public string? Error { get; private set; } + public DateTimeOffset CreatedAtUtc { get; private set; } + public DateTimeOffset? CompletedAtUtc { get; private set; } + + private Job() + { + } + + public Job(string type, string payload, DateTimeOffset createdAtUtc) + { + Type = type; + Payload = payload; + Status = JobStatus.Pending; + CreatedAtUtc = createdAtUtc; + } + + public void MarkProcessing(string worker, DateTimeOffset nowUtc) + { + Status = JobStatus.Processing; + LockedBy = worker; + LockedAtUtc = nowUtc; + Attempts++; + } + + public void MarkDone(DateTimeOffset nowUtc) + { + Status = JobStatus.Done; + CompletedAtUtc = nowUtc; + } + + public void MarkFailed(string error, DateTimeOffset nowUtc) + { + Status = JobStatus.Failed; + Error = error; + CompletedAtUtc = nowUtc; + } +} diff --git a/src/Modules/TeamUp.Modules.Assembler/Endpoints/AssemblerDtos.cs b/src/Modules/TeamUp.Modules.Assembler/Endpoints/AssemblerDtos.cs new file mode 100644 index 0000000..087f5d5 --- /dev/null +++ b/src/Modules/TeamUp.Modules.Assembler/Endpoints/AssemblerDtos.cs @@ -0,0 +1,15 @@ +namespace TeamUp.Modules.Assembler.Endpoints; + +internal sealed record CreateRunRequest(Guid SeatId, Guid WorkItemId); + +internal sealed record RunResponse( + Guid Id, + Guid SeatId, + Guid WorkItemId, + Guid? AgentId, + string Status, + string? ActionType, + string? ActionRisk, + string? Prompt, + string? Output, + string? Error); diff --git a/src/Modules/TeamUp.Modules.Assembler/Endpoints/AssemblerEndpoints.cs b/src/Modules/TeamUp.Modules.Assembler/Endpoints/AssemblerEndpoints.cs new file mode 100644 index 0000000..63af9f3 --- /dev/null +++ b/src/Modules/TeamUp.Modules.Assembler/Endpoints/AssemblerEndpoints.cs @@ -0,0 +1,47 @@ +using System.Text.Json; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Routing; +using Microsoft.EntityFrameworkCore; +using TeamUp.Modules.Assembler.Domain; +using TeamUp.Modules.Assembler.Persistence; +using TeamUp.Modules.Assembler.Queue; +using TeamUp.Modules.Assembler.Runtime; +using TeamUp.SharedKernel.Modularity; + +namespace TeamUp.Modules.Assembler.Endpoints; + +internal static class AssemblerEndpoints +{ + public static void Map(IEndpointRouteBuilder endpoints) + { + var group = endpoints.MapGroup("/api/assembler").WithTags("Assembler"); + + group.MapGet("/ping", () => TypedResults.Ok(new ModulePing("assembler"))); + group.MapPost("/runs", CreateRun).RequireAuthorization(); + group.MapGet("/runs/{id:guid}", GetRun).RequireAuthorization(); + } + + // Dispatch a task to an AI seat: record a queued AgentRun and enqueue the job. The worker + // drains it off the request path. (Scope-checking the seat's team is added in Increment 2.) + private static async Task CreateRun( + CreateRunRequest request, AssemblerDbContext db, JobQueue queue, TimeProvider clock, CancellationToken ct) + { + var run = new AgentRun(request.SeatId, request.WorkItemId, clock.GetUtcNow()); + db.AgentRuns.Add(run); + await db.SaveChangesAsync(ct); + + await queue.EnqueueAsync("agent.run", JsonSerializer.Serialize(new AgentRunPayload(run.Id)), ct); + return Results.Ok(ToResponse(run)); + } + + private static async Task GetRun(Guid id, AssemblerDbContext db, CancellationToken ct) + { + var run = await db.AgentRuns.FirstOrDefaultAsync(r => r.Id == id, ct); + return run is null ? Results.NotFound() : Results.Ok(ToResponse(run)); + } + + private static RunResponse ToResponse(AgentRun run) => new( + run.Id, run.SeatId, run.WorkItemId, run.AgentId, run.Status.ToString(), + run.ActionType, run.ActionRisk, run.Prompt, run.Output, run.Error); +} diff --git a/src/Modules/TeamUp.Modules.Assembler/Persistence/AssemblerDbContext.cs b/src/Modules/TeamUp.Modules.Assembler/Persistence/AssemblerDbContext.cs new file mode 100644 index 0000000..9c86c47 --- /dev/null +++ b/src/Modules/TeamUp.Modules.Assembler/Persistence/AssemblerDbContext.cs @@ -0,0 +1,39 @@ +using Microsoft.EntityFrameworkCore; +using TeamUp.Modules.Assembler.Domain; +using TeamUp.SharedKernel.Persistence; + +namespace TeamUp.Modules.Assembler.Persistence; + +internal sealed class AssemblerDbContext(DbContextOptions options) + : DbContext(options), IModuleDbContext +{ + public DbSet Jobs => Set(); + public DbSet AgentRuns => Set(); + + protected override void OnModelCreating(ModelBuilder modelBuilder) + { + modelBuilder.HasDefaultSchema("assembler"); + + modelBuilder.Entity(job => + { + job.ToTable("jobs"); + job.HasKey(j => j.Id); + job.Property(j => j.Type).HasMaxLength(60).IsRequired(); + job.Property(j => j.Status).HasConversion().HasMaxLength(20); + job.Property(j => j.LockedBy).HasMaxLength(120); + // Drives the FOR UPDATE SKIP LOCKED claim query. + job.HasIndex(j => new { j.Status, j.CreatedAtUtc }); + }); + + modelBuilder.Entity(run => + { + run.ToTable("agent_runs"); + run.HasKey(r => r.Id); + run.Property(r => r.Status).HasConversion().HasMaxLength(20); + run.Property(r => r.ActionType).HasMaxLength(60); + run.Property(r => r.ActionRisk).HasMaxLength(20); + run.HasIndex(r => r.WorkItemId); + run.HasIndex(r => r.SeatId); + }); + } +} diff --git a/src/Modules/TeamUp.Modules.Assembler/Persistence/AssemblerDbContextFactory.cs b/src/Modules/TeamUp.Modules.Assembler/Persistence/AssemblerDbContextFactory.cs new file mode 100644 index 0000000..dfbc2a9 --- /dev/null +++ b/src/Modules/TeamUp.Modules.Assembler/Persistence/AssemblerDbContextFactory.cs @@ -0,0 +1,21 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Design; + +namespace TeamUp.Modules.Assembler.Persistence; + +/// Design-time factory so `dotnet ef` can build the internal context without a host. +internal sealed class AssemblerDbContextFactory : IDesignTimeDbContextFactory +{ + public AssemblerDbContext CreateDbContext(string[] args) + { + var connectionString = + Environment.GetEnvironmentVariable("ConnectionStrings__Postgres") + ?? "Host=localhost;Port=5432;Database=teamup;Username=teamup;Password=teamup"; + + var options = new DbContextOptionsBuilder() + .UseNpgsql(connectionString) + .Options; + + return new AssemblerDbContext(options); + } +} diff --git a/src/Modules/TeamUp.Modules.Assembler/Persistence/Migrations/20260609214035_InitialAssembler.Designer.cs b/src/Modules/TeamUp.Modules.Assembler/Persistence/Migrations/20260609214035_InitialAssembler.Designer.cs new file mode 100644 index 0000000..4352ffa --- /dev/null +++ b/src/Modules/TeamUp.Modules.Assembler/Persistence/Migrations/20260609214035_InitialAssembler.Designer.cs @@ -0,0 +1,138 @@ +// +using System; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata; +using TeamUp.Modules.Assembler.Persistence; + +#nullable disable + +namespace TeamUp.Modules.Assembler.Persistence.Migrations +{ + [DbContext(typeof(AssemblerDbContext))] + [Migration("20260609214035_InitialAssembler")] + partial class InitialAssembler + { + /// + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasDefaultSchema("assembler") + .HasAnnotation("ProductVersion", "10.0.8") + .HasAnnotation("Relational:MaxIdentifierLength", 63); + + NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder); + + modelBuilder.Entity("TeamUp.Modules.Assembler.Domain.AgentRun", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("ActionRisk") + .HasMaxLength(20) + .HasColumnType("character varying(20)"); + + b.Property("ActionType") + .HasMaxLength(60) + .HasColumnType("character varying(60)"); + + b.Property("AgentId") + .HasColumnType("uuid"); + + b.Property("CompletedAtUtc") + .HasColumnType("timestamp with time zone"); + + b.Property("CreatedAtUtc") + .HasColumnType("timestamp with time zone"); + + b.Property("Error") + .HasColumnType("text"); + + b.Property("LatencyMs") + .HasColumnType("bigint"); + + b.Property("Output") + .HasColumnType("text"); + + b.Property("Prompt") + .HasColumnType("text"); + + b.Property("ResultJson") + .HasColumnType("text"); + + b.Property("SeatId") + .HasColumnType("uuid"); + + b.Property("Status") + .IsRequired() + .HasMaxLength(20) + .HasColumnType("character varying(20)"); + + b.Property("Trace") + .HasColumnType("text"); + + b.Property("WorkItemId") + .HasColumnType("uuid"); + + b.HasKey("Id"); + + b.HasIndex("SeatId"); + + b.HasIndex("WorkItemId"); + + b.ToTable("agent_runs", "assembler"); + }); + + modelBuilder.Entity("TeamUp.Modules.Assembler.Domain.Job", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("Attempts") + .HasColumnType("integer"); + + b.Property("CompletedAtUtc") + .HasColumnType("timestamp with time zone"); + + b.Property("CreatedAtUtc") + .HasColumnType("timestamp with time zone"); + + b.Property("Error") + .HasColumnType("text"); + + b.Property("LockedAtUtc") + .HasColumnType("timestamp with time zone"); + + b.Property("LockedBy") + .HasMaxLength(120) + .HasColumnType("character varying(120)"); + + b.Property("Payload") + .IsRequired() + .HasColumnType("text"); + + b.Property("Status") + .IsRequired() + .HasMaxLength(20) + .HasColumnType("character varying(20)"); + + b.Property("Type") + .IsRequired() + .HasMaxLength(60) + .HasColumnType("character varying(60)"); + + b.HasKey("Id"); + + b.HasIndex("Status", "CreatedAtUtc"); + + b.ToTable("jobs", "assembler"); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/Modules/TeamUp.Modules.Assembler/Persistence/Migrations/20260609214035_InitialAssembler.cs b/src/Modules/TeamUp.Modules.Assembler/Persistence/Migrations/20260609214035_InitialAssembler.cs new file mode 100644 index 0000000..dff44b5 --- /dev/null +++ b/src/Modules/TeamUp.Modules.Assembler/Persistence/Migrations/20260609214035_InitialAssembler.cs @@ -0,0 +1,95 @@ +using System; +using Microsoft.EntityFrameworkCore.Migrations; + +#nullable disable + +namespace TeamUp.Modules.Assembler.Persistence.Migrations +{ + /// + public partial class InitialAssembler : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.EnsureSchema( + name: "assembler"); + + migrationBuilder.CreateTable( + name: "agent_runs", + schema: "assembler", + columns: table => new + { + Id = table.Column(type: "uuid", nullable: false), + SeatId = table.Column(type: "uuid", nullable: false), + WorkItemId = table.Column(type: "uuid", nullable: false), + AgentId = table.Column(type: "uuid", nullable: true), + Status = table.Column(type: "character varying(20)", maxLength: 20, nullable: false), + Prompt = table.Column(type: "text", nullable: true), + Output = table.Column(type: "text", nullable: true), + ActionType = table.Column(type: "character varying(60)", maxLength: 60, nullable: true), + ActionRisk = table.Column(type: "character varying(20)", maxLength: 20, nullable: true), + ResultJson = table.Column(type: "text", nullable: true), + Trace = table.Column(type: "text", nullable: true), + Error = table.Column(type: "text", nullable: true), + LatencyMs = table.Column(type: "bigint", nullable: true), + CreatedAtUtc = table.Column(type: "timestamp with time zone", nullable: false), + CompletedAtUtc = table.Column(type: "timestamp with time zone", nullable: true) + }, + constraints: table => + { + table.PrimaryKey("PK_agent_runs", x => x.Id); + }); + + migrationBuilder.CreateTable( + name: "jobs", + schema: "assembler", + columns: table => new + { + Id = table.Column(type: "uuid", nullable: false), + Type = table.Column(type: "character varying(60)", maxLength: 60, nullable: false), + Payload = table.Column(type: "text", nullable: false), + Status = table.Column(type: "character varying(20)", maxLength: 20, nullable: false), + Attempts = table.Column(type: "integer", nullable: false), + LockedBy = table.Column(type: "character varying(120)", maxLength: 120, nullable: true), + LockedAtUtc = table.Column(type: "timestamp with time zone", nullable: true), + Error = table.Column(type: "text", nullable: true), + CreatedAtUtc = table.Column(type: "timestamp with time zone", nullable: false), + CompletedAtUtc = table.Column(type: "timestamp with time zone", nullable: true) + }, + constraints: table => + { + table.PrimaryKey("PK_jobs", x => x.Id); + }); + + migrationBuilder.CreateIndex( + name: "IX_agent_runs_SeatId", + schema: "assembler", + table: "agent_runs", + column: "SeatId"); + + migrationBuilder.CreateIndex( + name: "IX_agent_runs_WorkItemId", + schema: "assembler", + table: "agent_runs", + column: "WorkItemId"); + + migrationBuilder.CreateIndex( + name: "IX_jobs_Status_CreatedAtUtc", + schema: "assembler", + table: "jobs", + columns: new[] { "Status", "CreatedAtUtc" }); + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropTable( + name: "agent_runs", + schema: "assembler"); + + migrationBuilder.DropTable( + name: "jobs", + schema: "assembler"); + } + } +} diff --git a/src/Modules/TeamUp.Modules.Assembler/Persistence/Migrations/AssemblerDbContextModelSnapshot.cs b/src/Modules/TeamUp.Modules.Assembler/Persistence/Migrations/AssemblerDbContextModelSnapshot.cs new file mode 100644 index 0000000..b32aaf9 --- /dev/null +++ b/src/Modules/TeamUp.Modules.Assembler/Persistence/Migrations/AssemblerDbContextModelSnapshot.cs @@ -0,0 +1,135 @@ +// +using System; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata; +using TeamUp.Modules.Assembler.Persistence; + +#nullable disable + +namespace TeamUp.Modules.Assembler.Persistence.Migrations +{ + [DbContext(typeof(AssemblerDbContext))] + partial class AssemblerDbContextModelSnapshot : ModelSnapshot + { + protected override void BuildModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasDefaultSchema("assembler") + .HasAnnotation("ProductVersion", "10.0.8") + .HasAnnotation("Relational:MaxIdentifierLength", 63); + + NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder); + + modelBuilder.Entity("TeamUp.Modules.Assembler.Domain.AgentRun", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("ActionRisk") + .HasMaxLength(20) + .HasColumnType("character varying(20)"); + + b.Property("ActionType") + .HasMaxLength(60) + .HasColumnType("character varying(60)"); + + b.Property("AgentId") + .HasColumnType("uuid"); + + b.Property("CompletedAtUtc") + .HasColumnType("timestamp with time zone"); + + b.Property("CreatedAtUtc") + .HasColumnType("timestamp with time zone"); + + b.Property("Error") + .HasColumnType("text"); + + b.Property("LatencyMs") + .HasColumnType("bigint"); + + b.Property("Output") + .HasColumnType("text"); + + b.Property("Prompt") + .HasColumnType("text"); + + b.Property("ResultJson") + .HasColumnType("text"); + + b.Property("SeatId") + .HasColumnType("uuid"); + + b.Property("Status") + .IsRequired() + .HasMaxLength(20) + .HasColumnType("character varying(20)"); + + b.Property("Trace") + .HasColumnType("text"); + + b.Property("WorkItemId") + .HasColumnType("uuid"); + + b.HasKey("Id"); + + b.HasIndex("SeatId"); + + b.HasIndex("WorkItemId"); + + b.ToTable("agent_runs", "assembler"); + }); + + modelBuilder.Entity("TeamUp.Modules.Assembler.Domain.Job", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid"); + + b.Property("Attempts") + .HasColumnType("integer"); + + b.Property("CompletedAtUtc") + .HasColumnType("timestamp with time zone"); + + b.Property("CreatedAtUtc") + .HasColumnType("timestamp with time zone"); + + b.Property("Error") + .HasColumnType("text"); + + b.Property("LockedAtUtc") + .HasColumnType("timestamp with time zone"); + + b.Property("LockedBy") + .HasMaxLength(120) + .HasColumnType("character varying(120)"); + + b.Property("Payload") + .IsRequired() + .HasColumnType("text"); + + b.Property("Status") + .IsRequired() + .HasMaxLength(20) + .HasColumnType("character varying(20)"); + + b.Property("Type") + .IsRequired() + .HasMaxLength(60) + .HasColumnType("character varying(60)"); + + b.HasKey("Id"); + + b.HasIndex("Status", "CreatedAtUtc"); + + b.ToTable("jobs", "assembler"); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/Modules/TeamUp.Modules.Assembler/Queue/JobQueue.cs b/src/Modules/TeamUp.Modules.Assembler/Queue/JobQueue.cs new file mode 100644 index 0000000..a896521 --- /dev/null +++ b/src/Modules/TeamUp.Modules.Assembler/Queue/JobQueue.cs @@ -0,0 +1,40 @@ +using Microsoft.EntityFrameworkCore; +using TeamUp.Modules.Assembler.Domain; +using TeamUp.Modules.Assembler.Persistence; + +namespace TeamUp.Modules.Assembler.Queue; + +/// The Postgres-backed agent-run queue. Enqueue inserts; claim uses FOR UPDATE SKIP LOCKED +/// so multiple workers can drain concurrently without contention. +internal sealed class JobQueue(AssemblerDbContext db, TimeProvider clock) +{ + public async Task EnqueueAsync(string type, string payload, CancellationToken cancellationToken = default) + { + var job = new Job(type, payload, clock.GetUtcNow()); + db.Jobs.Add(job); + await db.SaveChangesAsync(cancellationToken); + return job; + } + + public async Task ClaimNextAsync(string worker, CancellationToken cancellationToken = default) + { + await using var transaction = await db.Database.BeginTransactionAsync(cancellationToken); + + var job = await db.Jobs + .FromSqlRaw( + "SELECT * FROM assembler.jobs WHERE \"Status\" = 'Pending' " + + "ORDER BY \"CreatedAtUtc\" LIMIT 1 FOR UPDATE SKIP LOCKED") + .FirstOrDefaultAsync(cancellationToken); + + if (job is null) + { + await transaction.RollbackAsync(cancellationToken); + return null; + } + + job.MarkProcessing(worker, clock.GetUtcNow()); + await db.SaveChangesAsync(cancellationToken); + await transaction.CommitAsync(cancellationToken); + return job; + } +} diff --git a/src/Modules/TeamUp.Modules.Assembler/Runtime/AgentRunExecutor.cs b/src/Modules/TeamUp.Modules.Assembler/Runtime/AgentRunExecutor.cs new file mode 100644 index 0000000..85f5591 --- /dev/null +++ b/src/Modules/TeamUp.Modules.Assembler/Runtime/AgentRunExecutor.cs @@ -0,0 +1,48 @@ +using System.Text.Json; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; +using TeamUp.Modules.Assembler.Domain; +using TeamUp.Modules.Assembler.Persistence; + +namespace TeamUp.Modules.Assembler.Runtime; + +internal sealed record AgentRunPayload(Guid RunId); + +/// +/// Processes one claimed job: drives the AgentRun lifecycle. In M4 Increment 1 it records a +/// placeholder; Increment 2 swaps the middle for the real assembler (assemble → model → parse). +/// +internal sealed class AgentRunExecutor(AssemblerDbContext db, TimeProvider clock, ILogger logger) +{ + public async Task ProcessAsync(Job job, CancellationToken cancellationToken = default) + { + try + { + var payload = JsonSerializer.Deserialize(job.Payload) + ?? throw new InvalidOperationException("Invalid job payload."); + + var run = await db.AgentRuns.FirstOrDefaultAsync(r => r.Id == payload.RunId, cancellationToken) + ?? throw new InvalidOperationException($"AgentRun {payload.RunId} not found."); + + run.Start(agentId: null, prompt: "[assembler pending — M4 Increment 2]", trace: null); + await db.SaveChangesAsync(cancellationToken); + + // TODO (M4 Increment 2): assemble the prompt, call the model, parse into action + risk. + run.Complete( + output: "[assembler pending]", + actionType: "pending", + actionRisk: "read", + resultJson: null, + latencyMs: 0, + clock.GetUtcNow()); + job.MarkDone(clock.GetUtcNow()); + await db.SaveChangesAsync(cancellationToken); + } + catch (Exception ex) + { + job.MarkFailed(ex.Message, clock.GetUtcNow()); + await db.SaveChangesAsync(cancellationToken); + logger.LogError(ex, "Agent-run job {JobId} failed.", job.Id); + } + } +} diff --git a/src/Modules/TeamUp.Modules.Assembler/TeamUp.Modules.Assembler.csproj b/src/Modules/TeamUp.Modules.Assembler/TeamUp.Modules.Assembler.csproj index 62eeba1..422bda5 100644 --- a/src/Modules/TeamUp.Modules.Assembler/TeamUp.Modules.Assembler.csproj +++ b/src/Modules/TeamUp.Modules.Assembler/TeamUp.Modules.Assembler.csproj @@ -1,12 +1,21 @@ - + + + + + + + + + + + diff --git a/src/Modules/TeamUp.Modules.Assembler/Worker/JobProcessor.cs b/src/Modules/TeamUp.Modules.Assembler/Worker/JobProcessor.cs new file mode 100644 index 0000000..886741a --- /dev/null +++ b/src/Modules/TeamUp.Modules.Assembler/Worker/JobProcessor.cs @@ -0,0 +1,50 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using TeamUp.Modules.Assembler.Queue; +using TeamUp.Modules.Assembler.Runtime; + +namespace TeamUp.Modules.Assembler.Worker; + +/// Drains the agent-run queue on the worker host: claim (SKIP LOCKED) → process, repeat. +internal sealed class JobProcessor(IServiceScopeFactory scopeFactory, ILogger logger) : BackgroundService +{ + private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(2); + private readonly string _worker = $"{Environment.MachineName}:{Environment.ProcessId}"; + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + logger.LogInformation("Agent-run job processor started ({Worker}).", _worker); + + using var timer = new PeriodicTimer(PollInterval); + while (!stoppingToken.IsCancellationRequested) + { + try + { + await DrainAsync(stoppingToken); + await timer.WaitForNextTickAsync(stoppingToken); + } + catch (OperationCanceledException) + { + break; + } + } + } + + private async Task DrainAsync(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + await using var scope = scopeFactory.CreateAsyncScope(); + var queue = scope.ServiceProvider.GetRequiredService(); + var job = await queue.ClaimNextAsync(_worker, cancellationToken); + if (job is null) + { + break; + } + + var executor = scope.ServiceProvider.GetRequiredService(); + await executor.ProcessAsync(job, cancellationToken); + } + } +} diff --git a/src/Shared/TeamUp.SharedKernel/Modularity/IWorkerModule.cs b/src/Shared/TeamUp.SharedKernel/Modularity/IWorkerModule.cs new file mode 100644 index 0000000..c159e63 --- /dev/null +++ b/src/Shared/TeamUp.SharedKernel/Modularity/IWorkerModule.cs @@ -0,0 +1,14 @@ +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +namespace TeamUp.SharedKernel.Modularity; + +/// +/// Implemented by modules that contribute background services. runs in +/// the WORKER host only — so hosted services (e.g. the agent-run job drainer) never start in the web +/// host, keeping the web off the model path. +/// +public interface IWorkerModule +{ + void RegisterWorker(IServiceCollection services, IConfiguration configuration); +} diff --git a/tests/TeamUp.IntegrationTests/AgentRunQueueTests.cs b/tests/TeamUp.IntegrationTests/AgentRunQueueTests.cs new file mode 100644 index 0000000..c9abf53 --- /dev/null +++ b/tests/TeamUp.IntegrationTests/AgentRunQueueTests.cs @@ -0,0 +1,74 @@ +using System.Net; +using System.Net.Http.Headers; +using System.Net.Http.Json; +using Microsoft.Extensions.DependencyInjection; +using TeamUp.Modules.Assembler.Queue; +using TeamUp.Modules.Assembler.Runtime; +using Xunit; + +namespace TeamUp.IntegrationTests; + +/// +/// M4 Increment 1: a run is enqueued (web), claimed via FOR UPDATE SKIP LOCKED + processed +/// (worker services), and the AgentRun lifecycle reaches Completed. +/// +public sealed class AgentRunQueueTests(PostgresFixture postgres) : IClassFixture +{ + private sealed record BootstrapResponse(string Token, Guid MemberId, Guid OrganizationId); + + private sealed record RunResponse( + Guid Id, Guid SeatId, Guid WorkItemId, Guid? AgentId, string Status, + string? ActionType, string? ActionRisk, string? Prompt, string? Output, string? Error); + + [Fact] + public async Task Run_is_enqueued_claimed_and_processed() + { + await using var factory = new TeamUpWebFactory(postgres.ConnectionString); + using var anon = factory.CreateClient(); + + var bootstrap = await anon.PostAsJsonAsync("/api/identity/bootstrap", new + { + organizationName = "AliaSaaS", + ownerEmail = "owner@alia.test", + ownerDisplayName = "Owner", + ownerPassword = "Passw0rd!", + }); + var owner = await bootstrap.Content.ReadFromJsonAsync(); + Assert.NotNull(owner); + + using var client = factory.CreateClient(); + client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", owner!.Token); + + // Enqueue a run (web). + var created = await client.PostAsJsonAsync("/api/assembler/runs", new + { + seatId = Guid.NewGuid(), + workItemId = Guid.NewGuid(), + }); + Assert.Equal(HttpStatusCode.OK, created.StatusCode); + var run = await created.Content.ReadFromJsonAsync(); + Assert.Equal("Queued", run!.Status); + + // Drain one job exactly as the worker does: claim (SKIP LOCKED) then process. + await using (var scope = factory.Services.CreateAsyncScope()) + { + var queue = scope.ServiceProvider.GetRequiredService(); + var job = await queue.ClaimNextAsync("test-worker"); + Assert.NotNull(job); + + var executor = scope.ServiceProvider.GetRequiredService(); + await executor.ProcessAsync(job!); + } + + // The next claim finds nothing (the only job is done). + await using (var scope = factory.Services.CreateAsyncScope()) + { + var queue = scope.ServiceProvider.GetRequiredService(); + Assert.Null(await queue.ClaimNextAsync("test-worker")); + } + + var fetched = await client.GetFromJsonAsync($"/api/assembler/runs/{run.Id}"); + Assert.Equal("Completed", fetched!.Status); + Assert.Equal("pending", fetched.ActionType); + } +}