From 05236fceb995fddab15b4c307ee5bf8ded5ebd0d Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Thu, 23 Oct 2025 22:23:13 +0200 Subject: [PATCH] Add signal and closed properties to connection classes Expose AbortSignal and closed Promise on AgentSideConnection and ClientSideConnection to allow detection of connection closure. The signal can be used with event listeners or passed to other APIs for cancellation, while the closed promise enables async/await style cleanup when the stream ends. --- src/acp.test.ts | 178 ++++++++++++++++++++++++++++++++++++++++++++++++ src/acp.ts | 138 +++++++++++++++++++++++++++++++++++++ 2 files changed, 316 insertions(+) diff --git a/src/acp.test.ts b/src/acp.test.ts index d774515..1bf4def 100644 --- a/src/acp.test.ts +++ b/src/acp.test.ts @@ -758,6 +758,184 @@ describe("Connection", () => { }); }); + it("resolves closed promise when stream ends", async () => { + const closeLog: string[] = []; + + // Create simple client and agent + class TestClient implements Client { + async writeTextFile( + _: WriteTextFileRequest, + ): Promise { + return {}; + } + async readTextFile( + _: ReadTextFileRequest, + ): Promise { + return { content: "test" }; + } + async requestPermission( + _: RequestPermissionRequest, + ): Promise { + return { + outcome: { + outcome: "selected", + optionId: "allow", + }, + }; + } + async sessionUpdate(_: SessionNotification): Promise { + // no-op + } + } + + class TestAgent implements Agent { + async initialize(_: InitializeRequest): Promise { + return { + protocolVersion: PROTOCOL_VERSION, + agentCapabilities: { loadSession: false }, + }; + } + async newSession(_: NewSessionRequest): Promise { + return { sessionId: "test-session" }; + } + async authenticate(_: AuthenticateRequest): Promise { + // no-op + } + async prompt(_: PromptRequest): Promise { + return { stopReason: "end_turn" }; + } + async cancel(_: CancelNotification): Promise { + // no-op + } + } + + // Set up connections + const agentConnection = new ClientSideConnection( + () => new TestClient(), + ndJsonStream(clientToAgent.writable, agentToClient.readable), + ); + + const clientConnection = new AgentSideConnection( + () => new TestAgent(), + ndJsonStream(agentToClient.writable, clientToAgent.readable), + ); + + // Listen for close via signal + agentConnection.signal.addEventListener("abort", () => { + closeLog.push("agent connection closed (signal)"); + }); + + clientConnection.signal.addEventListener("abort", () => { + closeLog.push("client connection closed (signal)"); + }); + + // Verify connections are not closed yet + expect(agentConnection.signal.aborted).toBe(false); + expect(clientConnection.signal.aborted).toBe(false); + expect(closeLog).toHaveLength(0); + + // Close the streams by closing the writable ends + await clientToAgent.writable.close(); + await agentToClient.writable.close(); + + // Wait for closed promises to resolve + await agentConnection.closed; + await clientConnection.closed; + + // Verify connections are now closed + expect(agentConnection.signal.aborted).toBe(true); + expect(clientConnection.signal.aborted).toBe(true); + expect(closeLog).toContain("agent connection closed (signal)"); + expect(closeLog).toContain("client connection closed (signal)"); + }); + + it("supports removing signal event listeners", async () => { + const closeLog: string[] = []; + + // Create simple client and agent + class TestClient implements Client { + async writeTextFile( + _: WriteTextFileRequest, + ): Promise { + return {}; + } + async readTextFile( + _: ReadTextFileRequest, + ): Promise { + return { content: "test" }; + } + async requestPermission( + _: RequestPermissionRequest, + ): Promise { + return { + outcome: { + outcome: "selected", + optionId: "allow", + }, + }; + } + async sessionUpdate(_: SessionNotification): Promise { + // no-op + } + } + + class TestAgent implements Agent { + async initialize(_: InitializeRequest): Promise { + return { + protocolVersion: PROTOCOL_VERSION, + agentCapabilities: { loadSession: false }, + }; + } + async newSession(_: NewSessionRequest): Promise { + return { sessionId: "test-session" }; + } + async authenticate(_: AuthenticateRequest): Promise { + // no-op + } + async prompt(_: PromptRequest): Promise { + return { stopReason: "end_turn" }; + } + async cancel(_: CancelNotification): Promise { + // no-op + } + } + + // Set up connections + const agentConnection = new ClientSideConnection( + () => new TestClient(), + ndJsonStream(clientToAgent.writable, agentToClient.readable), + ); + + new AgentSideConnection( + () => new TestAgent(), + ndJsonStream(agentToClient.writable, clientToAgent.readable), + ); + + // Register and then remove a listener + const listener = () => { + closeLog.push("this should not be called"); + }; + + agentConnection.signal.addEventListener("abort", listener); + agentConnection.signal.removeEventListener("abort", listener); + + // Register another listener that should be called + agentConnection.signal.addEventListener("abort", () => { + closeLog.push("agent connection closed"); + }); + + // Close the streams + await clientToAgent.writable.close(); + await agentToClient.writable.close(); + + // Wait for closed promise + await agentConnection.closed; + + // Verify only the non-removed listener was called + expect(closeLog).toEqual(["agent connection closed"]); + expect(closeLog).not.toContain("this should not be called"); + }); + it("handles methods returning response objects with _meta or void", async () => { // Create client that returns both response objects and void class TestClient implements Client { diff --git a/src/acp.ts b/src/acp.ts index 1b81047..64398eb 100644 --- a/src/acp.ts +++ b/src/acp.ts @@ -261,6 +261,57 @@ export class AgentSideConnection { ): Promise { return await this.#connection.sendNotification(`_${method}`, params); } + + /** + * AbortSignal that aborts when the connection closes. + * + * This signal can be used to: + * - Listen for connection closure: `connection.signal.addEventListener('abort', () => {...})` + * - Check connection status synchronously: `if (connection.signal.aborted) {...}` + * - Pass to other APIs (fetch, setTimeout) for automatic cancellation + * + * The connection closes when the underlying stream ends, either normally or due to an error. + * + * @example + * ```typescript + * const connection = new AgentSideConnection(agent, stream); + * + * // Listen for closure + * connection.signal.addEventListener('abort', () => { + * console.log('Connection closed - performing cleanup'); + * }); + * + * // Check status + * if (connection.signal.aborted) { + * console.log('Connection is already closed'); + * } + * + * // Pass to other APIs + * fetch(url, { signal: connection.signal }); + * ``` + */ + get signal(): AbortSignal { + return this.#connection.signal; + } + + /** + * Promise that resolves when the connection closes. + * + * The connection closes when the underlying stream ends, either normally or due to an error. + * Once closed, the connection cannot send or receive any more messages. + * + * This is useful for async/await style cleanup: + * + * @example + * ```typescript + * const connection = new AgentSideConnection(agent, stream); + * await connection.closed; + * console.log('Connection closed - performing cleanup'); + * ``` + */ + get closed(): Promise { + return this.#connection.closed; + } } /** @@ -686,6 +737,57 @@ export class ClientSideConnection implements Agent { ): Promise { return await this.#connection.sendNotification(`_${method}`, params); } + + /** + * AbortSignal that aborts when the connection closes. + * + * This signal can be used to: + * - Listen for connection closure: `connection.signal.addEventListener('abort', () => {...})` + * - Check connection status synchronously: `if (connection.signal.aborted) {...}` + * - Pass to other APIs (fetch, setTimeout) for automatic cancellation + * + * The connection closes when the underlying stream ends, either normally or due to an error. + * + * @example + * ```typescript + * const connection = new ClientSideConnection(client, stream); + * + * // Listen for closure + * connection.signal.addEventListener('abort', () => { + * console.log('Connection closed - performing cleanup'); + * }); + * + * // Check status + * if (connection.signal.aborted) { + * console.log('Connection is already closed'); + * } + * + * // Pass to other APIs + * fetch(url, { signal: connection.signal }); + * ``` + */ + get signal(): AbortSignal { + return this.#connection.signal; + } + + /** + * Promise that resolves when the connection closes. + * + * The connection closes when the underlying stream ends, either normally or due to an error. + * Once closed, the connection cannot send or receive any more messages. + * + * This is useful for async/await style cleanup: + * + * @example + * ```typescript + * const connection = new ClientSideConnection(client, stream); + * await connection.closed; + * console.log('Connection closed - performing cleanup'); + * ``` + */ + get closed(): Promise { + return this.#connection.closed; + } } export type { AnyMessage } from "./jsonrpc.js"; @@ -697,6 +799,8 @@ class Connection { #notificationHandler: NotificationHandler; #stream: Stream; #writeQueue: Promise = Promise.resolve(); + #abortController = new AbortController(); + #closedPromise: Promise; constructor( requestHandler: RequestHandler, @@ -706,9 +810,42 @@ class Connection { this.#requestHandler = requestHandler; this.#notificationHandler = notificationHandler; this.#stream = stream; + this.#closedPromise = new Promise((resolve) => { + this.#abortController.signal.addEventListener("abort", () => resolve()); + }); this.#receive(); } + /** + * AbortSignal that aborts when the connection closes. + * + * This signal can be used to: + * - Listen for connection closure via event listeners + * - Check connection status synchronously with `signal.aborted` + * - Pass to other APIs (fetch, setTimeout) for automatic cancellation + */ + get signal(): AbortSignal { + return this.#abortController.signal; + } + + /** + * Promise that resolves when the connection closes. + * + * The connection closes when the underlying stream ends, either normally + * or due to an error. Once closed, the connection cannot send or receive + * any more messages. + * + * @example + * ```typescript + * const connection = new ClientSideConnection(client, stream); + * await connection.closed; + * console.log('Connection closed - performing cleanup'); + * ``` + */ + get closed(): Promise { + return this.#closedPromise; + } + async #receive() { const reader = this.#stream.readable.getReader(); try { @@ -744,6 +881,7 @@ class Connection { } } finally { reader.releaseLock(); + this.#abortController.abort(); } }