M4: agent-run job queue + worker drain (Increment 1)
SharedKernel: IWorkerModule seam (RegisterWorker runs in the worker host only).
Bootstrap: AddTeamUpWorkerServices; the worker host now wires it.
Assembler module (schema "assembler", InitialAssembler migration):
- Job (Pending→Processing→Done/Failed) + AgentRun (Queued→Running→Completed/Failed) entities.
- JobQueue: enqueue + ClaimNextAsync using `FOR UPDATE SKIP LOCKED` in a transaction.
- AgentRunExecutor (Increment-1 placeholder — real assemble/model/parse lands in Increment 2).
- JobProcessor BackgroundService drains the queue on the worker host (web off the model path).
- POST /api/assembler/runs enqueues a run; GET /api/assembler/runs/{id} reads it.
Verified: build green; ArchitectureTests 8/8 (Assembler references only SharedKernel);
IntegrationTests 28/28 incl. enqueue→claim(SKIP LOCKED)→process→Completed.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
using Microsoft.AspNetCore.Routing;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using TeamUp.SharedKernel.Modularity;
|
||||
|
||||
namespace TeamUp.Bootstrap;
|
||||
|
||||
@@ -29,4 +30,20 @@ public static class TeamUpModuleExtensions
|
||||
|
||||
return endpoints;
|
||||
}
|
||||
|
||||
/// <summary>Runs <c>RegisterWorker</c> for modules with background services. WORKER host only.</summary>
|
||||
public static IServiceCollection AddTeamUpWorkerServices(
|
||||
this IServiceCollection services,
|
||||
IConfiguration configuration)
|
||||
{
|
||||
foreach (var module in ModuleCatalog.All)
|
||||
{
|
||||
if (module is IWorkerModule workerModule)
|
||||
{
|
||||
workerModule.RegisterWorker(services, configuration);
|
||||
}
|
||||
}
|
||||
|
||||
return services;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ builder.Services.AddSerilog((services, configuration) => configuration
|
||||
builder.Services.AddTeamUpObservability(builder.Configuration, serviceName: "teamup-worker");
|
||||
builder.Services.AddTeamUpPersistence(builder.Configuration);
|
||||
builder.Services.AddTeamUpModules(builder.Configuration);
|
||||
builder.Services.AddTeamUpWorkerServices(builder.Configuration); // hosted services: the agent-run job drainer
|
||||
builder.Services.AddHostedService<HeartbeatService>();
|
||||
|
||||
var host = builder.Build();
|
||||
|
||||
@@ -1,27 +1,40 @@
|
||||
using Microsoft.AspNetCore.Builder;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using Microsoft.AspNetCore.Routing;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.DependencyInjection.Extensions;
|
||||
using TeamUp.Modules.Assembler.Endpoints;
|
||||
using TeamUp.Modules.Assembler.Persistence;
|
||||
using TeamUp.Modules.Assembler.Queue;
|
||||
using TeamUp.Modules.Assembler.Runtime;
|
||||
using TeamUp.Modules.Assembler.Worker;
|
||||
using TeamUp.SharedKernel.Modularity;
|
||||
using TeamUp.SharedKernel.Persistence;
|
||||
|
||||
namespace TeamUp.Modules.Assembler;
|
||||
|
||||
/// <summary>Context assembly, the model call, output parsing, prompt caching — runs in the worker (M4).</summary>
|
||||
public sealed class AssemblerModule : IModule
|
||||
/// <summary>
|
||||
/// Context assembly, the model call, output parsing — the agent runtime. The job queue + AgentRun
|
||||
/// state live here; the drain runs in the worker host (RegisterWorker), the trigger on the web host.
|
||||
/// </summary>
|
||||
public sealed class AssemblerModule : IModule, IWorkerModule
|
||||
{
|
||||
public string Name => "assembler";
|
||||
|
||||
public void Register(IServiceCollection services, IConfiguration configuration)
|
||||
{
|
||||
// Skeleton: no services yet. M4 introduces the jobs table (FOR UPDATE SKIP LOCKED),
|
||||
// the AgentRun context, and the assembler pipeline (registered for the worker host).
|
||||
var connectionString = configuration.GetConnectionString("Postgres")
|
||||
?? throw new InvalidOperationException("Missing connection string 'ConnectionStrings:Postgres'.");
|
||||
|
||||
services.AddDbContext<AssemblerDbContext>(options => options.UseNpgsql(connectionString));
|
||||
services.AddScoped<IModuleDbContext>(sp => sp.GetRequiredService<AssemblerDbContext>());
|
||||
services.AddScoped<JobQueue>();
|
||||
services.AddScoped<AgentRunExecutor>();
|
||||
services.TryAddSingleton(TimeProvider.System);
|
||||
}
|
||||
|
||||
public void MapEndpoints(IEndpointRouteBuilder endpoints)
|
||||
{
|
||||
endpoints.MapGroup($"/api/{Name}")
|
||||
.WithTags("Assembler")
|
||||
.MapGet("/ping", () => TypedResults.Ok(new ModulePing(Name)));
|
||||
}
|
||||
public void RegisterWorker(IServiceCollection services, IConfiguration configuration) =>
|
||||
services.AddHostedService<JobProcessor>();
|
||||
|
||||
public void MapEndpoints(IEndpointRouteBuilder endpoints) => AssemblerEndpoints.Map(endpoints);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,72 @@
|
||||
using TeamUp.SharedKernel.Domain;
|
||||
|
||||
namespace TeamUp.Modules.Assembler.Domain;
|
||||
|
||||
internal enum AgentRunStatus
|
||||
{
|
||||
Queued,
|
||||
Running,
|
||||
Completed,
|
||||
Failed,
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// One execution of an AI seat against a task: the assembled prompt, the raw model output, the
|
||||
/// parsed action + risk tag, and the reasoning/assembly trace. Nothing executes off this in M4 —
|
||||
/// the action gate (M5) decides whether the parsed action runs or waits in review.
|
||||
/// </summary>
|
||||
internal sealed class AgentRun : Entity
|
||||
{
|
||||
public Guid SeatId { get; private set; }
|
||||
public Guid WorkItemId { get; private set; }
|
||||
public Guid? AgentId { get; private set; }
|
||||
public AgentRunStatus Status { get; private set; }
|
||||
public string? Prompt { get; private set; }
|
||||
public string? Output { get; private set; }
|
||||
public string? ActionType { get; private set; }
|
||||
public string? ActionRisk { get; private set; }
|
||||
public string? ResultJson { get; private set; }
|
||||
public string? Trace { get; private set; }
|
||||
public string? Error { get; private set; }
|
||||
public long? LatencyMs { get; private set; }
|
||||
public DateTimeOffset CreatedAtUtc { get; private set; }
|
||||
public DateTimeOffset? CompletedAtUtc { get; private set; }
|
||||
|
||||
private AgentRun()
|
||||
{
|
||||
}
|
||||
|
||||
public AgentRun(Guid seatId, Guid workItemId, DateTimeOffset createdAtUtc)
|
||||
{
|
||||
SeatId = seatId;
|
||||
WorkItemId = workItemId;
|
||||
Status = AgentRunStatus.Queued;
|
||||
CreatedAtUtc = createdAtUtc;
|
||||
}
|
||||
|
||||
public void Start(Guid? agentId, string prompt, string? trace)
|
||||
{
|
||||
Status = AgentRunStatus.Running;
|
||||
AgentId = agentId;
|
||||
Prompt = prompt;
|
||||
Trace = trace;
|
||||
}
|
||||
|
||||
public void Complete(string output, string actionType, string actionRisk, string? resultJson, long latencyMs, DateTimeOffset nowUtc)
|
||||
{
|
||||
Status = AgentRunStatus.Completed;
|
||||
Output = output;
|
||||
ActionType = actionType;
|
||||
ActionRisk = actionRisk;
|
||||
ResultJson = resultJson;
|
||||
LatencyMs = latencyMs;
|
||||
CompletedAtUtc = nowUtc;
|
||||
}
|
||||
|
||||
public void Fail(string error, DateTimeOffset nowUtc)
|
||||
{
|
||||
Status = AgentRunStatus.Failed;
|
||||
Error = error;
|
||||
CompletedAtUtc = nowUtc;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,61 @@
|
||||
using TeamUp.SharedKernel.Domain;
|
||||
|
||||
namespace TeamUp.Modules.Assembler.Domain;
|
||||
|
||||
internal enum JobStatus
|
||||
{
|
||||
Pending,
|
||||
Processing,
|
||||
Done,
|
||||
Failed,
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// A unit of background work, drained from Postgres with <c>FOR UPDATE SKIP LOCKED</c> by the worker.
|
||||
/// The run lifecycle is domain state (kept explicit) rather than an opaque message-bus payload.
|
||||
/// </summary>
|
||||
internal sealed class Job : Entity
|
||||
{
|
||||
public string Type { get; private set; } = null!;
|
||||
public string Payload { get; private set; } = null!;
|
||||
public JobStatus Status { get; private set; }
|
||||
public int Attempts { get; private set; }
|
||||
public string? LockedBy { get; private set; }
|
||||
public DateTimeOffset? LockedAtUtc { get; private set; }
|
||||
public string? Error { get; private set; }
|
||||
public DateTimeOffset CreatedAtUtc { get; private set; }
|
||||
public DateTimeOffset? CompletedAtUtc { get; private set; }
|
||||
|
||||
private Job()
|
||||
{
|
||||
}
|
||||
|
||||
public Job(string type, string payload, DateTimeOffset createdAtUtc)
|
||||
{
|
||||
Type = type;
|
||||
Payload = payload;
|
||||
Status = JobStatus.Pending;
|
||||
CreatedAtUtc = createdAtUtc;
|
||||
}
|
||||
|
||||
public void MarkProcessing(string worker, DateTimeOffset nowUtc)
|
||||
{
|
||||
Status = JobStatus.Processing;
|
||||
LockedBy = worker;
|
||||
LockedAtUtc = nowUtc;
|
||||
Attempts++;
|
||||
}
|
||||
|
||||
public void MarkDone(DateTimeOffset nowUtc)
|
||||
{
|
||||
Status = JobStatus.Done;
|
||||
CompletedAtUtc = nowUtc;
|
||||
}
|
||||
|
||||
public void MarkFailed(string error, DateTimeOffset nowUtc)
|
||||
{
|
||||
Status = JobStatus.Failed;
|
||||
Error = error;
|
||||
CompletedAtUtc = nowUtc;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
namespace TeamUp.Modules.Assembler.Endpoints;
|
||||
|
||||
internal sealed record CreateRunRequest(Guid SeatId, Guid WorkItemId);
|
||||
|
||||
internal sealed record RunResponse(
|
||||
Guid Id,
|
||||
Guid SeatId,
|
||||
Guid WorkItemId,
|
||||
Guid? AgentId,
|
||||
string Status,
|
||||
string? ActionType,
|
||||
string? ActionRisk,
|
||||
string? Prompt,
|
||||
string? Output,
|
||||
string? Error);
|
||||
@@ -0,0 +1,47 @@
|
||||
using System.Text.Json;
|
||||
using Microsoft.AspNetCore.Builder;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using Microsoft.AspNetCore.Routing;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using TeamUp.Modules.Assembler.Domain;
|
||||
using TeamUp.Modules.Assembler.Persistence;
|
||||
using TeamUp.Modules.Assembler.Queue;
|
||||
using TeamUp.Modules.Assembler.Runtime;
|
||||
using TeamUp.SharedKernel.Modularity;
|
||||
|
||||
namespace TeamUp.Modules.Assembler.Endpoints;
|
||||
|
||||
internal static class AssemblerEndpoints
|
||||
{
|
||||
public static void Map(IEndpointRouteBuilder endpoints)
|
||||
{
|
||||
var group = endpoints.MapGroup("/api/assembler").WithTags("Assembler");
|
||||
|
||||
group.MapGet("/ping", () => TypedResults.Ok(new ModulePing("assembler")));
|
||||
group.MapPost("/runs", CreateRun).RequireAuthorization();
|
||||
group.MapGet("/runs/{id:guid}", GetRun).RequireAuthorization();
|
||||
}
|
||||
|
||||
// Dispatch a task to an AI seat: record a queued AgentRun and enqueue the job. The worker
|
||||
// drains it off the request path. (Scope-checking the seat's team is added in Increment 2.)
|
||||
private static async Task<IResult> CreateRun(
|
||||
CreateRunRequest request, AssemblerDbContext db, JobQueue queue, TimeProvider clock, CancellationToken ct)
|
||||
{
|
||||
var run = new AgentRun(request.SeatId, request.WorkItemId, clock.GetUtcNow());
|
||||
db.AgentRuns.Add(run);
|
||||
await db.SaveChangesAsync(ct);
|
||||
|
||||
await queue.EnqueueAsync("agent.run", JsonSerializer.Serialize(new AgentRunPayload(run.Id)), ct);
|
||||
return Results.Ok(ToResponse(run));
|
||||
}
|
||||
|
||||
private static async Task<IResult> GetRun(Guid id, AssemblerDbContext db, CancellationToken ct)
|
||||
{
|
||||
var run = await db.AgentRuns.FirstOrDefaultAsync(r => r.Id == id, ct);
|
||||
return run is null ? Results.NotFound() : Results.Ok(ToResponse(run));
|
||||
}
|
||||
|
||||
private static RunResponse ToResponse(AgentRun run) => new(
|
||||
run.Id, run.SeatId, run.WorkItemId, run.AgentId, run.Status.ToString(),
|
||||
run.ActionType, run.ActionRisk, run.Prompt, run.Output, run.Error);
|
||||
}
|
||||
@@ -0,0 +1,39 @@
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using TeamUp.Modules.Assembler.Domain;
|
||||
using TeamUp.SharedKernel.Persistence;
|
||||
|
||||
namespace TeamUp.Modules.Assembler.Persistence;
|
||||
|
||||
internal sealed class AssemblerDbContext(DbContextOptions<AssemblerDbContext> options)
|
||||
: DbContext(options), IModuleDbContext
|
||||
{
|
||||
public DbSet<Job> Jobs => Set<Job>();
|
||||
public DbSet<AgentRun> AgentRuns => Set<AgentRun>();
|
||||
|
||||
protected override void OnModelCreating(ModelBuilder modelBuilder)
|
||||
{
|
||||
modelBuilder.HasDefaultSchema("assembler");
|
||||
|
||||
modelBuilder.Entity<Job>(job =>
|
||||
{
|
||||
job.ToTable("jobs");
|
||||
job.HasKey(j => j.Id);
|
||||
job.Property(j => j.Type).HasMaxLength(60).IsRequired();
|
||||
job.Property(j => j.Status).HasConversion<string>().HasMaxLength(20);
|
||||
job.Property(j => j.LockedBy).HasMaxLength(120);
|
||||
// Drives the FOR UPDATE SKIP LOCKED claim query.
|
||||
job.HasIndex(j => new { j.Status, j.CreatedAtUtc });
|
||||
});
|
||||
|
||||
modelBuilder.Entity<AgentRun>(run =>
|
||||
{
|
||||
run.ToTable("agent_runs");
|
||||
run.HasKey(r => r.Id);
|
||||
run.Property(r => r.Status).HasConversion<string>().HasMaxLength(20);
|
||||
run.Property(r => r.ActionType).HasMaxLength(60);
|
||||
run.Property(r => r.ActionRisk).HasMaxLength(20);
|
||||
run.HasIndex(r => r.WorkItemId);
|
||||
run.HasIndex(r => r.SeatId);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.EntityFrameworkCore.Design;
|
||||
|
||||
namespace TeamUp.Modules.Assembler.Persistence;
|
||||
|
||||
/// <summary>Design-time factory so `dotnet ef` can build the internal context without a host.</summary>
|
||||
internal sealed class AssemblerDbContextFactory : IDesignTimeDbContextFactory<AssemblerDbContext>
|
||||
{
|
||||
public AssemblerDbContext CreateDbContext(string[] args)
|
||||
{
|
||||
var connectionString =
|
||||
Environment.GetEnvironmentVariable("ConnectionStrings__Postgres")
|
||||
?? "Host=localhost;Port=5432;Database=teamup;Username=teamup;Password=teamup";
|
||||
|
||||
var options = new DbContextOptionsBuilder<AssemblerDbContext>()
|
||||
.UseNpgsql(connectionString)
|
||||
.Options;
|
||||
|
||||
return new AssemblerDbContext(options);
|
||||
}
|
||||
}
|
||||
+138
@@ -0,0 +1,138 @@
|
||||
// <auto-generated />
|
||||
using System;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.EntityFrameworkCore.Infrastructure;
|
||||
using Microsoft.EntityFrameworkCore.Migrations;
|
||||
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
|
||||
using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
|
||||
using TeamUp.Modules.Assembler.Persistence;
|
||||
|
||||
#nullable disable
|
||||
|
||||
namespace TeamUp.Modules.Assembler.Persistence.Migrations
|
||||
{
|
||||
[DbContext(typeof(AssemblerDbContext))]
|
||||
[Migration("20260609214035_InitialAssembler")]
|
||||
partial class InitialAssembler
|
||||
{
|
||||
/// <inheritdoc />
|
||||
protected override void BuildTargetModel(ModelBuilder modelBuilder)
|
||||
{
|
||||
#pragma warning disable 612, 618
|
||||
modelBuilder
|
||||
.HasDefaultSchema("assembler")
|
||||
.HasAnnotation("ProductVersion", "10.0.8")
|
||||
.HasAnnotation("Relational:MaxIdentifierLength", 63);
|
||||
|
||||
NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
|
||||
|
||||
modelBuilder.Entity("TeamUp.Modules.Assembler.Domain.AgentRun", b =>
|
||||
{
|
||||
b.Property<Guid>("Id")
|
||||
.ValueGeneratedOnAdd()
|
||||
.HasColumnType("uuid");
|
||||
|
||||
b.Property<string>("ActionRisk")
|
||||
.HasMaxLength(20)
|
||||
.HasColumnType("character varying(20)");
|
||||
|
||||
b.Property<string>("ActionType")
|
||||
.HasMaxLength(60)
|
||||
.HasColumnType("character varying(60)");
|
||||
|
||||
b.Property<Guid?>("AgentId")
|
||||
.HasColumnType("uuid");
|
||||
|
||||
b.Property<DateTimeOffset?>("CompletedAtUtc")
|
||||
.HasColumnType("timestamp with time zone");
|
||||
|
||||
b.Property<DateTimeOffset>("CreatedAtUtc")
|
||||
.HasColumnType("timestamp with time zone");
|
||||
|
||||
b.Property<string>("Error")
|
||||
.HasColumnType("text");
|
||||
|
||||
b.Property<long?>("LatencyMs")
|
||||
.HasColumnType("bigint");
|
||||
|
||||
b.Property<string>("Output")
|
||||
.HasColumnType("text");
|
||||
|
||||
b.Property<string>("Prompt")
|
||||
.HasColumnType("text");
|
||||
|
||||
b.Property<string>("ResultJson")
|
||||
.HasColumnType("text");
|
||||
|
||||
b.Property<Guid>("SeatId")
|
||||
.HasColumnType("uuid");
|
||||
|
||||
b.Property<string>("Status")
|
||||
.IsRequired()
|
||||
.HasMaxLength(20)
|
||||
.HasColumnType("character varying(20)");
|
||||
|
||||
b.Property<string>("Trace")
|
||||
.HasColumnType("text");
|
||||
|
||||
b.Property<Guid>("WorkItemId")
|
||||
.HasColumnType("uuid");
|
||||
|
||||
b.HasKey("Id");
|
||||
|
||||
b.HasIndex("SeatId");
|
||||
|
||||
b.HasIndex("WorkItemId");
|
||||
|
||||
b.ToTable("agent_runs", "assembler");
|
||||
});
|
||||
|
||||
modelBuilder.Entity("TeamUp.Modules.Assembler.Domain.Job", b =>
|
||||
{
|
||||
b.Property<Guid>("Id")
|
||||
.ValueGeneratedOnAdd()
|
||||
.HasColumnType("uuid");
|
||||
|
||||
b.Property<int>("Attempts")
|
||||
.HasColumnType("integer");
|
||||
|
||||
b.Property<DateTimeOffset?>("CompletedAtUtc")
|
||||
.HasColumnType("timestamp with time zone");
|
||||
|
||||
b.Property<DateTimeOffset>("CreatedAtUtc")
|
||||
.HasColumnType("timestamp with time zone");
|
||||
|
||||
b.Property<string>("Error")
|
||||
.HasColumnType("text");
|
||||
|
||||
b.Property<DateTimeOffset?>("LockedAtUtc")
|
||||
.HasColumnType("timestamp with time zone");
|
||||
|
||||
b.Property<string>("LockedBy")
|
||||
.HasMaxLength(120)
|
||||
.HasColumnType("character varying(120)");
|
||||
|
||||
b.Property<string>("Payload")
|
||||
.IsRequired()
|
||||
.HasColumnType("text");
|
||||
|
||||
b.Property<string>("Status")
|
||||
.IsRequired()
|
||||
.HasMaxLength(20)
|
||||
.HasColumnType("character varying(20)");
|
||||
|
||||
b.Property<string>("Type")
|
||||
.IsRequired()
|
||||
.HasMaxLength(60)
|
||||
.HasColumnType("character varying(60)");
|
||||
|
||||
b.HasKey("Id");
|
||||
|
||||
b.HasIndex("Status", "CreatedAtUtc");
|
||||
|
||||
b.ToTable("jobs", "assembler");
|
||||
});
|
||||
#pragma warning restore 612, 618
|
||||
}
|
||||
}
|
||||
}
|
||||
+95
@@ -0,0 +1,95 @@
|
||||
using System;
|
||||
using Microsoft.EntityFrameworkCore.Migrations;
|
||||
|
||||
#nullable disable
|
||||
|
||||
namespace TeamUp.Modules.Assembler.Persistence.Migrations
|
||||
{
|
||||
/// <inheritdoc />
|
||||
public partial class InitialAssembler : Migration
|
||||
{
|
||||
/// <inheritdoc />
|
||||
protected override void Up(MigrationBuilder migrationBuilder)
|
||||
{
|
||||
migrationBuilder.EnsureSchema(
|
||||
name: "assembler");
|
||||
|
||||
migrationBuilder.CreateTable(
|
||||
name: "agent_runs",
|
||||
schema: "assembler",
|
||||
columns: table => new
|
||||
{
|
||||
Id = table.Column<Guid>(type: "uuid", nullable: false),
|
||||
SeatId = table.Column<Guid>(type: "uuid", nullable: false),
|
||||
WorkItemId = table.Column<Guid>(type: "uuid", nullable: false),
|
||||
AgentId = table.Column<Guid>(type: "uuid", nullable: true),
|
||||
Status = table.Column<string>(type: "character varying(20)", maxLength: 20, nullable: false),
|
||||
Prompt = table.Column<string>(type: "text", nullable: true),
|
||||
Output = table.Column<string>(type: "text", nullable: true),
|
||||
ActionType = table.Column<string>(type: "character varying(60)", maxLength: 60, nullable: true),
|
||||
ActionRisk = table.Column<string>(type: "character varying(20)", maxLength: 20, nullable: true),
|
||||
ResultJson = table.Column<string>(type: "text", nullable: true),
|
||||
Trace = table.Column<string>(type: "text", nullable: true),
|
||||
Error = table.Column<string>(type: "text", nullable: true),
|
||||
LatencyMs = table.Column<long>(type: "bigint", nullable: true),
|
||||
CreatedAtUtc = table.Column<DateTimeOffset>(type: "timestamp with time zone", nullable: false),
|
||||
CompletedAtUtc = table.Column<DateTimeOffset>(type: "timestamp with time zone", nullable: true)
|
||||
},
|
||||
constraints: table =>
|
||||
{
|
||||
table.PrimaryKey("PK_agent_runs", x => x.Id);
|
||||
});
|
||||
|
||||
migrationBuilder.CreateTable(
|
||||
name: "jobs",
|
||||
schema: "assembler",
|
||||
columns: table => new
|
||||
{
|
||||
Id = table.Column<Guid>(type: "uuid", nullable: false),
|
||||
Type = table.Column<string>(type: "character varying(60)", maxLength: 60, nullable: false),
|
||||
Payload = table.Column<string>(type: "text", nullable: false),
|
||||
Status = table.Column<string>(type: "character varying(20)", maxLength: 20, nullable: false),
|
||||
Attempts = table.Column<int>(type: "integer", nullable: false),
|
||||
LockedBy = table.Column<string>(type: "character varying(120)", maxLength: 120, nullable: true),
|
||||
LockedAtUtc = table.Column<DateTimeOffset>(type: "timestamp with time zone", nullable: true),
|
||||
Error = table.Column<string>(type: "text", nullable: true),
|
||||
CreatedAtUtc = table.Column<DateTimeOffset>(type: "timestamp with time zone", nullable: false),
|
||||
CompletedAtUtc = table.Column<DateTimeOffset>(type: "timestamp with time zone", nullable: true)
|
||||
},
|
||||
constraints: table =>
|
||||
{
|
||||
table.PrimaryKey("PK_jobs", x => x.Id);
|
||||
});
|
||||
|
||||
migrationBuilder.CreateIndex(
|
||||
name: "IX_agent_runs_SeatId",
|
||||
schema: "assembler",
|
||||
table: "agent_runs",
|
||||
column: "SeatId");
|
||||
|
||||
migrationBuilder.CreateIndex(
|
||||
name: "IX_agent_runs_WorkItemId",
|
||||
schema: "assembler",
|
||||
table: "agent_runs",
|
||||
column: "WorkItemId");
|
||||
|
||||
migrationBuilder.CreateIndex(
|
||||
name: "IX_jobs_Status_CreatedAtUtc",
|
||||
schema: "assembler",
|
||||
table: "jobs",
|
||||
columns: new[] { "Status", "CreatedAtUtc" });
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
protected override void Down(MigrationBuilder migrationBuilder)
|
||||
{
|
||||
migrationBuilder.DropTable(
|
||||
name: "agent_runs",
|
||||
schema: "assembler");
|
||||
|
||||
migrationBuilder.DropTable(
|
||||
name: "jobs",
|
||||
schema: "assembler");
|
||||
}
|
||||
}
|
||||
}
|
||||
+135
@@ -0,0 +1,135 @@
|
||||
// <auto-generated />
|
||||
using System;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.EntityFrameworkCore.Infrastructure;
|
||||
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
|
||||
using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
|
||||
using TeamUp.Modules.Assembler.Persistence;
|
||||
|
||||
#nullable disable
|
||||
|
||||
namespace TeamUp.Modules.Assembler.Persistence.Migrations
|
||||
{
|
||||
[DbContext(typeof(AssemblerDbContext))]
|
||||
partial class AssemblerDbContextModelSnapshot : ModelSnapshot
|
||||
{
|
||||
protected override void BuildModel(ModelBuilder modelBuilder)
|
||||
{
|
||||
#pragma warning disable 612, 618
|
||||
modelBuilder
|
||||
.HasDefaultSchema("assembler")
|
||||
.HasAnnotation("ProductVersion", "10.0.8")
|
||||
.HasAnnotation("Relational:MaxIdentifierLength", 63);
|
||||
|
||||
NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
|
||||
|
||||
modelBuilder.Entity("TeamUp.Modules.Assembler.Domain.AgentRun", b =>
|
||||
{
|
||||
b.Property<Guid>("Id")
|
||||
.ValueGeneratedOnAdd()
|
||||
.HasColumnType("uuid");
|
||||
|
||||
b.Property<string>("ActionRisk")
|
||||
.HasMaxLength(20)
|
||||
.HasColumnType("character varying(20)");
|
||||
|
||||
b.Property<string>("ActionType")
|
||||
.HasMaxLength(60)
|
||||
.HasColumnType("character varying(60)");
|
||||
|
||||
b.Property<Guid?>("AgentId")
|
||||
.HasColumnType("uuid");
|
||||
|
||||
b.Property<DateTimeOffset?>("CompletedAtUtc")
|
||||
.HasColumnType("timestamp with time zone");
|
||||
|
||||
b.Property<DateTimeOffset>("CreatedAtUtc")
|
||||
.HasColumnType("timestamp with time zone");
|
||||
|
||||
b.Property<string>("Error")
|
||||
.HasColumnType("text");
|
||||
|
||||
b.Property<long?>("LatencyMs")
|
||||
.HasColumnType("bigint");
|
||||
|
||||
b.Property<string>("Output")
|
||||
.HasColumnType("text");
|
||||
|
||||
b.Property<string>("Prompt")
|
||||
.HasColumnType("text");
|
||||
|
||||
b.Property<string>("ResultJson")
|
||||
.HasColumnType("text");
|
||||
|
||||
b.Property<Guid>("SeatId")
|
||||
.HasColumnType("uuid");
|
||||
|
||||
b.Property<string>("Status")
|
||||
.IsRequired()
|
||||
.HasMaxLength(20)
|
||||
.HasColumnType("character varying(20)");
|
||||
|
||||
b.Property<string>("Trace")
|
||||
.HasColumnType("text");
|
||||
|
||||
b.Property<Guid>("WorkItemId")
|
||||
.HasColumnType("uuid");
|
||||
|
||||
b.HasKey("Id");
|
||||
|
||||
b.HasIndex("SeatId");
|
||||
|
||||
b.HasIndex("WorkItemId");
|
||||
|
||||
b.ToTable("agent_runs", "assembler");
|
||||
});
|
||||
|
||||
modelBuilder.Entity("TeamUp.Modules.Assembler.Domain.Job", b =>
|
||||
{
|
||||
b.Property<Guid>("Id")
|
||||
.ValueGeneratedOnAdd()
|
||||
.HasColumnType("uuid");
|
||||
|
||||
b.Property<int>("Attempts")
|
||||
.HasColumnType("integer");
|
||||
|
||||
b.Property<DateTimeOffset?>("CompletedAtUtc")
|
||||
.HasColumnType("timestamp with time zone");
|
||||
|
||||
b.Property<DateTimeOffset>("CreatedAtUtc")
|
||||
.HasColumnType("timestamp with time zone");
|
||||
|
||||
b.Property<string>("Error")
|
||||
.HasColumnType("text");
|
||||
|
||||
b.Property<DateTimeOffset?>("LockedAtUtc")
|
||||
.HasColumnType("timestamp with time zone");
|
||||
|
||||
b.Property<string>("LockedBy")
|
||||
.HasMaxLength(120)
|
||||
.HasColumnType("character varying(120)");
|
||||
|
||||
b.Property<string>("Payload")
|
||||
.IsRequired()
|
||||
.HasColumnType("text");
|
||||
|
||||
b.Property<string>("Status")
|
||||
.IsRequired()
|
||||
.HasMaxLength(20)
|
||||
.HasColumnType("character varying(20)");
|
||||
|
||||
b.Property<string>("Type")
|
||||
.IsRequired()
|
||||
.HasMaxLength(60)
|
||||
.HasColumnType("character varying(60)");
|
||||
|
||||
b.HasKey("Id");
|
||||
|
||||
b.HasIndex("Status", "CreatedAtUtc");
|
||||
|
||||
b.ToTable("jobs", "assembler");
|
||||
});
|
||||
#pragma warning restore 612, 618
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using TeamUp.Modules.Assembler.Domain;
|
||||
using TeamUp.Modules.Assembler.Persistence;
|
||||
|
||||
namespace TeamUp.Modules.Assembler.Queue;
|
||||
|
||||
/// <summary>The Postgres-backed agent-run queue. Enqueue inserts; claim uses FOR UPDATE SKIP LOCKED
|
||||
/// so multiple workers can drain concurrently without contention.</summary>
|
||||
internal sealed class JobQueue(AssemblerDbContext db, TimeProvider clock)
|
||||
{
|
||||
public async Task<Job> EnqueueAsync(string type, string payload, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var job = new Job(type, payload, clock.GetUtcNow());
|
||||
db.Jobs.Add(job);
|
||||
await db.SaveChangesAsync(cancellationToken);
|
||||
return job;
|
||||
}
|
||||
|
||||
public async Task<Job?> ClaimNextAsync(string worker, CancellationToken cancellationToken = default)
|
||||
{
|
||||
await using var transaction = await db.Database.BeginTransactionAsync(cancellationToken);
|
||||
|
||||
var job = await db.Jobs
|
||||
.FromSqlRaw(
|
||||
"SELECT * FROM assembler.jobs WHERE \"Status\" = 'Pending' " +
|
||||
"ORDER BY \"CreatedAtUtc\" LIMIT 1 FOR UPDATE SKIP LOCKED")
|
||||
.FirstOrDefaultAsync(cancellationToken);
|
||||
|
||||
if (job is null)
|
||||
{
|
||||
await transaction.RollbackAsync(cancellationToken);
|
||||
return null;
|
||||
}
|
||||
|
||||
job.MarkProcessing(worker, clock.GetUtcNow());
|
||||
await db.SaveChangesAsync(cancellationToken);
|
||||
await transaction.CommitAsync(cancellationToken);
|
||||
return job;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
using System.Text.Json;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using TeamUp.Modules.Assembler.Domain;
|
||||
using TeamUp.Modules.Assembler.Persistence;
|
||||
|
||||
namespace TeamUp.Modules.Assembler.Runtime;
|
||||
|
||||
internal sealed record AgentRunPayload(Guid RunId);
|
||||
|
||||
/// <summary>
|
||||
/// Processes one claimed job: drives the AgentRun lifecycle. In M4 Increment 1 it records a
|
||||
/// placeholder; Increment 2 swaps the middle for the real assembler (assemble → model → parse).
|
||||
/// </summary>
|
||||
internal sealed class AgentRunExecutor(AssemblerDbContext db, TimeProvider clock, ILogger<AgentRunExecutor> logger)
|
||||
{
|
||||
public async Task ProcessAsync(Job job, CancellationToken cancellationToken = default)
|
||||
{
|
||||
try
|
||||
{
|
||||
var payload = JsonSerializer.Deserialize<AgentRunPayload>(job.Payload)
|
||||
?? throw new InvalidOperationException("Invalid job payload.");
|
||||
|
||||
var run = await db.AgentRuns.FirstOrDefaultAsync(r => r.Id == payload.RunId, cancellationToken)
|
||||
?? throw new InvalidOperationException($"AgentRun {payload.RunId} not found.");
|
||||
|
||||
run.Start(agentId: null, prompt: "[assembler pending — M4 Increment 2]", trace: null);
|
||||
await db.SaveChangesAsync(cancellationToken);
|
||||
|
||||
// TODO (M4 Increment 2): assemble the prompt, call the model, parse into action + risk.
|
||||
run.Complete(
|
||||
output: "[assembler pending]",
|
||||
actionType: "pending",
|
||||
actionRisk: "read",
|
||||
resultJson: null,
|
||||
latencyMs: 0,
|
||||
clock.GetUtcNow());
|
||||
job.MarkDone(clock.GetUtcNow());
|
||||
await db.SaveChangesAsync(cancellationToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
job.MarkFailed(ex.Message, clock.GetUtcNow());
|
||||
await db.SaveChangesAsync(cancellationToken);
|
||||
logger.LogError(ex, "Agent-run job {JobId} failed.", job.Id);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,12 +1,21 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<!-- A self-contained module. References SharedKernel ONLY (ASP.NET flows transitively for the
|
||||
IModule seam). M1 adds this module's EF/Npgsql/FluentValidation/Mapperly packages when it
|
||||
gains an (internal) DbContext and validators. It must never reference another module.
|
||||
NOTE: this module hosts the runtime assembler + job-drain logic in the worker (M4); the AI
|
||||
model-client packages are deferred to M3-M4 and are not referenced in the skeleton. -->
|
||||
<!-- The runtime: the Postgres job queue (FOR UPDATE SKIP LOCKED), the worker drain, AgentRun,
|
||||
and the assembler. References SharedKernel only; reads agent/task/skill/config data through
|
||||
SharedKernel contracts (implemented by OrgBoard, Skills, Integrations) — never their tables.
|
||||
The model call goes through IModelClient (Integrations); no Microsoft.Extensions.AI dependency. -->
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\Shared\TeamUp.SharedKernel\TeamUp.SharedKernel.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore" />
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" PrivateAssets="all" />
|
||||
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<InternalsVisibleTo Include="TeamUp.IntegrationTests" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
||||
@@ -0,0 +1,50 @@
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using TeamUp.Modules.Assembler.Queue;
|
||||
using TeamUp.Modules.Assembler.Runtime;
|
||||
|
||||
namespace TeamUp.Modules.Assembler.Worker;
|
||||
|
||||
/// <summary>Drains the agent-run queue on the worker host: claim (SKIP LOCKED) → process, repeat.</summary>
|
||||
internal sealed class JobProcessor(IServiceScopeFactory scopeFactory, ILogger<JobProcessor> logger) : BackgroundService
|
||||
{
|
||||
private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(2);
|
||||
private readonly string _worker = $"{Environment.MachineName}:{Environment.ProcessId}";
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
logger.LogInformation("Agent-run job processor started ({Worker}).", _worker);
|
||||
|
||||
using var timer = new PeriodicTimer(PollInterval);
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
await DrainAsync(stoppingToken);
|
||||
await timer.WaitForNextTickAsync(stoppingToken);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task DrainAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
await using var scope = scopeFactory.CreateAsyncScope();
|
||||
var queue = scope.ServiceProvider.GetRequiredService<JobQueue>();
|
||||
var job = await queue.ClaimNextAsync(_worker, cancellationToken);
|
||||
if (job is null)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
var executor = scope.ServiceProvider.GetRequiredService<AgentRunExecutor>();
|
||||
await executor.ProcessAsync(job, cancellationToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
|
||||
namespace TeamUp.SharedKernel.Modularity;
|
||||
|
||||
/// <summary>
|
||||
/// Implemented by modules that contribute background services. <see cref="RegisterWorker"/> runs in
|
||||
/// the WORKER host only — so hosted services (e.g. the agent-run job drainer) never start in the web
|
||||
/// host, keeping the web off the model path.
|
||||
/// </summary>
|
||||
public interface IWorkerModule
|
||||
{
|
||||
void RegisterWorker(IServiceCollection services, IConfiguration configuration);
|
||||
}
|
||||
Reference in New Issue
Block a user