From 21f492f5317b9a6321c9488b308f72f3acf9cad6 Mon Sep 17 00:00:00 2001 From: Justin Spahr-Summers Date: Tue, 22 Apr 2025 11:35:49 +0100 Subject: [PATCH] Revert "Merge pull request #334 from m-paternostro/mp/notifier" This reverts commit 3ef902328a940b06f666ac02422bbba1be4541ec, reversing changes made to 5b6d35eb12bebfe48f8fd160b56575ec8eac7a21. --- src/examples/server/simpleStreamableHttp.ts | 17 +- src/server/index.ts | 7 - src/server/mcp.test.ts | 414 +------- src/server/mcp.ts | 1027 ++----------------- src/shared/eventNotifier.test.ts | 145 --- src/shared/eventNotifier.ts | 152 --- 6 files changed, 61 insertions(+), 1701 deletions(-) delete mode 100644 src/shared/eventNotifier.test.ts delete mode 100644 src/shared/eventNotifier.ts diff --git a/src/examples/server/simpleStreamableHttp.ts b/src/examples/server/simpleStreamableHttp.ts index d94908bdc..969655e31 100644 --- a/src/examples/server/simpleStreamableHttp.ts +++ b/src/examples/server/simpleStreamableHttp.ts @@ -1,8 +1,8 @@ import express, { Request, Response } from 'express'; import { randomUUID } from 'node:crypto'; +import { z } from 'zod'; import { McpServer } from '../../server/mcp.js'; import { StreamableHTTPServerTransport } from '../../server/streamableHttp.js'; -import { z } from 'zod'; import { CallToolResult, GetPromptResult, isInitializeRequest, ReadResourceResult } from '../../types.js'; import { InMemoryEventStore } from '../shared/inMemoryEventStore.js'; @@ -13,21 +13,6 @@ const getServer = () => { version: '1.0.0', }, { capabilities: { logging: {} } }); - // Log the capability invocation details - server.onCapabilityChange((event) => { - switch (event.action) { - case 'invoked': - console.log(`${event.capabilityType} invocation ${event.invocationIndex}: '${event.capabilityName}' started`); - break; - case 'completed': - console.log(`${event.capabilityType} invocation ${event.invocationIndex}: '${event.capabilityName}' completed in ${event.durationMs}ms`); - break; - case 'error': - console.log(`${event.capabilityType} invocation ${event.invocationIndex}: '${event.capabilityName}' failed in ${event.durationMs}ms: ${event.error}`); - break; - } - }); - // Register a simple tool that returns a greeting server.tool( 'greet', diff --git a/src/server/index.ts b/src/server/index.ts index befaade21..3901099e3 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -104,13 +104,6 @@ export class Server< ); } - /** - * The server's name and version. - */ - getVersion(): { readonly name: string; readonly version: string } { - return this._serverInfo; - } - /** * Registers new capabilities. This can only be called before connecting to a transport. * diff --git a/src/server/mcp.test.ts b/src/server/mcp.test.ts index 4e6270cc6..eaac5c716 100644 --- a/src/server/mcp.test.ts +++ b/src/server/mcp.test.ts @@ -14,7 +14,7 @@ import { LoggingMessageNotificationSchema, Notification, } from "../types.js"; -import { ResourceTemplate, CapabilityEvent } from "./mcp.js"; +import { ResourceTemplate } from "./mcp.js"; import { completable } from "./completable.js"; import { UriTemplate } from "../shared/uriTemplate.js"; @@ -26,10 +26,6 @@ describe("McpServer", () => { }); expect(mcpServer.server).toBeDefined(); - expect(mcpServer.server.getVersion()).toEqual({ - name: "test server", - version: "1.0", - }); }); test("should allow sending notifications via Server", async () => { @@ -866,95 +862,6 @@ describe("tool()", () => { ), ).rejects.toThrow(/Tool nonexistent-tool not found/); }); - - test("should include duration in completed and error events", async () => { - const mcpServer = new McpServer({ - name: "test server", - version: "1.0", - }); - - const client = new Client({ - name: "test client", - version: "1.0", - }); - - const events: Array = []; - const subscription = mcpServer.onCapabilityChange((event) => { - events.push(event); - }); - - // Register tools with different behaviors - mcpServer.tool("success-tool", async () => { - // Simulate some work - await new Promise(resolve => setTimeout(resolve, 10)); - return { content: [{ type: "text", text: "Success" }] }; - }); - - mcpServer.tool("error-tool", async () => { - // Simulate some work - await new Promise(resolve => setTimeout(resolve, 10)); - throw new Error("Simulated error"); - }); - - const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair(); - - await Promise.all([ - client.connect(clientTransport), - mcpServer.connect(serverTransport), - ]); - - // Clear events from registration - events.length = 0; - - // Call the success tool - await client.request( - { - method: "tools/call", - params: { - name: "success-tool", - }, - }, - CallToolResultSchema, - ); - - // Find the completed event - const completedEvent = events.find(e => - e.capabilityType === "tool" && - e.capabilityName === "success-tool" && - e.action === "completed" - ) as Extract; - - expect(completedEvent).toBeDefined(); - expect(completedEvent.durationMs).toBeDefined(); - expect(completedEvent.durationMs).toBeGreaterThan(0); - - // Clear events - events.length = 0; - - // Call the error tool - await client.request( - { - method: "tools/call", - params: { - name: "error-tool", - }, - }, - CallToolResultSchema, - ); - - // Find the error event - const errorEvent = events.find(e => - e.capabilityType === "tool" && - e.capabilityName === "error-tool" && - e.action === "error" - ) as Extract; - - expect(errorEvent).toBeDefined(); - expect(errorEvent.durationMs).toBeDefined(); - expect(errorEvent.durationMs).toBeGreaterThan(0); - - subscription.close(); - }); }); describe("resource()", () => { @@ -2605,322 +2512,3 @@ describe("prompt()", () => { expect(result.completion.total).toBe(1); }); }); - -describe("CapabilityEvents", () => { - test("should emit capability events when registering and interacting with tools", async () => { - const mcpServer = new McpServer({ - name: "test server", - version: "1.0", - }); - - const events: Array = []; - const subscription = mcpServer.onCapabilityChange((event) => { - events.push(event); - }); - - // Register a tool (should trigger "added" event) - const tool = mcpServer.tool("test-tool", async () => ({ - content: [{ type: "text", text: "Test response" }], - })); - - expect(events.length).toBeGreaterThan(0); - expect(events[0]).toMatchObject({ - serverInfo: { name: "test server", version: "1.0" }, - capabilityType: "tool", - capabilityName: "test-tool", - action: "added", - }); - - // Update the tool (should trigger "updated" event) - tool.update({ - description: "Updated description", - }); - - expect(events.some(e => - e.capabilityType === "tool" && - e.capabilityName === "test-tool" && - e.action === "updated" - )).toBe(true); - - // Disable the tool (should trigger "disabled" event) - tool.disable(); - - expect(events.some(e => - e.capabilityType === "tool" && - e.capabilityName === "test-tool" && - e.action === "disabled" - )).toBe(true); - - // Enable the tool (should trigger "enabled" event) - tool.enable(); - - expect(events.some(e => - e.capabilityType === "tool" && - e.capabilityName === "test-tool" && - e.action === "enabled" - )).toBe(true); - - // Clean up - subscription.close(); - }); - - test("should emit capability events when tools are invoked", async () => { - const mcpServer = new McpServer({ - name: "test server", - version: "1.0", - }); - - const client = new Client({ - name: "test client", - version: "1.0", - }); - - const events: Array = []; - const subscription = mcpServer.onCapabilityChange((event) => { - events.push(event); - }); - - mcpServer.tool("test-tool", async () => ({ - content: [{ type: "text", text: "Test response" }], - })); - - // Clear events from registration - events.length = 0; - - const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair(); - - await Promise.all([ - client.connect(clientTransport), - mcpServer.connect(serverTransport), - ]); - - await client.request( - { - method: "tools/call", - params: { - name: "test-tool", - }, - }, - CallToolResultSchema, - ); - - // Should have "invoked" and "completed" events - expect(events.some(e => - e.capabilityType === "tool" && - e.capabilityName === "test-tool" && - e.action === "invoked" - )).toBe(true); - - expect(events.some(e => - e.capabilityType === "tool" && - e.capabilityName === "test-tool" && - e.action === "completed" - )).toBe(true); - - // The invoked and completed events should have the same invocationIndex - const invokedEvent = events.find(e => e.action === "invoked"); - const completedEvent = events.find(e => e.action === "completed"); - expect(invokedEvent?.invocationIndex).toBe(completedEvent?.invocationIndex); - - subscription.close(); - }); - - test("should emit capability events for resources", async () => { - const mcpServer = new McpServer({ - name: "test server", - version: "1.0", - }); - - const client = new Client({ - name: "test client", - version: "1.0", - }); - - const events: Array = []; - const subscription = mcpServer.onCapabilityChange((event) => { - events.push(event); - }); - - // Register a resource - const resource = mcpServer.resource("test-resource", "test://resource", async () => ({ - contents: [{ uri: "test://resource", text: "Test content" }], - })); - - expect(events.some(e => - e.capabilityType === "resource" && - e.capabilityName === "test-resource" && - e.action === "added" - )).toBe(true); - - // Clear events from registration - events.length = 0; - - const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair(); - - await Promise.all([ - client.connect(clientTransport), - mcpServer.connect(serverTransport), - ]); - - // Read the resource - await client.request( - { - method: "resources/read", - params: { - uri: "test://resource", - }, - }, - ReadResourceResultSchema, - ); - - // Should have "invoked" event - expect(events.some(e => - e.capabilityType === "resource" && - e.capabilityName === "test://resource" && - e.action === "invoked" - )).toBe(true); - - // Clear events - events.length = 0; - - // Remove the resource - resource.remove(); - - // Should have "removed" event - expect(events.some(e => - e.capabilityType === "resource" && - e.capabilityName === "test-resource" && - e.action === "removed" - )).toBe(true); - - subscription.close(); - }); - - test("should emit capability events for prompts", async () => { - const mcpServer = new McpServer({ - name: "test server", - version: "1.0", - }); - - const client = new Client({ - name: "test client", - version: "1.0", - }); - - const events: Array = []; - const subscription = mcpServer.onCapabilityChange((event) => { - events.push(event); - }); - - // Register a prompt - mcpServer.prompt("test-prompt", async () => ({ - messages: [ - { - role: "assistant", - content: { - type: "text", - text: "Test response", - }, - }, - ], - })); - - expect(events.some(e => - e.capabilityType === "prompt" && - e.capabilityName === "test-prompt" && - e.action === "added" - )).toBe(true); - - // Clear events from registration - events.length = 0; - - const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair(); - - await Promise.all([ - client.connect(clientTransport), - mcpServer.connect(serverTransport), - ]); - - // Get the prompt - await client.request( - { - method: "prompts/get", - params: { - name: "test-prompt", - }, - }, - GetPromptResultSchema, - ); - - // Should have "invoked" and "completed" events - expect(events.some(e => - e.capabilityType === "prompt" && - e.capabilityName === "test-prompt" && - e.action === "invoked" - )).toBe(true); - - expect(events.some(e => - e.capabilityType === "prompt" && - e.capabilityName === "test-prompt" && - e.action === "completed" - )).toBe(true); - - subscription.close(); - }); - - test("should emit error events when tool execution fails", async () => { - const mcpServer = new McpServer({ - name: "test server", - version: "1.0", - }); - - const client = new Client({ - name: "test client", - version: "1.0", - }); - - const events: Array = []; - const subscription = mcpServer.onCapabilityChange((event) => { - // Only capture error events - if (event.action === "error") { - events.push(event); - } - }); - - mcpServer.tool("error-tool", async () => { - throw new Error("Simulated error"); - }); - - const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair(); - - await Promise.all([ - client.connect(clientTransport), - mcpServer.connect(serverTransport), - ]); - - // Call the tool that throws an error - await client.request( - { - method: "tools/call", - params: { - name: "error-tool", - }, - }, - CallToolResultSchema, - ); - - // Should have error event - expect(events.length).toBe(1); - expect(events[0]).toMatchObject({ - capabilityType: "tool", - capabilityName: "error-tool", - action: "error", - }); - - // Type assert to the specific variant of CapabilityEvent that has the error property - const errorEvent = events[0] as Extract; - expect(errorEvent.error).toBeDefined(); - - subscription.close(); - }); -}); diff --git a/src/server/mcp.ts b/src/server/mcp.ts index b713d0a2d..652f97747 100644 --- a/src/server/mcp.ts +++ b/src/server/mcp.ts @@ -44,7 +44,6 @@ import { Completable, CompletableDef } from "./completable.js"; import { UriTemplate, Variables } from "../shared/uriTemplate.js"; import { RequestHandlerExtra } from "../shared/protocol.js"; import { Transport } from "../shared/transport.js"; -import { createEventNotifier } from "../shared/eventNotifier.js"; /** * High-level MCP server that provides a simpler API for working with resources, tools, and prompts. @@ -64,14 +63,6 @@ export class McpServer { private _registeredTools: { [name: string]: RegisteredTool } = {}; private _registeredPrompts: { [name: string]: RegisteredPrompt } = {}; - private _onCapabilityChange = createEventNotifier(); - /** Counter for unique resource invocation indexes, used to correlate resource invocation events */ - private _resourceInvocationIndex = 0; - /** Counter for unique tool invocation indexes, used to correlate tool invocation events */ - private _toolInvocationIndex = 0; - /** Counter for unique prompt invocation indexes, used to correlate prompt invocation events */ - private _promptInvocationIndex = 0; - constructor(serverInfo: Implementation, options?: ServerOptions) { this.server = new Server(serverInfo, options); } @@ -89,40 +80,16 @@ export class McpServer { * Closes the connection. */ async close(): Promise { - this._onCapabilityChange.close(); await this.server.close(); } - /** - * Event notifier for capability changes. Listeners will be notified when capabilities are added, updated, removed, - * enabled, disabled, invoked, completed, or when errors occur. - * - * This provides a way to monitor and respond to all capability-related activities in the server, - * including both lifecycle changes and invocation events. Each capability type (resource, tool, prompt) - * maintains its own sequence of invocation indexes, which can be used to correlate invocations - * with their completions or errors. - * - * @example - * const subscription = server.onCapabilityChange((event) => { - * if (event.action === "invoked") { - * console.log(`${event.capabilityType} ${event.capabilityName} invoked with index ${event.invocationIndex}`); - * } else if (event.action === "completed" || event.action === "error") { - * console.log(`${event.capabilityType} operation completed in ${event.durationMs}ms`); - * } - * }); - * - * // Later, to stop listening: - * subscription.close(); - */ - public readonly onCapabilityChange = this._onCapabilityChange.onEvent; - private _toolHandlersInitialized = false; private setToolRequestHandlers() { if (this._toolHandlersInitialized) { return; } - + this.server.assertCanSetRequestHandler( ListToolsRequestSchema.shape.method.value, ); @@ -168,35 +135,11 @@ export class McpServer { ); } - const invocationIndex = this._toolInvocationIndex++; - const startTime = performance.now(); - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "tool", - capabilityName: request.params.name, - action: "invoked", - invocationIndex, - arguments: request.params.arguments, - })); - if (!tool.enabled) { - const error = new McpError( + throw new McpError( ErrorCode.InvalidParams, `Tool ${request.params.name} disabled`, ); - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "tool", - capabilityName: request.params.name, - action: "error", - invocationIndex, - error, - durationMs: performance.now() - startTime, - })); - - throw error; } if (tool.inputSchema) { @@ -204,51 +147,17 @@ export class McpServer { request.params.arguments, ); if (!parseResult.success) { - const error = new McpError( + throw new McpError( ErrorCode.InvalidParams, `Invalid arguments for tool ${request.params.name}: ${parseResult.error.message}`, ); - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "tool", - capabilityName: request.params.name, - action: "error", - invocationIndex, - error, - durationMs: performance.now() - startTime, - })); - - throw error; } const args = parseResult.data; const cb = tool.callback as ToolCallback; try { - return await Promise.resolve(cb(args, extra)).then((result) => { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "tool", - capabilityName: request.params.name, - action: "completed", - invocationIndex, - result, - durationMs: performance.now() - startTime, - })); - - return result; - }); + return await Promise.resolve(cb(args, extra)); } catch (error) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "tool", - capabilityName: request.params.name, - action: "error", - invocationIndex, - error, - durationMs: performance.now() - startTime, - })); - return { content: [ { @@ -262,30 +171,8 @@ export class McpServer { } else { const cb = tool.callback as ToolCallback; try { - const result = await Promise.resolve(cb(extra)); - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "tool", - capabilityName: request.params.name, - action: "completed", - invocationIndex, - result, - durationMs: performance.now() - startTime, - })); - - return result; + return await Promise.resolve(cb(extra)); } catch (error) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "tool", - capabilityName: request.params.name, - action: "error", - invocationIndex, - error, - durationMs: performance.now() - startTime, - })); - return { content: [ { @@ -348,79 +235,24 @@ export class McpServer { ); } - const invocationIndex = this._promptInvocationIndex++; - const startTime = performance.now(); - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "prompt", - capabilityName: ref.name, - action: "invoked", - invocationIndex, - arguments: request.params.argument.name, - })); - if (!prompt.enabled) { - const error = new McpError( + throw new McpError( ErrorCode.InvalidParams, `Prompt ${ref.name} disabled`, ); - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "prompt", - capabilityName: ref.name, - action: "error", - invocationIndex, - error, - durationMs: performance.now() - startTime, - })); - - throw error; } if (!prompt.argsSchema) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "prompt", - capabilityName: ref.name, - action: "completed", - invocationIndex, - result: EMPTY_COMPLETION_RESULT, - durationMs: performance.now() - startTime, - })); - return EMPTY_COMPLETION_RESULT; } const field = prompt.argsSchema.shape[request.params.argument.name]; if (!(field instanceof Completable)) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "prompt", - capabilityName: ref.name, - action: "completed", - invocationIndex, - result: EMPTY_COMPLETION_RESULT, - durationMs: performance.now() - startTime, - })); - return EMPTY_COMPLETION_RESULT; } const def: CompletableDef = field._def; const suggestions = await def.complete(request.params.argument.value); - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "prompt", - capabilityName: ref.name, - action: "completed", - invocationIndex, - result: suggestions, - durationMs: performance.now() - startTime, - })); - return createCompletionResult(suggestions); } @@ -432,21 +264,8 @@ export class McpServer { (t) => t.resourceTemplate.uriTemplate.toString() === ref.uri, ); - const invocationIndex = this._resourceInvocationIndex++; - const startTime = performance.now(); - if (!template) { if (this._registeredResources[ref.uri]) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: ref.uri, - action: "completed", - invocationIndex, - result: EMPTY_COMPLETION_RESULT, - durationMs: performance.now() - startTime, - })); - // Attempting to autocomplete a fixed resource URI is not an error in the spec (but probably should be). return EMPTY_COMPLETION_RESULT; } @@ -461,46 +280,11 @@ export class McpServer { request.params.argument.name, ); if (!completer) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: ref.uri, - action: "completed", - invocationIndex, - result: EMPTY_COMPLETION_RESULT, - durationMs: performance.now() - startTime, - })); - return EMPTY_COMPLETION_RESULT; } - try { - const suggestions = await completer(request.params.argument.value); - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: ref.uri, - action: "completed", - invocationIndex, - result: suggestions, - durationMs: performance.now() - startTime, - })); - - return createCompletionResult(suggestions); - } catch (error) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: ref.uri, - action: "error", - invocationIndex, - error, - durationMs: performance.now() - startTime, - })); - - throw error; - } + const suggestions = await completer(request.params.argument.value); + return createCompletionResult(suggestions); } private _resourceHandlersInitialized = false; @@ -583,62 +367,13 @@ export class McpServer { // First check for exact resource match const resource = this._registeredResources[uri.toString()]; if (resource) { - const invocationIndex = this._resourceInvocationIndex++; - const startTime = performance.now(); - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: uri.toString(), - action: "invoked", - invocationIndex, - })); - if (!resource.enabled) { - const error = new McpError( + throw new McpError( ErrorCode.InvalidParams, `Resource ${uri} disabled`, ); - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: uri.toString(), - action: "error", - invocationIndex, - error, - durationMs: performance.now() - startTime, - })); - - throw error; - } - try { - const result = await resource.readCallback(uri, extra); - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: uri.toString(), - action: "completed", - invocationIndex, - result, - durationMs: performance.now() - startTime, - })); - - return result; - } catch (error) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: uri.toString(), - action: "error", - invocationIndex, - error, - durationMs: performance.now() - startTime, - })); - - throw error; } + return resource.readCallback(uri, extra); } // Then check templates @@ -649,44 +384,7 @@ export class McpServer { uri.toString(), ); if (variables) { - const invocationIndex = this._resourceInvocationIndex++; - const startTime = performance.now(); - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: uri.toString(), - action: "invoked", - invocationIndex, - })); - - try { - const result = await template.readCallback(uri, variables, extra); - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: uri.toString(), - action: "completed", - invocationIndex, - result, - durationMs: performance.now() - startTime, - })); - - return result; - } catch (error) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: uri.toString(), - action: "error", - invocationIndex, - error, - durationMs: performance.now() - startTime, - })); - - throw error; - } + return template.readCallback(uri, variables, extra); } } @@ -698,7 +396,7 @@ export class McpServer { ); this.setCompletionRequestHandler(); - + this._resourceHandlersInitialized = true; } @@ -752,35 +450,11 @@ export class McpServer { ); } - const invocationIndex = this._promptInvocationIndex++; - const startTime = performance.now(); - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "prompt", - capabilityName: request.params.name, - action: "invoked", - invocationIndex, - arguments: request.params.arguments, - })); - if (!prompt.enabled) { - const error = new McpError( + throw new McpError( ErrorCode.InvalidParams, `Prompt ${request.params.name} disabled`, ); - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "prompt", - capabilityName: request.params.name, - action: "error", - invocationIndex, - error, - durationMs: performance.now() - startTime, - })); - - throw error; } if (prompt.argsSchema) { @@ -788,90 +462,24 @@ export class McpServer { request.params.arguments, ); if (!parseResult.success) { - const error = new McpError( + throw new McpError( ErrorCode.InvalidParams, `Invalid arguments for prompt ${request.params.name}: ${parseResult.error.message}`, ); - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "prompt", - capabilityName: request.params.name, - action: "error", - invocationIndex, - error, - durationMs: performance.now() - startTime, - })); - - throw error; } const args = parseResult.data; const cb = prompt.callback as PromptCallback; - - try { - const result = await Promise.resolve(cb(args, extra)); - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "prompt", - capabilityName: request.params.name, - action: "completed", - invocationIndex, - result, - durationMs: performance.now() - startTime, - })); - - return result; - } catch (error) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "prompt", - capabilityName: request.params.name, - action: "error", - invocationIndex, - error, - durationMs: performance.now() - startTime, - })); - - throw error; - } + return await Promise.resolve(cb(args, extra)); } else { const cb = prompt.callback as PromptCallback; - - try { - const result = await Promise.resolve(cb(extra)); - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "prompt", - capabilityName: request.params.name, - action: "completed", - invocationIndex, - result, - durationMs: performance.now() - startTime, - })); - - return result; - } catch (error) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "prompt", - capabilityName: request.params.name, - action: "error", - invocationIndex, - error, - durationMs: performance.now() - startTime, - })); - - throw error; - } + return await Promise.resolve(cb(extra)); } - } + }, ); this.setCompletionRequestHandler(); - + this._promptHandlersInitialized = true; } @@ -933,128 +541,23 @@ export class McpServer { metadata, readCallback: readCallback as ReadResourceCallback, enabled: true, - disable: () => { - if (!registeredResource.enabled) return; - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: name, - action: "disabled", - })); - - registeredResource.update({ enabled: false }); - }, - enable: () => { - if (registeredResource.enabled) return; - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: name, - action: "enabled", - })); - - registeredResource.update({ enabled: true }); - }, - remove: () => { - if (uriOrTemplate === null) return; - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: name, - action: "removed", - })); - - registeredResource.update({ uri: null }); - }, + disable: () => registeredResource.update({ enabled: false }), + enable: () => registeredResource.update({ enabled: true }), + remove: () => registeredResource.update({ uri: null }), update: (updates) => { - let added = false; - let removed = false; - let updated = false; - let enabled = false; - - if ( - typeof updates.uri !== "undefined" && - updates.uri !== uriOrTemplate - ) { - removed = true; - delete this._registeredResources[uriOrTemplate]; - - if (updates.uri) { - added = true; - this._registeredResources[updates.uri] = registeredResource; - } - } - - if (typeof updates.name !== "undefined" && updates.name !== name) { - updated = true; - registeredResource.name = updates.name; - } - - if (typeof updates.metadata !== "undefined" && updates.metadata !== metadata) { - updated = true; - registeredResource.metadata = updates.metadata; + if (typeof updates.uri !== "undefined" && updates.uri !== uriOrTemplate) { + delete this._registeredResources[uriOrTemplate] + if (updates.uri) this._registeredResources[updates.uri] = registeredResource } - - if (typeof updates.callback !== "undefined" && updates.callback !== registeredResource.readCallback) { - updated = true; - registeredResource.readCallback = updates.callback; - } - - if (typeof updates.enabled !== "undefined" && updates.enabled !== registeredResource.enabled) { - enabled = true; - registeredResource.enabled = updates.enabled; - } - - if (removed) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: name, - action: "removed", - })); - } - - if (added) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: registeredResource.name, - action: "added", - })); - } - - if (updated) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: registeredResource.name, - action: "updated", - })); - } - - if (enabled) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: registeredResource.name, - action: registeredResource.enabled ? "enabled" : "disabled", - })); - } - this.sendResourceListChanged(); + if (typeof updates.name !== "undefined") registeredResource.name = updates.name + if (typeof updates.metadata !== "undefined") registeredResource.metadata = updates.metadata + if (typeof updates.callback !== "undefined") registeredResource.readCallback = updates.callback + if (typeof updates.enabled !== "undefined") registeredResource.enabled = updates.enabled + this.sendResourceListChanged() }, }; this._registeredResources[uriOrTemplate] = registeredResource; - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: name, - action: "added", - })); - this.setResourceRequestHandlers(); this.sendResourceListChanged(); return registeredResource; @@ -1068,136 +571,23 @@ export class McpServer { metadata, readCallback: readCallback as ReadResourceTemplateCallback, enabled: true, - disable: () => { - if (!registeredResourceTemplate.enabled) return; - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: name, - action: "disabled", - })); - - registeredResourceTemplate.update({ enabled: false }); - }, - enable: () => { - if (registeredResourceTemplate.enabled) return; - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: name, - action: "enabled", - })); - - registeredResourceTemplate.update({ enabled: true }); - }, - remove: () => { - if (name === null) return; - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: name, - action: "removed", - })); - - registeredResourceTemplate.update({ name: null }); - }, + disable: () => registeredResourceTemplate.update({ enabled: false }), + enable: () => registeredResourceTemplate.update({ enabled: true }), + remove: () => registeredResourceTemplate.update({ name: null }), update: (updates) => { - let added = false; - let removed = false; - let updated = false; - let enabled = false; - if (typeof updates.name !== "undefined" && updates.name !== name) { - removed = true; - delete this._registeredResourceTemplates[name]; - - if (updates.name) { - added = true; - this._registeredResourceTemplates[updates.name] = - registeredResourceTemplate; - } + delete this._registeredResourceTemplates[name] + if (updates.name) this._registeredResourceTemplates[updates.name] = registeredResourceTemplate } - - if (typeof updates.template !== "undefined" && updates.template !== registeredResourceTemplate.resourceTemplate) { - updated = true; - registeredResourceTemplate.resourceTemplate = updates.template; - } - - if ( - typeof updates.metadata !== "undefined" && - updates.metadata !== metadata - ) { - updated = true; - registeredResourceTemplate.metadata = updates.metadata; - } - - if ( - typeof updates.callback !== "undefined" && - updates.callback !== registeredResourceTemplate.readCallback - ) { - updated = true; - registeredResourceTemplate.readCallback = updates.callback; - } - - if ( - typeof updates.enabled !== "undefined" && - updates.enabled !== registeredResourceTemplate.enabled - ) { - enabled = true; - registeredResourceTemplate.enabled = updates.enabled; - } - - if (removed) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: name, - action: "removed", - })); - } - - if (added) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: updates.name || name, - action: "added", - })); - } - - if (updated) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: updates.name || name, - action: "updated", - })); - } - - if (enabled) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: updates.name || name, - action: registeredResourceTemplate.enabled ? "enabled" : "disabled", - })); - } - - this.sendResourceListChanged(); + if (typeof updates.template !== "undefined") registeredResourceTemplate.resourceTemplate = updates.template + if (typeof updates.metadata !== "undefined") registeredResourceTemplate.metadata = updates.metadata + if (typeof updates.callback !== "undefined") registeredResourceTemplate.readCallback = updates.callback + if (typeof updates.enabled !== "undefined") registeredResourceTemplate.enabled = updates.enabled + this.sendResourceListChanged() }, }; this._registeredResourceTemplates[name] = registeredResourceTemplate; - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "resource", - capabilityName: name, - action: "added", - })); - this.setResourceRequestHandlers(); this.sendResourceListChanged(); return registeredResourceTemplate; @@ -1255,138 +645,23 @@ export class McpServer { paramsSchema === undefined ? undefined : z.object(paramsSchema), callback: cb, enabled: true, - disable: () => { - if (!registeredTool.enabled) return; - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "tool", - capabilityName: name, - action: "disabled", - })); - - registeredTool.update({ enabled: false }); - }, - enable: () => { - if (registeredTool.enabled) return; - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "tool", - capabilityName: name, - action: "enabled", - })); - - registeredTool.update({ enabled: true }); - }, - remove: () => { - if (name === null) return; - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "tool", - capabilityName: name, - action: "removed", - })); - - registeredTool.update({ name: null }); - }, + disable: () => registeredTool.update({ enabled: false }), + enable: () => registeredTool.update({ enabled: true }), + remove: () => registeredTool.update({ name: null }), update: (updates) => { - let added = false; - let removed = false; - let updated = false; - let enabled = false; - if (typeof updates.name !== "undefined" && updates.name !== name) { - removed = true; - delete this._registeredTools[name]; - - if (updates.name) { - added = true; - this._registeredTools[updates.name] = registeredTool; - } - } - - if ( - typeof updates.description !== "undefined" && - updates.description !== description - ) { - updated = true; - registeredTool.description = updates.description; - } - - if ( - typeof updates.paramsSchema !== "undefined" && - updates.paramsSchema !== paramsSchema - ) { - updated = true; - registeredTool.inputSchema = z.object(updates.paramsSchema); - } - - if ( - typeof updates.callback !== "undefined" && - updates.callback !== registeredTool.callback - ) { - updated = true; - registeredTool.callback = updates.callback; + delete this._registeredTools[name] + if (updates.name) this._registeredTools[updates.name] = registeredTool } - - if ( - typeof updates.enabled !== "undefined" && - updates.enabled !== registeredTool.enabled - ) { - enabled = true; - registeredTool.enabled = updates.enabled; - } - - if (removed) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "tool", - capabilityName: name, - action: "removed", - })); - } - - if (added) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "tool", - capabilityName: updates.name || name, - action: "added", - })); - } - - if (updated) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "tool", - capabilityName: updates.name || name, - action: "updated", - })); - } - - if (enabled) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "tool", - capabilityName: updates.name || name, - action: registeredTool.enabled ? "enabled" : "disabled", - })); - } - - this.sendToolListChanged(); + if (typeof updates.description !== "undefined") registeredTool.description = updates.description + if (typeof updates.paramsSchema !== "undefined") registeredTool.inputSchema = z.object(updates.paramsSchema) + if (typeof updates.callback !== "undefined") registeredTool.callback = updates.callback + if (typeof updates.enabled !== "undefined") registeredTool.enabled = updates.enabled + this.sendToolListChanged() }, }; this._registeredTools[name] = registeredTool; - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "tool", - capabilityName: name, - action: "added", - })); - this.setToolRequestHandlers(); this.sendToolListChanged() @@ -1443,138 +718,23 @@ export class McpServer { argsSchema: argsSchema === undefined ? undefined : z.object(argsSchema), callback: cb, enabled: true, - disable: () => { - if (!registeredPrompt.enabled) return; - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "prompt", - capabilityName: name, - action: "disabled", - })); - - registeredPrompt.update({ enabled: false }); - }, - enable: () => { - if (registeredPrompt.enabled) return; - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "prompt", - capabilityName: name, - action: "enabled", - })); - - registeredPrompt.update({ enabled: true }); - }, - remove: () => { - if (name === null) return; - - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "prompt", - capabilityName: name, - action: "removed", - })); - - registeredPrompt.update({ name: null }); - }, + disable: () => registeredPrompt.update({ enabled: false }), + enable: () => registeredPrompt.update({ enabled: true }), + remove: () => registeredPrompt.update({ name: null }), update: (updates) => { - let added = false; - let removed = false; - let updated = false; - let enabled = false; - if (typeof updates.name !== "undefined" && updates.name !== name) { - removed = true; - delete this._registeredPrompts[name]; - - if (updates.name) { - added = true; - this._registeredPrompts[updates.name!] = registeredPrompt; - } - } - - if ( - typeof updates.description !== "undefined" && - updates.description !== description - ) { - updated = true; - registeredPrompt.description = updates.description; - } - - if ( - typeof updates.argsSchema !== "undefined" && - updates.argsSchema !== argsSchema - ) { - updated = true; - registeredPrompt.argsSchema = z.object(updates.argsSchema); - } - - if ( - typeof updates.callback !== "undefined" && - updates.callback !== registeredPrompt.callback - ) { - updated = true; - registeredPrompt.callback = updates.callback; - } - - if ( - typeof updates.enabled !== "undefined" && - updates.enabled !== registeredPrompt.enabled - ) { - enabled = true; - registeredPrompt.enabled = updates.enabled; - } - - if (removed) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "prompt", - capabilityName: name, - action: "removed", - })); - } - - if (added) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "prompt", - capabilityName: updates.name || name, - action: "added", - })); + delete this._registeredPrompts[name] + if (updates.name) this._registeredPrompts[updates.name] = registeredPrompt } - - if (updated) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "prompt", - capabilityName: updates.name || name, - action: "updated", - })); - } - - if (enabled) { - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "prompt", - capabilityName: updates.name || name, - action: registeredPrompt.enabled ? "enabled" : "disabled", - })); - } - - this.sendPromptListChanged(); + if (typeof updates.description !== "undefined") registeredPrompt.description = updates.description + if (typeof updates.argsSchema !== "undefined") registeredPrompt.argsSchema = z.object(updates.argsSchema) + if (typeof updates.callback !== "undefined") registeredPrompt.callback = updates.callback + if (typeof updates.enabled !== "undefined") registeredPrompt.enabled = updates.enabled + this.sendPromptListChanged() }, }; this._registeredPrompts[name] = registeredPrompt; - this._onCapabilityChange.notify(() => ({ - serverInfo: this.server.getVersion(), - capabilityType: "prompt", - capabilityName: name, - action: "added", - })); - this.setPromptRequestHandlers(); this.sendPromptListChanged() @@ -1810,72 +970,3 @@ const EMPTY_COMPLETION_RESULT: CompleteResult = { hasMore: false, }, }; - -/** - * Represents events emitted when capabilities (tools, resources, prompts) change state or are invoked. - * - * These events allow tracking the lifecycle and usage of all capabilities registered with an McpServer. - * Events include capability registration, updates, invocation, completion, and errors. - * - * Each capability type (tool, resource, prompt) maintains its own sequence of invocation indexes. - * The invocationIndex can be used to correlate "invoked" events with their corresponding - * "completed" or "error" events for the same capability type. - */ -export type CapabilityEvent = { - /** Information about the server that generated this event */ - readonly serverInfo: { readonly name: string; readonly version: string }; - /** The type of capability this event relates to */ - readonly capabilityType: "resource" | "tool" | "prompt"; - /** The name (or URI for resources) of the specific capability */ - readonly capabilityName: string; -} & ( - | { - /** - * Lifecycle events for capability registration and status changes. - * - "added": The capability was registered - * - "updated": The capability was modified - * - "removed": The capability was unregistered - * - "enabled": The capability was enabled - * - "disabled": The capability was disabled - */ - readonly action: "added" | "updated" | "removed" | "enabled" | "disabled"; - } - | { - /** Emitted when a capability is invoked */ - readonly action: "invoked"; - /** - * Monotonically increasing index for each invocation, per capability type. - * This index can be used to correlate this "invoked" event with a later - * "completed" or "error" event with the same capabilityType and invocationIndex. - */ - readonly invocationIndex: number; - /** The arguments passed to the capability, if any */ - readonly arguments?: unknown; - } - | { - /** Emitted when a capability invocation completes successfully */ - readonly action: "completed"; - /** - * The invocationIndex from the corresponding "invoked" event. - * This allows correlating the completion with its invocation. - */ - readonly invocationIndex: number; - /** The result returned by the capability, if any */ - readonly result?: unknown; - /** The duration of the operation in milliseconds, measured from invocation to completion */ - readonly durationMs?: number; - } - | { - /** Emitted when a capability invocation fails with an error */ - readonly action: "error"; - /** - * The invocationIndex from the corresponding "invoked" event. - * This allows correlating the error with its invocation. - */ - readonly invocationIndex: number; - /** The error that occurred during capability execution */ - readonly error: unknown; - /** The duration from invocation to error in milliseconds */ - readonly durationMs?: number; - } -); diff --git a/src/shared/eventNotifier.test.ts b/src/shared/eventNotifier.test.ts deleted file mode 100644 index 2923f7a09..000000000 --- a/src/shared/eventNotifier.test.ts +++ /dev/null @@ -1,145 +0,0 @@ -import { createEventNotifier } from "./eventNotifier.js"; - -describe("EventNotifier", () => { - let notifier: ReturnType>; - - beforeEach(() => { - notifier = createEventNotifier(); - }); - - test("should notify listeners in registration order", () => { - const events: string[] = []; - notifier.onEvent((event) => events.push(`first: ${event}`)); - notifier.onEvent((event) => events.push(`second: ${event}`)); - notifier.onEvent((event) => events.push(`third: ${event}`)); - - notifier.notify("test event"); - - expect(events).toEqual([ - "first: test event", - "second: test event", - "third: test event", - ]); - }); - - test("should not notify unsubscribed listeners", () => { - const events: string[] = []; - const subscription = notifier.onEvent((event) => events.push(event)); - - notifier.notify("first event"); - subscription.close(); - notifier.notify("second event"); - - expect(events).toEqual(["first event"]); - }); - - test("should handle function events", () => { - const events: string[] = []; - notifier.onEvent((event) => events.push(event)); - - notifier.notify(() => "dynamic event"); - - expect(events).toEqual(["dynamic event"]); - }); - - test("should handle errors through error handler", () => { - const errors: Error[] = []; - notifier.onError((error) => errors.push(error)); - - notifier.onEvent(() => { - throw new Error("test error"); - }); - - notifier.notify("test event"); - - expect(errors).toHaveLength(1); - expect(errors[0]).toBeInstanceOf(Error); - expect(errors[0].message).toBe("test error"); - }); - - test("should not notify after close", () => { - const events: string[] = []; - notifier.onEvent((event) => events.push(event)); - - notifier.notify("first event"); - notifier.close(); - notifier.notify("second event"); - - expect(events).toEqual(["first event"]); - }); - - test("should handle multiple subscriptions and unsubscriptions", () => { - const events: string[] = []; - const subscription1 = notifier.onEvent((event) => - events.push(`1: ${event}`) - ); - const subscription2 = notifier.onEvent((event) => - events.push(`2: ${event}`) - ); - - notifier.notify("first event"); - subscription1.close(); - notifier.notify("second event"); - subscription2.close(); - notifier.notify("third event"); - - expect(events).toEqual([ - "1: first event", - "2: first event", - "2: second event", - ]); - }); - - test("should handle error handler after close", () => { - const errors: Error[] = []; - notifier.onError((error) => errors.push(error)); - notifier.close(); - - notifier.onEvent(() => { - throw new Error("test error"); - }); - - notifier.notify("test event"); - - expect(errors).toHaveLength(0); - }); - - test("should clear error handler on close", () => { - const errors: Error[] = []; - notifier.onError((error) => errors.push(error)); - - // Close should clear the error handler - notifier.close(); - - // Setting up a new listener after close - notifier.onEvent(() => { - throw new Error("test error"); - }); - - // This should not trigger the error handler since the notifier is closed - notifier.notify("test event"); - - expect(errors).toHaveLength(0); - }); - - test("should use the last set error handler", () => { - const errors1: Error[] = []; - const errors2: Error[] = []; - - // First error handler - notifier.onError((error) => errors1.push(error)); - - // Second error handler should replace the first one - notifier.onError((error) => errors2.push(error)); - - notifier.onEvent(() => { - throw new Error("test error"); - }); - - notifier.notify("test event"); - - expect(errors1).toHaveLength(0); - expect(errors2).toHaveLength(1); - expect(errors2[0].message).toBe("test error"); - }); -}); diff --git a/src/shared/eventNotifier.ts b/src/shared/eventNotifier.ts deleted file mode 100644 index a1f53238d..000000000 --- a/src/shared/eventNotifier.ts +++ /dev/null @@ -1,152 +0,0 @@ -/** - * Provides a simple, type-safe event notification implementation. This module allows components to implement the - * observer pattern with minimal boilerplate and proper type checking. - */ - -/** - * A type-safe event notifier that manages event listeners and notifications. - * - * @template T The type of events this notifier will handle - * @template E The type of error that can be handled (defaults to Error) - * - * EventNotifier provides: - * - * - Type-safe event subscriptions via `onEvent` - * - Synchronized event notifications via `notify` - * - Automatic cleanup of resources via `close` - * - Status tracking via `active` property - * - Error handling via optional error callback - */ -export type EventNotifier = { - /** - * Registers a listener function to be called when events are notified. Listeners are notified in the order they - * were registered. - * - * @example - * - * ```ts - * const notifier = createEventNotifier(); - * const subscription = notifier.onEvent((message) => { - * console.log(`Received message: ${message}`); - * }); - * - * // Later, to stop listening: - * subscription.close(); - * ``` - * - * @param listener A function that will be called with the notified event - * @returns A closeable to unregister the listener (all listeners are unregistered when the notifier is closed). - */ - onEvent: (listener: (event: T) => unknown) => { close: () => void }; - - /** - * Notifies all registered listeners with the provided event. - * - * This method: - * - * - Calls all registered listeners with the event in their registration order - * - Ignores errors thrown by listeners (they won't affect other listeners) - * - Ignores returned promises (results are not awaited) - * - Does nothing if there are no listeners - * - If the event is a function, it will be called if there are listeners and its return value will be - * used as the event. - * - * @example - * - * ```ts - * const notifier = createEventNotifier<{ type: string; value: number }>(); - * notifier.onEvent((event) => { - * console.log(`Received ${event.type} with value:`, event.value); - * }); - * - * notifier.notify({ type: 'progress', value: 75 }); - * ``` - * - * @param event The event to send to all listeners or a function that returns such event. - */ - notify: (event: T | (() => T)) => void; - - /** - * Sets an error handler for the notifier. This handler will be called when a listener throws an error. - * - * @param handler A function that will be called with any errors thrown by listeners. - */ - onError: (handler: (error: E) => void) => void; - - /** - * Closes the notifier and removes all listeners. - * - * @warning Failing to call close() on subscriptions or the notifier itself may lead to memory leaks. - */ - close: () => void; -}; - -/** - * Creates a type-safe event notifier. - * - * @example - * - * ```ts - * // Simple string event notifier - * const stringNotifier = createEventNotifier(); - * - * // Complex object event notifier - * interface TaskEvent { - * type: 'started' | 'completed' | 'failed'; - * taskId: number; - * details: { - * name: string; - * duration?: number; - * }; - * } - * const taskNotifier = createEventNotifier(); - * ``` - * - * @template T The type of events this notifier will handle - * @template E The type of error that can be handled (defaults to Error) - * @returns A new EventNotifier instance. - */ -export const createEventNotifier = (): EventNotifier => { - const listeners = new Set<(event: T) => unknown>(); - let errorHandler: ((error: E) => void) | undefined; - - return { - close: () => { - listeners.clear(); - errorHandler = undefined; - }, - - onEvent: (listener) => { - listeners.add(listener); - return { - close: () => { - listeners.delete(listener); - }, - }; - }, - - notify: (event: T | (() => T)) => { - if (!listeners.size) { - return; - } - - if (typeof event === "function") { - event = (event as () => T)(); - } - - for (const listener of listeners) { - try { - void listener(event); - } catch (error) { - if (errorHandler) { - errorHandler(error as E); - } - } - } - }, - - onError: (handler) => { - errorHandler = handler; - }, - }; -};