using System.Text.Json;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using TeamUp.Modules.Assembler.Domain;
using TeamUp.Modules.Assembler.Persistence;
using TeamUp.SharedKernel.Ai;

namespace TeamUp.Modules.Assembler.Runtime;

/// <summary>
/// Job payload for an agent run: carries the id of the <see cref="AgentRun"/> to process.
/// </summary>
internal sealed record AgentRunPayload(Guid RunId);

/// <summary>
/// Processes one claimed job end to end: resolve the run context (OrgBoard) + skills (Skills) →
/// assemble the prompt → call the model (BYOK, with fallback) → record the action + risk tag,
/// all captured on the AgentRun — then hand the proposal to the action gate (Governance), which
/// executes it or holds it in the review inbox.
/// </summary>
internal sealed class AgentRunExecutor(
    AssemblerDbContext db,
    IAgentRunContextProvider contextProvider,
    ISkillCatalog skillCatalog,
    IApiConfigResolver configResolver,
    IModelClient modelClient,
    IActionGate actionGate,
    ITeamMemory teamMemory,
    TimeProvider clock,
    // NOTE(review): the default Microsoft DI container registers ILogger<T>, not the
    // non-generic ILogger — confirm how this parameter is supplied.
    ILogger logger)
{
    /// <summary>Upper bound on tokens requested from the model for a single run.</summary>
    private const int MaxCompletionTokens = 512;

    /// <summary>Number of team-memory entries recalled into the prompt.</summary>
    private const int MemoryRecallCount = 3;

    /// <summary>
    /// Executes the agent run referenced by <paramref name="job"/>'s payload and records the
    /// outcome on both the <see cref="AgentRun"/> and the job.
    /// </summary>
    /// <param name="job">The claimed job; its payload must deserialize to an <see cref="AgentRunPayload"/>.</param>
    /// <param name="cancellationToken">Cancels the in-flight lookups, model call and saves.</param>
    /// <remarks>
    /// Processing errors (including cancellation) do not propagate: they are logged and the
    /// run/job are marked failed. Only an error while persisting that failure escapes.
    /// </remarks>
    public async Task ProcessAsync(Job job, CancellationToken cancellationToken = default)
    {
        AgentRun? run = null;
        try
        {
            var payload = JsonSerializer.Deserialize<AgentRunPayload>(job.Payload)
                ?? throw new InvalidOperationException("Invalid job payload.");

            run = await db.AgentRuns.FirstOrDefaultAsync(r => r.Id == payload.RunId, cancellationToken)
                ?? throw new InvalidOperationException($"AgentRun {payload.RunId} not found.");

            var context = await contextProvider.GetAsync(run.SeatId, run.WorkItemId, cancellationToken)
                ?? throw new InvalidOperationException("Agent or task not found for the run.");

            var skills = await skillCatalog.GetByKeysAsync(
                context.OrganizationId, context.SkillKeys, cancellationToken);

            // Working memory: recall the team's most relevant decisions/corrections for this task.
            var memories = await teamMemory.SearchAsync(
                context.TeamId,
                context.TaskTitle + "\n" + context.TaskDescription,
                take: MemoryRecallCount,
                cancellationToken);

            var assembled = PromptAssembler.Build(context, skills, memories);

            // Persist the started run (prompt + trace) before the model call.
            run.Start(context.AgentId, assembled.Prompt, assembled.Trace);
            await db.SaveChangesAsync(cancellationToken);

            // The fallback config is consulted only when the primary one cannot be resolved;
            // a failed model call is not retried against it.
            var config = await configResolver.ResolveAsync(context.ApiConfigId, cancellationToken)
                ?? (context.FallbackApiConfigId is { } fallback
                    ? await configResolver.ResolveAsync(fallback, cancellationToken)
                    : null)
                ?? throw new InvalidOperationException("No usable model config for the agent.");

            var completion = await modelClient.CompleteAsync(
                new ModelRequest(
                    config.Provider,
                    config.Model,
                    config.ApiKey,
                    config.Endpoint,
                    assembled.Prompt,
                    MaxTokens: MaxCompletionTokens),
                cancellationToken);

            if (!completion.Success)
            {
                await RecordFailureAsync(
                    run, job, completion.Error ?? "Model call failed.", cancellationToken);
                return;
            }

            // Structured result stored on the run: the assembled action/risk plus the first skill key (if any).
            var result = JsonSerializer.Serialize(new
            {
                action = assembled.PrimaryAction,
                risk = assembled.PrimaryActionRisk,
                skill = context.SkillKeys.Count > 0 ? context.SkillKeys[0] : null,
            });

            var output = completion.Text ?? string.Empty;
            run.Complete(
                output,
                assembled.PrimaryAction,
                assembled.PrimaryActionRisk,
                result,
                completion.LatencyMs,
                clock.GetUtcNow());
            await db.SaveChangesAsync(cancellationToken);

            // Hand the action to the gate: autonomy vs risk → execute now or hold in review.
            // NOTE(review): if the gate throws here, the catch below calls run.Fail on a run
            // already saved as completed — confirm AgentRun.Fail tolerates that transition.
            var gate = await actionGate.EvaluateAsync(
                new AgentActionProposal(
                    run.Id,
                    run.SeatId,
                    context.AgentId,
                    run.WorkItemId,
                    context.TeamId,
                    context.OrganizationId,
                    context.Autonomy,
                    assembled.PrimaryAction,
                    assembled.PrimaryActionRisk,
                    context.TaskTitle,
                    output,
                    OutputParser.ExtractChildTitles(output),
                    assembled.Trace),
                cancellationToken);

            logger.LogInformation(
                "Run {RunId}: {Action} ({Risk}) → {Outcome}.",
                run.Id, assembled.PrimaryAction, assembled.PrimaryActionRisk, gate.Outcome);

            job.MarkDone(clock.GetUtcNow());
            await db.SaveChangesAsync(cancellationToken);
        }
        catch (Exception ex)
        {
            // Log first so the root cause is not lost if persisting the failure throws too.
            logger.LogError(ex, "Agent-run job {JobId} failed.", job.Id);

            // Deliberately not the caller's token: when cancellation is what brought us here,
            // saving with it would throw immediately and the failed state would never be stored.
            await RecordFailureAsync(run, job, ex.Message, CancellationToken.None);
        }
    }

    /// <summary>
    /// Marks the run (when one was loaded) and the job as failed with the same message and
    /// timestamp, then saves the change.
    /// </summary>
    private async Task RecordFailureAsync(
        AgentRun? run, Job job, string error, CancellationToken cancellationToken)
    {
        var failedAt = clock.GetUtcNow();
        run?.Fail(error, failedAt);
        job.MarkFailed(error, failedAt);
        await db.SaveChangesAsync(cancellationToken);
    }
}