diff --git a/src/acp.test.ts b/src/acp.test.ts index d774515..1bf4def 100644 --- a/src/acp.test.ts +++ b/src/acp.test.ts @@ -758,6 +758,184 @@ describe("Connection", () => { }); }); + it("resolves closed promise when stream ends", async () => { + const closeLog: string[] = []; + + // Create simple client and agent + class TestClient implements Client { + async writeTextFile( + _: WriteTextFileRequest, + ): Promise { + return {}; + } + async readTextFile( + _: ReadTextFileRequest, + ): Promise { + return { content: "test" }; + } + async requestPermission( + _: RequestPermissionRequest, + ): Promise { + return { + outcome: { + outcome: "selected", + optionId: "allow", + }, + }; + } + async sessionUpdate(_: SessionNotification): Promise { + // no-op + } + } + + class TestAgent implements Agent { + async initialize(_: InitializeRequest): Promise { + return { + protocolVersion: PROTOCOL_VERSION, + agentCapabilities: { loadSession: false }, + }; + } + async newSession(_: NewSessionRequest): Promise { + return { sessionId: "test-session" }; + } + async authenticate(_: AuthenticateRequest): Promise { + // no-op + } + async prompt(_: PromptRequest): Promise { + return { stopReason: "end_turn" }; + } + async cancel(_: CancelNotification): Promise { + // no-op + } + } + + // Set up connections + const agentConnection = new ClientSideConnection( + () => new TestClient(), + ndJsonStream(clientToAgent.writable, agentToClient.readable), + ); + + const clientConnection = new AgentSideConnection( + () => new TestAgent(), + ndJsonStream(agentToClient.writable, clientToAgent.readable), + ); + + // Listen for close via signal + agentConnection.signal.addEventListener("abort", () => { + closeLog.push("agent connection closed (signal)"); + }); + + clientConnection.signal.addEventListener("abort", () => { + closeLog.push("client connection closed (signal)"); + }); + + // Verify connections are not closed yet + expect(agentConnection.signal.aborted).toBe(false); + expect(clientConnection.signal.aborted).toBe(false); + expect(closeLog).toHaveLength(0); + + // Close the streams by closing the writable ends + await clientToAgent.writable.close(); + await agentToClient.writable.close(); + + // Wait for closed promises to resolve + await agentConnection.closed; + await clientConnection.closed; + + // Verify connections are now closed + expect(agentConnection.signal.aborted).toBe(true); + expect(clientConnection.signal.aborted).toBe(true); + expect(closeLog).toContain("agent connection closed (signal)"); + expect(closeLog).toContain("client connection closed (signal)"); + }); + + it("supports removing signal event listeners", async () => { + const closeLog: string[] = []; + + // Create simple client and agent + class TestClient implements Client { + async writeTextFile( + _: WriteTextFileRequest, + ): Promise { + return {}; + } + async readTextFile( + _: ReadTextFileRequest, + ): Promise { + return { content: "test" }; + } + async requestPermission( + _: RequestPermissionRequest, + ): Promise { + return { + outcome: { + outcome: "selected", + optionId: "allow", + }, + }; + } + async sessionUpdate(_: SessionNotification): Promise { + // no-op + } + } + + class TestAgent implements Agent { + async initialize(_: InitializeRequest): Promise { + return { + protocolVersion: PROTOCOL_VERSION, + agentCapabilities: { loadSession: false }, + }; + } + async newSession(_: NewSessionRequest): Promise { + return { sessionId: "test-session" }; + } + async authenticate(_: AuthenticateRequest): Promise { + // no-op + } + async prompt(_: PromptRequest): Promise { + return { stopReason: "end_turn" }; + } + async cancel(_: CancelNotification): Promise { + // no-op + } + } + + // Set up connections + const agentConnection = new ClientSideConnection( + () => new TestClient(), + ndJsonStream(clientToAgent.writable, agentToClient.readable), + ); + + new AgentSideConnection( + () => new TestAgent(), + ndJsonStream(agentToClient.writable, clientToAgent.readable), + ); + + // Register and then remove a listener + const listener = () => { + closeLog.push("this should not be called"); + }; + + agentConnection.signal.addEventListener("abort", listener); + agentConnection.signal.removeEventListener("abort", listener); + + // Register another listener that should be called + agentConnection.signal.addEventListener("abort", () => { + closeLog.push("agent connection closed"); + }); + + // Close the streams + await clientToAgent.writable.close(); + await agentToClient.writable.close(); + + // Wait for closed promise + await agentConnection.closed; + + // Verify only the non-removed listener was called + expect(closeLog).toEqual(["agent connection closed"]); + expect(closeLog).not.toContain("this should not be called"); + }); + it("handles methods returning response objects with _meta or void", async () => { // Create client that returns both response objects and void class TestClient implements Client { diff --git a/src/acp.ts b/src/acp.ts index 1b81047..64398eb 100644 --- a/src/acp.ts +++ b/src/acp.ts @@ -261,6 +261,57 @@ export class AgentSideConnection { ): Promise { return await this.#connection.sendNotification(`_${method}`, params); } + + /** + * AbortSignal that aborts when the connection closes. + * + * This signal can be used to: + * - Listen for connection closure: `connection.signal.addEventListener('abort', () => {...})` + * - Check connection status synchronously: `if (connection.signal.aborted) {...}` + * - Pass to other APIs (fetch, setTimeout) for automatic cancellation + * + * The connection closes when the underlying stream ends, either normally or due to an error. + * + * @example + * ```typescript + * const connection = new AgentSideConnection(agent, stream); + * + * // Listen for closure + * connection.signal.addEventListener('abort', () => { + * console.log('Connection closed - performing cleanup'); + * }); + * + * // Check status + * if (connection.signal.aborted) { + * console.log('Connection is already closed'); + * } + * + * // Pass to other APIs + * fetch(url, { signal: connection.signal }); + * ``` + */ + get signal(): AbortSignal { + return this.#connection.signal; + } + + /** + * Promise that resolves when the connection closes. + * + * The connection closes when the underlying stream ends, either normally or due to an error. + * Once closed, the connection cannot send or receive any more messages. + * + * This is useful for async/await style cleanup: + * + * @example + * ```typescript + * const connection = new AgentSideConnection(agent, stream); + * await connection.closed; + * console.log('Connection closed - performing cleanup'); + * ``` + */ + get closed(): Promise { + return this.#connection.closed; + } } /** @@ -686,6 +737,57 @@ export class ClientSideConnection implements Agent { ): Promise { return await this.#connection.sendNotification(`_${method}`, params); } + + /** + * AbortSignal that aborts when the connection closes. + * + * This signal can be used to: + * - Listen for connection closure: `connection.signal.addEventListener('abort', () => {...})` + * - Check connection status synchronously: `if (connection.signal.aborted) {...}` + * - Pass to other APIs (fetch, setTimeout) for automatic cancellation + * + * The connection closes when the underlying stream ends, either normally or due to an error. + * + * @example + * ```typescript + * const connection = new ClientSideConnection(client, stream); + * + * // Listen for closure + * connection.signal.addEventListener('abort', () => { + * console.log('Connection closed - performing cleanup'); + * }); + * + * // Check status + * if (connection.signal.aborted) { + * console.log('Connection is already closed'); + * } + * + * // Pass to other APIs + * fetch(url, { signal: connection.signal }); + * ``` + */ + get signal(): AbortSignal { + return this.#connection.signal; + } + + /** + * Promise that resolves when the connection closes. + * + * The connection closes when the underlying stream ends, either normally or due to an error. + * Once closed, the connection cannot send or receive any more messages. + * + * This is useful for async/await style cleanup: + * + * @example + * ```typescript + * const connection = new ClientSideConnection(client, stream); + * await connection.closed; + * console.log('Connection closed - performing cleanup'); + * ``` + */ + get closed(): Promise { + return this.#connection.closed; + } } export type { AnyMessage } from "./jsonrpc.js"; @@ -697,6 +799,8 @@ class Connection { #notificationHandler: NotificationHandler; #stream: Stream; #writeQueue: Promise = Promise.resolve(); + #abortController = new AbortController(); + #closedPromise: Promise; constructor( requestHandler: RequestHandler, @@ -706,9 +810,42 @@ class Connection { this.#requestHandler = requestHandler; this.#notificationHandler = notificationHandler; this.#stream = stream; + this.#closedPromise = new Promise((resolve) => { + this.#abortController.signal.addEventListener("abort", () => resolve()); + }); this.#receive(); } + /** + * AbortSignal that aborts when the connection closes. + * + * This signal can be used to: + * - Listen for connection closure via event listeners + * - Check connection status synchronously with `signal.aborted` + * - Pass to other APIs (fetch, setTimeout) for automatic cancellation + */ + get signal(): AbortSignal { + return this.#abortController.signal; + } + + /** + * Promise that resolves when the connection closes. + * + * The connection closes when the underlying stream ends, either normally + * or due to an error. Once closed, the connection cannot send or receive + * any more messages. + * + * @example + * ```typescript + * const connection = new ClientSideConnection(client, stream); + * await connection.closed; + * console.log('Connection closed - performing cleanup'); + * ``` + */ + get closed(): Promise { + return this.#closedPromise; + } + async #receive() { const reader = this.#stream.readable.getReader(); try { @@ -744,6 +881,7 @@ class Connection { } } finally { reader.releaseLock(); + this.#abortController.abort(); } }