diff --git a/src/Bootstrap/TeamUp.Bootstrap/TeamUpModuleExtensions.cs b/src/Bootstrap/TeamUp.Bootstrap/TeamUpModuleExtensions.cs
index e07f39e..1038fad 100644
--- a/src/Bootstrap/TeamUp.Bootstrap/TeamUpModuleExtensions.cs
+++ b/src/Bootstrap/TeamUp.Bootstrap/TeamUpModuleExtensions.cs
@@ -1,6 +1,7 @@
using Microsoft.AspNetCore.Routing;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
+using TeamUp.SharedKernel.Modularity;
namespace TeamUp.Bootstrap;
@@ -29,4 +30,20 @@ public static class TeamUpModuleExtensions
return endpoints;
}
+
+ /// Runs RegisterWorker for modules with background services. WORKER host only.
+ public static IServiceCollection AddTeamUpWorkerServices(
+ this IServiceCollection services,
+ IConfiguration configuration)
+ {
+ foreach (var module in ModuleCatalog.All)
+ {
+ if (module is IWorkerModule workerModule)
+ {
+ workerModule.RegisterWorker(services, configuration);
+ }
+ }
+
+ return services;
+ }
}
diff --git a/src/Hosts/TeamUp.Worker/Program.cs b/src/Hosts/TeamUp.Worker/Program.cs
index cfc994d..2d2ef01 100644
--- a/src/Hosts/TeamUp.Worker/Program.cs
+++ b/src/Hosts/TeamUp.Worker/Program.cs
@@ -13,6 +13,7 @@ builder.Services.AddSerilog((services, configuration) => configuration
builder.Services.AddTeamUpObservability(builder.Configuration, serviceName: "teamup-worker");
builder.Services.AddTeamUpPersistence(builder.Configuration);
builder.Services.AddTeamUpModules(builder.Configuration);
+builder.Services.AddTeamUpWorkerServices(builder.Configuration); // hosted services: the agent-run job drainer
builder.Services.AddHostedService();
var host = builder.Build();
diff --git a/src/Modules/TeamUp.Modules.Assembler/AssemblerModule.cs b/src/Modules/TeamUp.Modules.Assembler/AssemblerModule.cs
index 2cb64ba..676affe 100644
--- a/src/Modules/TeamUp.Modules.Assembler/AssemblerModule.cs
+++ b/src/Modules/TeamUp.Modules.Assembler/AssemblerModule.cs
@@ -1,27 +1,40 @@
-using Microsoft.AspNetCore.Builder;
-using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Routing;
+using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+using TeamUp.Modules.Assembler.Endpoints;
+using TeamUp.Modules.Assembler.Persistence;
+using TeamUp.Modules.Assembler.Queue;
+using TeamUp.Modules.Assembler.Runtime;
+using TeamUp.Modules.Assembler.Worker;
using TeamUp.SharedKernel.Modularity;
+using TeamUp.SharedKernel.Persistence;
namespace TeamUp.Modules.Assembler;
-/// Context assembly, the model call, output parsing, prompt caching — runs in the worker (M4).
-public sealed class AssemblerModule : IModule
+///
+/// Context assembly, the model call, output parsing — the agent runtime. The job queue + AgentRun
+/// state live here; the drain runs in the worker host (RegisterWorker), the trigger on the web host.
+///
+public sealed class AssemblerModule : IModule, IWorkerModule
{
public string Name => "assembler";
public void Register(IServiceCollection services, IConfiguration configuration)
{
- // Skeleton: no services yet. M4 introduces the jobs table (FOR UPDATE SKIP LOCKED),
- // the AgentRun context, and the assembler pipeline (registered for the worker host).
+ var connectionString = configuration.GetConnectionString("Postgres")
+ ?? throw new InvalidOperationException("Missing connection string 'ConnectionStrings:Postgres'.");
+
+ services.AddDbContext(options => options.UseNpgsql(connectionString));
+ services.AddScoped(sp => sp.GetRequiredService());
+ services.AddScoped();
+ services.AddScoped();
+ services.TryAddSingleton(TimeProvider.System);
}
- public void MapEndpoints(IEndpointRouteBuilder endpoints)
- {
- endpoints.MapGroup($"/api/{Name}")
- .WithTags("Assembler")
- .MapGet("/ping", () => TypedResults.Ok(new ModulePing(Name)));
- }
+ public void RegisterWorker(IServiceCollection services, IConfiguration configuration) =>
+ services.AddHostedService();
+
+ public void MapEndpoints(IEndpointRouteBuilder endpoints) => AssemblerEndpoints.Map(endpoints);
}
diff --git a/src/Modules/TeamUp.Modules.Assembler/Domain/AgentRun.cs b/src/Modules/TeamUp.Modules.Assembler/Domain/AgentRun.cs
new file mode 100644
index 0000000..29bad89
--- /dev/null
+++ b/src/Modules/TeamUp.Modules.Assembler/Domain/AgentRun.cs
@@ -0,0 +1,72 @@
+using TeamUp.SharedKernel.Domain;
+
+namespace TeamUp.Modules.Assembler.Domain;
+
+internal enum AgentRunStatus
+{
+ Queued,
+ Running,
+ Completed,
+ Failed,
+}
+
+///
+/// One execution of an AI seat against a task: the assembled prompt, the raw model output, the
+/// parsed action + risk tag, and the reasoning/assembly trace. Nothing executes off this in M4 —
+/// the action gate (M5) decides whether the parsed action runs or waits in review.
+///
+internal sealed class AgentRun : Entity
+{
+ public Guid SeatId { get; private set; }
+ public Guid WorkItemId { get; private set; }
+ public Guid? AgentId { get; private set; }
+ public AgentRunStatus Status { get; private set; }
+ public string? Prompt { get; private set; }
+ public string? Output { get; private set; }
+ public string? ActionType { get; private set; }
+ public string? ActionRisk { get; private set; }
+ public string? ResultJson { get; private set; }
+ public string? Trace { get; private set; }
+ public string? Error { get; private set; }
+ public long? LatencyMs { get; private set; }
+ public DateTimeOffset CreatedAtUtc { get; private set; }
+ public DateTimeOffset? CompletedAtUtc { get; private set; }
+
+ private AgentRun()
+ {
+ }
+
+ public AgentRun(Guid seatId, Guid workItemId, DateTimeOffset createdAtUtc)
+ {
+ SeatId = seatId;
+ WorkItemId = workItemId;
+ Status = AgentRunStatus.Queued;
+ CreatedAtUtc = createdAtUtc;
+ }
+
+ public void Start(Guid? agentId, string prompt, string? trace)
+ {
+ Status = AgentRunStatus.Running;
+ AgentId = agentId;
+ Prompt = prompt;
+ Trace = trace;
+ }
+
+ public void Complete(string output, string actionType, string actionRisk, string? resultJson, long latencyMs, DateTimeOffset nowUtc)
+ {
+ Status = AgentRunStatus.Completed;
+ Output = output;
+ ActionType = actionType;
+ ActionRisk = actionRisk;
+ ResultJson = resultJson;
+ LatencyMs = latencyMs;
+ CompletedAtUtc = nowUtc;
+ }
+
+ public void Fail(string error, DateTimeOffset nowUtc)
+ {
+ Status = AgentRunStatus.Failed;
+ Error = error;
+ CompletedAtUtc = nowUtc;
+ }
+}
diff --git a/src/Modules/TeamUp.Modules.Assembler/Domain/Job.cs b/src/Modules/TeamUp.Modules.Assembler/Domain/Job.cs
new file mode 100644
index 0000000..8258b6d
--- /dev/null
+++ b/src/Modules/TeamUp.Modules.Assembler/Domain/Job.cs
@@ -0,0 +1,61 @@
+using TeamUp.SharedKernel.Domain;
+
+namespace TeamUp.Modules.Assembler.Domain;
+
+internal enum JobStatus
+{
+ Pending,
+ Processing,
+ Done,
+ Failed,
+}
+
+///
+/// A unit of background work, drained from Postgres with FOR UPDATE SKIP LOCKED by the worker.
+/// The run lifecycle is domain state (kept explicit) rather than an opaque message-bus payload.
+///
+internal sealed class Job : Entity
+{
+ public string Type { get; private set; } = null!;
+ public string Payload { get; private set; } = null!;
+ public JobStatus Status { get; private set; }
+ public int Attempts { get; private set; }
+ public string? LockedBy { get; private set; }
+ public DateTimeOffset? LockedAtUtc { get; private set; }
+ public string? Error { get; private set; }
+ public DateTimeOffset CreatedAtUtc { get; private set; }
+ public DateTimeOffset? CompletedAtUtc { get; private set; }
+
+ private Job()
+ {
+ }
+
+ public Job(string type, string payload, DateTimeOffset createdAtUtc)
+ {
+ Type = type;
+ Payload = payload;
+ Status = JobStatus.Pending;
+ CreatedAtUtc = createdAtUtc;
+ }
+
+ public void MarkProcessing(string worker, DateTimeOffset nowUtc)
+ {
+ Status = JobStatus.Processing;
+ LockedBy = worker;
+ LockedAtUtc = nowUtc;
+ Attempts++;
+ }
+
+ public void MarkDone(DateTimeOffset nowUtc)
+ {
+ Status = JobStatus.Done;
+ CompletedAtUtc = nowUtc;
+ }
+
+ public void MarkFailed(string error, DateTimeOffset nowUtc)
+ {
+ Status = JobStatus.Failed;
+ Error = error;
+ CompletedAtUtc = nowUtc;
+ }
+}
diff --git a/src/Modules/TeamUp.Modules.Assembler/Endpoints/AssemblerDtos.cs b/src/Modules/TeamUp.Modules.Assembler/Endpoints/AssemblerDtos.cs
new file mode 100644
index 0000000..087f5d5
--- /dev/null
+++ b/src/Modules/TeamUp.Modules.Assembler/Endpoints/AssemblerDtos.cs
@@ -0,0 +1,15 @@
+namespace TeamUp.Modules.Assembler.Endpoints;
+
+internal sealed record CreateRunRequest(Guid SeatId, Guid WorkItemId);
+
+internal sealed record RunResponse(
+ Guid Id,
+ Guid SeatId,
+ Guid WorkItemId,
+ Guid? AgentId,
+ string Status,
+ string? ActionType,
+ string? ActionRisk,
+ string? Prompt,
+ string? Output,
+ string? Error);
diff --git a/src/Modules/TeamUp.Modules.Assembler/Endpoints/AssemblerEndpoints.cs b/src/Modules/TeamUp.Modules.Assembler/Endpoints/AssemblerEndpoints.cs
new file mode 100644
index 0000000..63af9f3
--- /dev/null
+++ b/src/Modules/TeamUp.Modules.Assembler/Endpoints/AssemblerEndpoints.cs
@@ -0,0 +1,47 @@
+using System.Text.Json;
+using Microsoft.AspNetCore.Builder;
+using Microsoft.AspNetCore.Http;
+using Microsoft.AspNetCore.Routing;
+using Microsoft.EntityFrameworkCore;
+using TeamUp.Modules.Assembler.Domain;
+using TeamUp.Modules.Assembler.Persistence;
+using TeamUp.Modules.Assembler.Queue;
+using TeamUp.Modules.Assembler.Runtime;
+using TeamUp.SharedKernel.Modularity;
+
+namespace TeamUp.Modules.Assembler.Endpoints;
+
+internal static class AssemblerEndpoints
+{
+ public static void Map(IEndpointRouteBuilder endpoints)
+ {
+ var group = endpoints.MapGroup("/api/assembler").WithTags("Assembler");
+
+ group.MapGet("/ping", () => TypedResults.Ok(new ModulePing("assembler")));
+ group.MapPost("/runs", CreateRun).RequireAuthorization();
+ group.MapGet("/runs/{id:guid}", GetRun).RequireAuthorization();
+ }
+
+ // Dispatch a task to an AI seat: record a queued AgentRun and enqueue the job. The worker
+ // drains it off the request path. (Scope-checking the seat's team is added in Increment 2.)
+ private static async Task CreateRun(
+ CreateRunRequest request, AssemblerDbContext db, JobQueue queue, TimeProvider clock, CancellationToken ct)
+ {
+ var run = new AgentRun(request.SeatId, request.WorkItemId, clock.GetUtcNow());
+ db.AgentRuns.Add(run);
+ await db.SaveChangesAsync(ct);
+
+ await queue.EnqueueAsync("agent.run", JsonSerializer.Serialize(new AgentRunPayload(run.Id)), ct);
+ return Results.Ok(ToResponse(run));
+ }
+
+ private static async Task GetRun(Guid id, AssemblerDbContext db, CancellationToken ct)
+ {
+ var run = await db.AgentRuns.FirstOrDefaultAsync(r => r.Id == id, ct);
+ return run is null ? Results.NotFound() : Results.Ok(ToResponse(run));
+ }
+
+ private static RunResponse ToResponse(AgentRun run) => new(
+ run.Id, run.SeatId, run.WorkItemId, run.AgentId, run.Status.ToString(),
+ run.ActionType, run.ActionRisk, run.Prompt, run.Output, run.Error);
+}
diff --git a/src/Modules/TeamUp.Modules.Assembler/Persistence/AssemblerDbContext.cs b/src/Modules/TeamUp.Modules.Assembler/Persistence/AssemblerDbContext.cs
new file mode 100644
index 0000000..9c86c47
--- /dev/null
+++ b/src/Modules/TeamUp.Modules.Assembler/Persistence/AssemblerDbContext.cs
@@ -0,0 +1,39 @@
+using Microsoft.EntityFrameworkCore;
+using TeamUp.Modules.Assembler.Domain;
+using TeamUp.SharedKernel.Persistence;
+
+namespace TeamUp.Modules.Assembler.Persistence;
+
+internal sealed class AssemblerDbContext(DbContextOptions options)
+ : DbContext(options), IModuleDbContext
+{
+ public DbSet Jobs => Set();
+ public DbSet AgentRuns => Set();
+
+ protected override void OnModelCreating(ModelBuilder modelBuilder)
+ {
+ modelBuilder.HasDefaultSchema("assembler");
+
+ modelBuilder.Entity(job =>
+ {
+ job.ToTable("jobs");
+ job.HasKey(j => j.Id);
+ job.Property(j => j.Type).HasMaxLength(60).IsRequired();
+ job.Property(j => j.Status).HasConversion().HasMaxLength(20);
+ job.Property(j => j.LockedBy).HasMaxLength(120);
+ // Drives the FOR UPDATE SKIP LOCKED claim query.
+ job.HasIndex(j => new { j.Status, j.CreatedAtUtc });
+ });
+
+ modelBuilder.Entity(run =>
+ {
+ run.ToTable("agent_runs");
+ run.HasKey(r => r.Id);
+ run.Property(r => r.Status).HasConversion().HasMaxLength(20);
+ run.Property(r => r.ActionType).HasMaxLength(60);
+ run.Property(r => r.ActionRisk).HasMaxLength(20);
+ run.HasIndex(r => r.WorkItemId);
+ run.HasIndex(r => r.SeatId);
+ });
+ }
+}
diff --git a/src/Modules/TeamUp.Modules.Assembler/Persistence/AssemblerDbContextFactory.cs b/src/Modules/TeamUp.Modules.Assembler/Persistence/AssemblerDbContextFactory.cs
new file mode 100644
index 0000000..dfbc2a9
--- /dev/null
+++ b/src/Modules/TeamUp.Modules.Assembler/Persistence/AssemblerDbContextFactory.cs
@@ -0,0 +1,21 @@
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Design;
+
+namespace TeamUp.Modules.Assembler.Persistence;
+
+/// Design-time factory so `dotnet ef` can build the internal context without a host.
+internal sealed class AssemblerDbContextFactory : IDesignTimeDbContextFactory
+{
+ public AssemblerDbContext CreateDbContext(string[] args)
+ {
+ var connectionString =
+ Environment.GetEnvironmentVariable("ConnectionStrings__Postgres")
+ ?? "Host=localhost;Port=5432;Database=teamup;Username=teamup;Password=teamup";
+
+ var options = new DbContextOptionsBuilder()
+ .UseNpgsql(connectionString)
+ .Options;
+
+ return new AssemblerDbContext(options);
+ }
+}
diff --git a/src/Modules/TeamUp.Modules.Assembler/Persistence/Migrations/20260609214035_InitialAssembler.Designer.cs b/src/Modules/TeamUp.Modules.Assembler/Persistence/Migrations/20260609214035_InitialAssembler.Designer.cs
new file mode 100644
index 0000000..4352ffa
--- /dev/null
+++ b/src/Modules/TeamUp.Modules.Assembler/Persistence/Migrations/20260609214035_InitialAssembler.Designer.cs
@@ -0,0 +1,138 @@
+//
+using System;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Infrastructure;
+using Microsoft.EntityFrameworkCore.Migrations;
+using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
+using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
+using TeamUp.Modules.Assembler.Persistence;
+
+#nullable disable
+
+namespace TeamUp.Modules.Assembler.Persistence.Migrations
+{
+ [DbContext(typeof(AssemblerDbContext))]
+ [Migration("20260609214035_InitialAssembler")]
+ partial class InitialAssembler
+ {
+ ///
+ protected override void BuildTargetModel(ModelBuilder modelBuilder)
+ {
+#pragma warning disable 612, 618
+ modelBuilder
+ .HasDefaultSchema("assembler")
+ .HasAnnotation("ProductVersion", "10.0.8")
+ .HasAnnotation("Relational:MaxIdentifierLength", 63);
+
+ NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
+
+ modelBuilder.Entity("TeamUp.Modules.Assembler.Domain.AgentRun", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("uuid");
+
+ b.Property("ActionRisk")
+ .HasMaxLength(20)
+ .HasColumnType("character varying(20)");
+
+ b.Property("ActionType")
+ .HasMaxLength(60)
+ .HasColumnType("character varying(60)");
+
+ b.Property("AgentId")
+ .HasColumnType("uuid");
+
+ b.Property("CompletedAtUtc")
+ .HasColumnType("timestamp with time zone");
+
+ b.Property("CreatedAtUtc")
+ .HasColumnType("timestamp with time zone");
+
+ b.Property("Error")
+ .HasColumnType("text");
+
+ b.Property("LatencyMs")
+ .HasColumnType("bigint");
+
+ b.Property("Output")
+ .HasColumnType("text");
+
+ b.Property("Prompt")
+ .HasColumnType("text");
+
+ b.Property("ResultJson")
+ .HasColumnType("text");
+
+ b.Property("SeatId")
+ .HasColumnType("uuid");
+
+ b.Property("Status")
+ .IsRequired()
+ .HasMaxLength(20)
+ .HasColumnType("character varying(20)");
+
+ b.Property("Trace")
+ .HasColumnType("text");
+
+ b.Property("WorkItemId")
+ .HasColumnType("uuid");
+
+ b.HasKey("Id");
+
+ b.HasIndex("SeatId");
+
+ b.HasIndex("WorkItemId");
+
+ b.ToTable("agent_runs", "assembler");
+ });
+
+ modelBuilder.Entity("TeamUp.Modules.Assembler.Domain.Job", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("uuid");
+
+ b.Property("Attempts")
+ .HasColumnType("integer");
+
+ b.Property("CompletedAtUtc")
+ .HasColumnType("timestamp with time zone");
+
+ b.Property("CreatedAtUtc")
+ .HasColumnType("timestamp with time zone");
+
+ b.Property("Error")
+ .HasColumnType("text");
+
+ b.Property("LockedAtUtc")
+ .HasColumnType("timestamp with time zone");
+
+ b.Property("LockedBy")
+ .HasMaxLength(120)
+ .HasColumnType("character varying(120)");
+
+ b.Property("Payload")
+ .IsRequired()
+ .HasColumnType("text");
+
+ b.Property("Status")
+ .IsRequired()
+ .HasMaxLength(20)
+ .HasColumnType("character varying(20)");
+
+ b.Property("Type")
+ .IsRequired()
+ .HasMaxLength(60)
+ .HasColumnType("character varying(60)");
+
+ b.HasKey("Id");
+
+ b.HasIndex("Status", "CreatedAtUtc");
+
+ b.ToTable("jobs", "assembler");
+ });
+#pragma warning restore 612, 618
+ }
+ }
+}
diff --git a/src/Modules/TeamUp.Modules.Assembler/Persistence/Migrations/20260609214035_InitialAssembler.cs b/src/Modules/TeamUp.Modules.Assembler/Persistence/Migrations/20260609214035_InitialAssembler.cs
new file mode 100644
index 0000000..dff44b5
--- /dev/null
+++ b/src/Modules/TeamUp.Modules.Assembler/Persistence/Migrations/20260609214035_InitialAssembler.cs
@@ -0,0 +1,95 @@
+using System;
+using Microsoft.EntityFrameworkCore.Migrations;
+
+#nullable disable
+
+namespace TeamUp.Modules.Assembler.Persistence.Migrations
+{
+ ///
+ public partial class InitialAssembler : Migration
+ {
+ ///
+ protected override void Up(MigrationBuilder migrationBuilder)
+ {
+ migrationBuilder.EnsureSchema(
+ name: "assembler");
+
+ migrationBuilder.CreateTable(
+ name: "agent_runs",
+ schema: "assembler",
+ columns: table => new
+ {
+ Id = table.Column(type: "uuid", nullable: false),
+ SeatId = table.Column(type: "uuid", nullable: false),
+ WorkItemId = table.Column(type: "uuid", nullable: false),
+ AgentId = table.Column(type: "uuid", nullable: true),
+ Status = table.Column(type: "character varying(20)", maxLength: 20, nullable: false),
+ Prompt = table.Column(type: "text", nullable: true),
+ Output = table.Column(type: "text", nullable: true),
+ ActionType = table.Column(type: "character varying(60)", maxLength: 60, nullable: true),
+ ActionRisk = table.Column(type: "character varying(20)", maxLength: 20, nullable: true),
+ ResultJson = table.Column(type: "text", nullable: true),
+ Trace = table.Column(type: "text", nullable: true),
+ Error = table.Column(type: "text", nullable: true),
+ LatencyMs = table.Column(type: "bigint", nullable: true),
+ CreatedAtUtc = table.Column(type: "timestamp with time zone", nullable: false),
+ CompletedAtUtc = table.Column(type: "timestamp with time zone", nullable: true)
+ },
+ constraints: table =>
+ {
+ table.PrimaryKey("PK_agent_runs", x => x.Id);
+ });
+
+ migrationBuilder.CreateTable(
+ name: "jobs",
+ schema: "assembler",
+ columns: table => new
+ {
+ Id = table.Column(type: "uuid", nullable: false),
+ Type = table.Column(type: "character varying(60)", maxLength: 60, nullable: false),
+ Payload = table.Column(type: "text", nullable: false),
+ Status = table.Column(type: "character varying(20)", maxLength: 20, nullable: false),
+ Attempts = table.Column(type: "integer", nullable: false),
+ LockedBy = table.Column(type: "character varying(120)", maxLength: 120, nullable: true),
+ LockedAtUtc = table.Column(type: "timestamp with time zone", nullable: true),
+ Error = table.Column(type: "text", nullable: true),
+ CreatedAtUtc = table.Column(type: "timestamp with time zone", nullable: false),
+ CompletedAtUtc = table.Column(type: "timestamp with time zone", nullable: true)
+ },
+ constraints: table =>
+ {
+ table.PrimaryKey("PK_jobs", x => x.Id);
+ });
+
+ migrationBuilder.CreateIndex(
+ name: "IX_agent_runs_SeatId",
+ schema: "assembler",
+ table: "agent_runs",
+ column: "SeatId");
+
+ migrationBuilder.CreateIndex(
+ name: "IX_agent_runs_WorkItemId",
+ schema: "assembler",
+ table: "agent_runs",
+ column: "WorkItemId");
+
+ migrationBuilder.CreateIndex(
+ name: "IX_jobs_Status_CreatedAtUtc",
+ schema: "assembler",
+ table: "jobs",
+ columns: new[] { "Status", "CreatedAtUtc" });
+ }
+
+ ///
+ protected override void Down(MigrationBuilder migrationBuilder)
+ {
+ migrationBuilder.DropTable(
+ name: "agent_runs",
+ schema: "assembler");
+
+ migrationBuilder.DropTable(
+ name: "jobs",
+ schema: "assembler");
+ }
+ }
+}
diff --git a/src/Modules/TeamUp.Modules.Assembler/Persistence/Migrations/AssemblerDbContextModelSnapshot.cs b/src/Modules/TeamUp.Modules.Assembler/Persistence/Migrations/AssemblerDbContextModelSnapshot.cs
new file mode 100644
index 0000000..b32aaf9
--- /dev/null
+++ b/src/Modules/TeamUp.Modules.Assembler/Persistence/Migrations/AssemblerDbContextModelSnapshot.cs
@@ -0,0 +1,135 @@
+//
+using System;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Infrastructure;
+using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
+using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
+using TeamUp.Modules.Assembler.Persistence;
+
+#nullable disable
+
+namespace TeamUp.Modules.Assembler.Persistence.Migrations
+{
+ [DbContext(typeof(AssemblerDbContext))]
+ partial class AssemblerDbContextModelSnapshot : ModelSnapshot
+ {
+ protected override void BuildModel(ModelBuilder modelBuilder)
+ {
+#pragma warning disable 612, 618
+ modelBuilder
+ .HasDefaultSchema("assembler")
+ .HasAnnotation("ProductVersion", "10.0.8")
+ .HasAnnotation("Relational:MaxIdentifierLength", 63);
+
+ NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
+
+ modelBuilder.Entity("TeamUp.Modules.Assembler.Domain.AgentRun", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("uuid");
+
+ b.Property("ActionRisk")
+ .HasMaxLength(20)
+ .HasColumnType("character varying(20)");
+
+ b.Property("ActionType")
+ .HasMaxLength(60)
+ .HasColumnType("character varying(60)");
+
+ b.Property("AgentId")
+ .HasColumnType("uuid");
+
+ b.Property("CompletedAtUtc")
+ .HasColumnType("timestamp with time zone");
+
+ b.Property("CreatedAtUtc")
+ .HasColumnType("timestamp with time zone");
+
+ b.Property("Error")
+ .HasColumnType("text");
+
+ b.Property("LatencyMs")
+ .HasColumnType("bigint");
+
+ b.Property("Output")
+ .HasColumnType("text");
+
+ b.Property("Prompt")
+ .HasColumnType("text");
+
+ b.Property("ResultJson")
+ .HasColumnType("text");
+
+ b.Property("SeatId")
+ .HasColumnType("uuid");
+
+ b.Property("Status")
+ .IsRequired()
+ .HasMaxLength(20)
+ .HasColumnType("character varying(20)");
+
+ b.Property("Trace")
+ .HasColumnType("text");
+
+ b.Property("WorkItemId")
+ .HasColumnType("uuid");
+
+ b.HasKey("Id");
+
+ b.HasIndex("SeatId");
+
+ b.HasIndex("WorkItemId");
+
+ b.ToTable("agent_runs", "assembler");
+ });
+
+ modelBuilder.Entity("TeamUp.Modules.Assembler.Domain.Job", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("uuid");
+
+ b.Property("Attempts")
+ .HasColumnType("integer");
+
+ b.Property("CompletedAtUtc")
+ .HasColumnType("timestamp with time zone");
+
+ b.Property("CreatedAtUtc")
+ .HasColumnType("timestamp with time zone");
+
+ b.Property("Error")
+ .HasColumnType("text");
+
+ b.Property("LockedAtUtc")
+ .HasColumnType("timestamp with time zone");
+
+ b.Property("LockedBy")
+ .HasMaxLength(120)
+ .HasColumnType("character varying(120)");
+
+ b.Property("Payload")
+ .IsRequired()
+ .HasColumnType("text");
+
+ b.Property("Status")
+ .IsRequired()
+ .HasMaxLength(20)
+ .HasColumnType("character varying(20)");
+
+ b.Property("Type")
+ .IsRequired()
+ .HasMaxLength(60)
+ .HasColumnType("character varying(60)");
+
+ b.HasKey("Id");
+
+ b.HasIndex("Status", "CreatedAtUtc");
+
+ b.ToTable("jobs", "assembler");
+ });
+#pragma warning restore 612, 618
+ }
+ }
+}
diff --git a/src/Modules/TeamUp.Modules.Assembler/Queue/JobQueue.cs b/src/Modules/TeamUp.Modules.Assembler/Queue/JobQueue.cs
new file mode 100644
index 0000000..a896521
--- /dev/null
+++ b/src/Modules/TeamUp.Modules.Assembler/Queue/JobQueue.cs
@@ -0,0 +1,40 @@
+using Microsoft.EntityFrameworkCore;
+using TeamUp.Modules.Assembler.Domain;
+using TeamUp.Modules.Assembler.Persistence;
+
+namespace TeamUp.Modules.Assembler.Queue;
+
+/// The Postgres-backed agent-run queue. Enqueue inserts; claim uses FOR UPDATE SKIP LOCKED
+/// so multiple workers can drain concurrently without contention.
+internal sealed class JobQueue(AssemblerDbContext db, TimeProvider clock)
+{
+ public async Task EnqueueAsync(string type, string payload, CancellationToken cancellationToken = default)
+ {
+ var job = new Job(type, payload, clock.GetUtcNow());
+ db.Jobs.Add(job);
+ await db.SaveChangesAsync(cancellationToken);
+ return job;
+ }
+
+ public async Task ClaimNextAsync(string worker, CancellationToken cancellationToken = default)
+ {
+ await using var transaction = await db.Database.BeginTransactionAsync(cancellationToken);
+
+ var job = await db.Jobs
+ .FromSqlRaw(
+ "SELECT * FROM assembler.jobs WHERE \"Status\" = 'Pending' " +
+ "ORDER BY \"CreatedAtUtc\" LIMIT 1 FOR UPDATE SKIP LOCKED")
+ .FirstOrDefaultAsync(cancellationToken);
+
+ if (job is null)
+ {
+ await transaction.RollbackAsync(cancellationToken);
+ return null;
+ }
+
+ job.MarkProcessing(worker, clock.GetUtcNow());
+ await db.SaveChangesAsync(cancellationToken);
+ await transaction.CommitAsync(cancellationToken);
+ return job;
+ }
+}
diff --git a/src/Modules/TeamUp.Modules.Assembler/Runtime/AgentRunExecutor.cs b/src/Modules/TeamUp.Modules.Assembler/Runtime/AgentRunExecutor.cs
new file mode 100644
index 0000000..85f5591
--- /dev/null
+++ b/src/Modules/TeamUp.Modules.Assembler/Runtime/AgentRunExecutor.cs
@@ -0,0 +1,48 @@
+using System.Text.Json;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.Extensions.Logging;
+using TeamUp.Modules.Assembler.Domain;
+using TeamUp.Modules.Assembler.Persistence;
+
+namespace TeamUp.Modules.Assembler.Runtime;
+
+internal sealed record AgentRunPayload(Guid RunId);
+
+///
+/// Processes one claimed job: drives the AgentRun lifecycle. In M4 Increment 1 it records a
+/// placeholder; Increment 2 swaps the middle for the real assembler (assemble → model → parse).
+///
+internal sealed class AgentRunExecutor(AssemblerDbContext db, TimeProvider clock, ILogger logger)
+{
+ public async Task ProcessAsync(Job job, CancellationToken cancellationToken = default)
+ {
+ try
+ {
+ var payload = JsonSerializer.Deserialize(job.Payload)
+ ?? throw new InvalidOperationException("Invalid job payload.");
+
+ var run = await db.AgentRuns.FirstOrDefaultAsync(r => r.Id == payload.RunId, cancellationToken)
+ ?? throw new InvalidOperationException($"AgentRun {payload.RunId} not found.");
+
+ run.Start(agentId: null, prompt: "[assembler pending — M4 Increment 2]", trace: null);
+ await db.SaveChangesAsync(cancellationToken);
+
+ // TODO (M4 Increment 2): assemble the prompt, call the model, parse into action + risk.
+ run.Complete(
+ output: "[assembler pending]",
+ actionType: "pending",
+ actionRisk: "read",
+ resultJson: null,
+ latencyMs: 0,
+ clock.GetUtcNow());
+ job.MarkDone(clock.GetUtcNow());
+ await db.SaveChangesAsync(cancellationToken);
+ }
+ catch (Exception ex)
+ {
+ job.MarkFailed(ex.Message, clock.GetUtcNow());
+ await db.SaveChangesAsync(cancellationToken);
+ logger.LogError(ex, "Agent-run job {JobId} failed.", job.Id);
+ }
+ }
+}
diff --git a/src/Modules/TeamUp.Modules.Assembler/TeamUp.Modules.Assembler.csproj b/src/Modules/TeamUp.Modules.Assembler/TeamUp.Modules.Assembler.csproj
index 62eeba1..422bda5 100644
--- a/src/Modules/TeamUp.Modules.Assembler/TeamUp.Modules.Assembler.csproj
+++ b/src/Modules/TeamUp.Modules.Assembler/TeamUp.Modules.Assembler.csproj
@@ -1,12 +1,21 @@
-
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Modules/TeamUp.Modules.Assembler/Worker/JobProcessor.cs b/src/Modules/TeamUp.Modules.Assembler/Worker/JobProcessor.cs
new file mode 100644
index 0000000..886741a
--- /dev/null
+++ b/src/Modules/TeamUp.Modules.Assembler/Worker/JobProcessor.cs
@@ -0,0 +1,50 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using TeamUp.Modules.Assembler.Queue;
+using TeamUp.Modules.Assembler.Runtime;
+
+namespace TeamUp.Modules.Assembler.Worker;
+
+/// Drains the agent-run queue on the worker host: claim (SKIP LOCKED) → process, repeat.
+internal sealed class JobProcessor(IServiceScopeFactory scopeFactory, ILogger logger) : BackgroundService
+{
+ private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(2);
+ private readonly string _worker = $"{Environment.MachineName}:{Environment.ProcessId}";
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ logger.LogInformation("Agent-run job processor started ({Worker}).", _worker);
+
+ using var timer = new PeriodicTimer(PollInterval);
+ while (!stoppingToken.IsCancellationRequested)
+ {
+ try
+ {
+ await DrainAsync(stoppingToken);
+ await timer.WaitForNextTickAsync(stoppingToken);
+ }
+ catch (OperationCanceledException)
+ {
+ break;
+ }
+ }
+ }
+
+ private async Task DrainAsync(CancellationToken cancellationToken)
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ await using var scope = scopeFactory.CreateAsyncScope();
+ var queue = scope.ServiceProvider.GetRequiredService();
+ var job = await queue.ClaimNextAsync(_worker, cancellationToken);
+ if (job is null)
+ {
+ break;
+ }
+
+ var executor = scope.ServiceProvider.GetRequiredService();
+ await executor.ProcessAsync(job, cancellationToken);
+ }
+ }
+}
diff --git a/src/Shared/TeamUp.SharedKernel/Modularity/IWorkerModule.cs b/src/Shared/TeamUp.SharedKernel/Modularity/IWorkerModule.cs
new file mode 100644
index 0000000..c159e63
--- /dev/null
+++ b/src/Shared/TeamUp.SharedKernel/Modularity/IWorkerModule.cs
@@ -0,0 +1,14 @@
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+
+namespace TeamUp.SharedKernel.Modularity;
+
+///
+/// Implemented by modules that contribute background services. runs in
+/// the WORKER host only — so hosted services (e.g. the agent-run job drainer) never start in the web
+/// host, keeping the web off the model path.
+///
+public interface IWorkerModule
+{
+ void RegisterWorker(IServiceCollection services, IConfiguration configuration);
+}
diff --git a/tests/TeamUp.IntegrationTests/AgentRunQueueTests.cs b/tests/TeamUp.IntegrationTests/AgentRunQueueTests.cs
new file mode 100644
index 0000000..c9abf53
--- /dev/null
+++ b/tests/TeamUp.IntegrationTests/AgentRunQueueTests.cs
@@ -0,0 +1,74 @@
+using System.Net;
+using System.Net.Http.Headers;
+using System.Net.Http.Json;
+using Microsoft.Extensions.DependencyInjection;
+using TeamUp.Modules.Assembler.Queue;
+using TeamUp.Modules.Assembler.Runtime;
+using Xunit;
+
+namespace TeamUp.IntegrationTests;
+
+///
+/// M4 Increment 1: a run is enqueued (web), claimed via FOR UPDATE SKIP LOCKED + processed
+/// (worker services), and the AgentRun lifecycle reaches Completed.
+///
+public sealed class AgentRunQueueTests(PostgresFixture postgres) : IClassFixture
+{
+ private sealed record BootstrapResponse(string Token, Guid MemberId, Guid OrganizationId);
+
+ private sealed record RunResponse(
+ Guid Id, Guid SeatId, Guid WorkItemId, Guid? AgentId, string Status,
+ string? ActionType, string? ActionRisk, string? Prompt, string? Output, string? Error);
+
+ [Fact]
+ public async Task Run_is_enqueued_claimed_and_processed()
+ {
+ await using var factory = new TeamUpWebFactory(postgres.ConnectionString);
+ using var anon = factory.CreateClient();
+
+ var bootstrap = await anon.PostAsJsonAsync("/api/identity/bootstrap", new
+ {
+ organizationName = "AliaSaaS",
+ ownerEmail = "owner@alia.test",
+ ownerDisplayName = "Owner",
+ ownerPassword = "Passw0rd!",
+ });
+ var owner = await bootstrap.Content.ReadFromJsonAsync();
+ Assert.NotNull(owner);
+
+ using var client = factory.CreateClient();
+ client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", owner!.Token);
+
+ // Enqueue a run (web).
+ var created = await client.PostAsJsonAsync("/api/assembler/runs", new
+ {
+ seatId = Guid.NewGuid(),
+ workItemId = Guid.NewGuid(),
+ });
+ Assert.Equal(HttpStatusCode.OK, created.StatusCode);
+ var run = await created.Content.ReadFromJsonAsync();
+ Assert.Equal("Queued", run!.Status);
+
+ // Drain one job exactly as the worker does: claim (SKIP LOCKED) then process.
+ await using (var scope = factory.Services.CreateAsyncScope())
+ {
+ var queue = scope.ServiceProvider.GetRequiredService();
+ var job = await queue.ClaimNextAsync("test-worker");
+ Assert.NotNull(job);
+
+ var executor = scope.ServiceProvider.GetRequiredService();
+ await executor.ProcessAsync(job!);
+ }
+
+ // The next claim finds nothing (the only job is done).
+ await using (var scope = factory.Services.CreateAsyncScope())
+ {
+ var queue = scope.ServiceProvider.GetRequiredService();
+ Assert.Null(await queue.ClaimNextAsync("test-worker"));
+ }
+
+ var fetched = await client.GetFromJsonAsync($"/api/assembler/runs/{run.Id}");
+ Assert.Equal("Completed", fetched!.Status);
+ Assert.Equal("pending", fetched.ActionType);
+ }
+}