MCP compatibility for AI agents: server registry, JSON-RPC client, gateway, run-time tool catalog
Agents can now use Model Context Protocol servers. End to end: - SharedKernel seam IMcpGateway (ListToolsAsync / CallToolAsync) + McpToolDescriptor / McpToolResult, so the Assembler discovers and can invoke MCP tools without referencing Integrations' tables. - Integrations: McpServerConfig (org-scoped, owner-only; auth headers AES-GCM encrypted, never returned — only their names) + AddMcpServers migration. McpClient: a dependency-free Streamable-HTTP JSON-RPC 2.0 client (initialize → notifications/initialized → tools/list / tools/call), carrying the Mcp-Session-Id and parsing both application/json and text/event-stream replies. McpGateway resolves an org's servers, decrypts headers server-side, and is best-effort: an unreachable server is logged and skipped, never failing the run. CRUD + connectivity-test endpoints (create/test/delete owner-only via ManageApiKeys; list via ConfigureAgents to bind). - OrgBoard: Agent gains McpServerIds (uuid[]; migration backfills existing agents to empty) flowing through ConfigureAgent + AgentRunContext. - Assembler: AgentRunExecutor lists the agent's MCP tools (best-effort) and PromptAssembler renders a "# Tools (MCP)" catalog — labelled as data, never instructions — and records it in the run trace. - Client: SeatsPage gains an MCP servers card (add/test/delete, encrypted auth header) and a per-agent MCP server multi-select; api client gains del(). Note: discovery + the governed call gateway are in place now; the autonomous model-driven tool-call loop (model emits tool_calls → gated execution → feedback) needs a tool-calling model client and is the next increment — the stub model can't drive it. Verified: ArchitectureTests 8/8, IntegrationTests 53/53 (McpClientTests: JSON-RPC handshake/session, json + SSE; McpServerRegistryTests: owner-only, encrypted-header-never-returned, graceful test, Member 403; PromptAssemblerMcpTests: catalog + trace, omitted when empty), client build green. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,196 @@
|
||||
using System.Net.Http.Json;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
|
||||
namespace TeamUp.Modules.Integrations.Mcp;
|
||||
|
||||
internal sealed record McpTool(string Name, string? Description, string InputSchemaJson);
|
||||
|
||||
/// <summary>
|
||||
/// Minimal Model Context Protocol client over Streamable HTTP (JSON-RPC 2.0) — dependency-free (no
|
||||
/// preview SDK) and air-gap friendly. Handshake: initialize → notifications/initialized → tools/list
|
||||
/// or tools/call, carrying the server-issued Mcp-Session-Id. Accepts either an application/json reply
|
||||
/// or a text/event-stream whose data: line holds the JSON-RPC message. Lets exceptions surface so the
|
||||
/// gateway can treat an unreachable server as "no tools" rather than failing the whole run.
|
||||
/// </summary>
|
||||
internal sealed class McpClient(HttpClient http)
|
||||
{
|
||||
private const string ProtocolVersion = "2025-06-18";
|
||||
|
||||
public async Task<IReadOnlyList<McpTool>> ListToolsAsync(
|
||||
string endpoint, IReadOnlyDictionary<string, string>? headers, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var sessionId = await InitializeAsync(endpoint, headers, cancellationToken);
|
||||
var response = await RpcAsync(endpoint, headers, sessionId, id: 2, method: "tools/list", parameters: new { }, cancellationToken);
|
||||
|
||||
var tools = new List<McpTool>();
|
||||
if (response.TryGetProperty("result", out var result)
|
||||
&& result.TryGetProperty("tools", out var arr) && arr.ValueKind == JsonValueKind.Array)
|
||||
{
|
||||
foreach (var tool in arr.EnumerateArray())
|
||||
{
|
||||
var name = tool.TryGetProperty("name", out var n) ? n.GetString() : null;
|
||||
if (string.IsNullOrEmpty(name))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
var description = tool.TryGetProperty("description", out var d) ? d.GetString() : null;
|
||||
var schema = tool.TryGetProperty("inputSchema", out var s) ? s.GetRawText() : "{}";
|
||||
tools.Add(new McpTool(name, description, schema));
|
||||
}
|
||||
}
|
||||
|
||||
return tools;
|
||||
}
|
||||
|
||||
public async Task<(bool Success, string? Content, string? Error)> CallToolAsync(
|
||||
string endpoint, IReadOnlyDictionary<string, string>? headers, string toolName, string argumentsJson,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
var sessionId = await InitializeAsync(endpoint, headers, cancellationToken);
|
||||
using var argsDoc = JsonDocument.Parse(string.IsNullOrWhiteSpace(argumentsJson) ? "{}" : argumentsJson);
|
||||
var response = await RpcAsync(endpoint, headers, sessionId, id: 3, method: "tools/call",
|
||||
parameters: new { name = toolName, arguments = argsDoc.RootElement }, cancellationToken);
|
||||
|
||||
if (response.TryGetProperty("error", out var err))
|
||||
{
|
||||
return (false, null, err.TryGetProperty("message", out var m) ? m.GetString() : "MCP error");
|
||||
}
|
||||
|
||||
if (response.TryGetProperty("result", out var result))
|
||||
{
|
||||
var isError = result.TryGetProperty("isError", out var ie) && ie.ValueKind == JsonValueKind.True;
|
||||
var text = ExtractText(result);
|
||||
return (!isError, text, isError ? text ?? "Tool reported an error." : null);
|
||||
}
|
||||
|
||||
return (false, null, "Empty MCP response.");
|
||||
}
|
||||
|
||||
private async Task<string?> InitializeAsync(
|
||||
string endpoint, IReadOnlyDictionary<string, string>? headers, CancellationToken cancellationToken)
|
||||
{
|
||||
var (_, sessionId) = await RpcWithSessionAsync(endpoint, headers, sessionId: null, id: 1, method: "initialize",
|
||||
parameters: new
|
||||
{
|
||||
protocolVersion = ProtocolVersion,
|
||||
capabilities = new { },
|
||||
clientInfo = new { name = "TeamUp.AI", version = "1.0" },
|
||||
}, cancellationToken);
|
||||
|
||||
await NotifyAsync(endpoint, headers, sessionId, "notifications/initialized", cancellationToken);
|
||||
return sessionId;
|
||||
}
|
||||
|
||||
private async Task<JsonElement> RpcAsync(
|
||||
string endpoint, IReadOnlyDictionary<string, string>? headers, string? sessionId,
|
||||
int id, string method, object parameters, CancellationToken cancellationToken)
|
||||
{
|
||||
var (response, _) = await RpcWithSessionAsync(endpoint, headers, sessionId, id, method, parameters, cancellationToken);
|
||||
return response;
|
||||
}
|
||||
|
||||
private async Task<(JsonElement Response, string? SessionId)> RpcWithSessionAsync(
|
||||
string endpoint, IReadOnlyDictionary<string, string>? headers, string? sessionId,
|
||||
int id, string method, object parameters, CancellationToken cancellationToken)
|
||||
{
|
||||
using var message = BuildMessage(endpoint, headers, sessionId);
|
||||
message.Content = JsonContent.Create(new { jsonrpc = "2.0", id, method, @params = parameters });
|
||||
|
||||
using var response = await http.SendAsync(message, cancellationToken);
|
||||
response.EnsureSuccessStatusCode();
|
||||
|
||||
var newSession = response.Headers.TryGetValues("Mcp-Session-Id", out var values)
|
||||
? values.FirstOrDefault()
|
||||
: null;
|
||||
var body = await response.Content.ReadAsStringAsync(cancellationToken);
|
||||
using var doc = JsonDocument.Parse(ExtractJsonRpc(body));
|
||||
return (doc.RootElement.Clone(), newSession ?? sessionId);
|
||||
}
|
||||
|
||||
private async Task NotifyAsync(
|
||||
string endpoint, IReadOnlyDictionary<string, string>? headers, string? sessionId, string method,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
using var message = BuildMessage(endpoint, headers, sessionId);
|
||||
message.Content = JsonContent.Create(new { jsonrpc = "2.0", method });
|
||||
using var response = await http.SendAsync(message, cancellationToken);
|
||||
// A notification returns 202 Accepted with no body; servers that don't need it ignore it.
|
||||
}
|
||||
|
||||
private static HttpRequestMessage BuildMessage(
|
||||
string endpoint, IReadOnlyDictionary<string, string>? headers, string? sessionId)
|
||||
{
|
||||
var message = new HttpRequestMessage(HttpMethod.Post, endpoint);
|
||||
message.Headers.Accept.ParseAdd("application/json");
|
||||
message.Headers.Accept.ParseAdd("text/event-stream");
|
||||
message.Headers.TryAddWithoutValidation("MCP-Protocol-Version", ProtocolVersion);
|
||||
if (sessionId is not null)
|
||||
{
|
||||
message.Headers.TryAddWithoutValidation("Mcp-Session-Id", sessionId);
|
||||
}
|
||||
|
||||
if (headers is not null)
|
||||
{
|
||||
foreach (var (key, value) in headers)
|
||||
{
|
||||
message.Headers.TryAddWithoutValidation(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
return message;
|
||||
}
|
||||
|
||||
// A Streamable-HTTP server may answer with a single JSON object or an SSE stream whose data:
|
||||
// line carries the JSON-RPC message. Pull the JSON-RPC object out of either form.
|
||||
private static string ExtractJsonRpc(string body)
|
||||
{
|
||||
var trimmed = body.TrimStart();
|
||||
if (trimmed.StartsWith('{') || trimmed.StartsWith('['))
|
||||
{
|
||||
return body;
|
||||
}
|
||||
|
||||
string? last = null;
|
||||
foreach (var line in body.Split('\n'))
|
||||
{
|
||||
var trimmedLine = line.TrimEnd('\r');
|
||||
if (trimmedLine.StartsWith("data:", StringComparison.Ordinal))
|
||||
{
|
||||
var data = trimmedLine[5..].Trim();
|
||||
if (data.Length > 0)
|
||||
{
|
||||
last = data;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return last ?? "{}";
|
||||
}
|
||||
|
||||
private static string? ExtractText(JsonElement result)
|
||||
{
|
||||
if (!result.TryGetProperty("content", out var content) || content.ValueKind != JsonValueKind.Array)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
var builder = new StringBuilder();
|
||||
foreach (var item in content.EnumerateArray())
|
||||
{
|
||||
if (item.TryGetProperty("type", out var type) && type.GetString() == "text"
|
||||
&& item.TryGetProperty("text", out var text))
|
||||
{
|
||||
if (builder.Length > 0)
|
||||
{
|
||||
builder.Append('\n');
|
||||
}
|
||||
|
||||
builder.Append(text.GetString());
|
||||
}
|
||||
}
|
||||
|
||||
return builder.Length > 0 ? builder.ToString() : null;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user