Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 178 additions & 0 deletions src/acp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,184 @@ describe("Connection", () => {
});
});

it("resolves closed promise when stream ends", async () => {
const closeLog: string[] = [];

// Create simple client and agent
class TestClient implements Client {
async writeTextFile(
_: WriteTextFileRequest,
): Promise<WriteTextFileResponse> {
return {};
}
async readTextFile(
_: ReadTextFileRequest,
): Promise<ReadTextFileResponse> {
return { content: "test" };
}
async requestPermission(
_: RequestPermissionRequest,
): Promise<RequestPermissionResponse> {
return {
outcome: {
outcome: "selected",
optionId: "allow",
},
};
}
async sessionUpdate(_: SessionNotification): Promise<void> {
// no-op
}
}

class TestAgent implements Agent {
async initialize(_: InitializeRequest): Promise<InitializeResponse> {
return {
protocolVersion: PROTOCOL_VERSION,
agentCapabilities: { loadSession: false },
};
}
async newSession(_: NewSessionRequest): Promise<NewSessionResponse> {
return { sessionId: "test-session" };
}
async authenticate(_: AuthenticateRequest): Promise<void> {
// no-op
}
async prompt(_: PromptRequest): Promise<PromptResponse> {
return { stopReason: "end_turn" };
}
async cancel(_: CancelNotification): Promise<void> {
// no-op
}
}

// Set up connections
const agentConnection = new ClientSideConnection(
() => new TestClient(),
ndJsonStream(clientToAgent.writable, agentToClient.readable),
);

const clientConnection = new AgentSideConnection(
() => new TestAgent(),
ndJsonStream(agentToClient.writable, clientToAgent.readable),
);

// Listen for close via signal
agentConnection.signal.addEventListener("abort", () => {
closeLog.push("agent connection closed (signal)");
});

clientConnection.signal.addEventListener("abort", () => {
closeLog.push("client connection closed (signal)");
});

// Verify connections are not closed yet
expect(agentConnection.signal.aborted).toBe(false);
expect(clientConnection.signal.aborted).toBe(false);
expect(closeLog).toHaveLength(0);

// Close the streams by closing the writable ends
await clientToAgent.writable.close();
await agentToClient.writable.close();

// Wait for closed promises to resolve
await agentConnection.closed;
await clientConnection.closed;

// Verify connections are now closed
expect(agentConnection.signal.aborted).toBe(true);
expect(clientConnection.signal.aborted).toBe(true);
expect(closeLog).toContain("agent connection closed (signal)");
expect(closeLog).toContain("client connection closed (signal)");
});

it("supports removing signal event listeners", async () => {
const closeLog: string[] = [];

// Create simple client and agent
class TestClient implements Client {
async writeTextFile(
_: WriteTextFileRequest,
): Promise<WriteTextFileResponse> {
return {};
}
async readTextFile(
_: ReadTextFileRequest,
): Promise<ReadTextFileResponse> {
return { content: "test" };
}
async requestPermission(
_: RequestPermissionRequest,
): Promise<RequestPermissionResponse> {
return {
outcome: {
outcome: "selected",
optionId: "allow",
},
};
}
async sessionUpdate(_: SessionNotification): Promise<void> {
// no-op
}
}

class TestAgent implements Agent {
async initialize(_: InitializeRequest): Promise<InitializeResponse> {
return {
protocolVersion: PROTOCOL_VERSION,
agentCapabilities: { loadSession: false },
};
}
async newSession(_: NewSessionRequest): Promise<NewSessionResponse> {
return { sessionId: "test-session" };
}
async authenticate(_: AuthenticateRequest): Promise<void> {
// no-op
}
async prompt(_: PromptRequest): Promise<PromptResponse> {
return { stopReason: "end_turn" };
}
async cancel(_: CancelNotification): Promise<void> {
// no-op
}
}

// Set up connections
const agentConnection = new ClientSideConnection(
() => new TestClient(),
ndJsonStream(clientToAgent.writable, agentToClient.readable),
);

new AgentSideConnection(
() => new TestAgent(),
ndJsonStream(agentToClient.writable, clientToAgent.readable),
);

// Register and then remove a listener
const listener = () => {
closeLog.push("this should not be called");
};

agentConnection.signal.addEventListener("abort", listener);
agentConnection.signal.removeEventListener("abort", listener);

// Register another listener that should be called
agentConnection.signal.addEventListener("abort", () => {
closeLog.push("agent connection closed");
});

// Close the streams
await clientToAgent.writable.close();
await agentToClient.writable.close();

// Wait for closed promise
await agentConnection.closed;

// Verify only the non-removed listener was called
expect(closeLog).toEqual(["agent connection closed"]);
expect(closeLog).not.toContain("this should not be called");
});

it("handles methods returning response objects with _meta or void", async () => {
// Create client that returns both response objects and void
class TestClient implements Client {
Expand Down
138 changes: 138 additions & 0 deletions src/acp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,57 @@ export class AgentSideConnection {
): Promise<void> {
return await this.#connection.sendNotification(`_${method}`, params);
}

/**
* AbortSignal that aborts when the connection closes.
*
* This signal can be used to:
* - Listen for connection closure: `connection.signal.addEventListener('abort', () => {...})`
* - Check connection status synchronously: `if (connection.signal.aborted) {...}`
* - Pass to other APIs (fetch, setTimeout) for automatic cancellation
*
* The connection closes when the underlying stream ends, either normally or due to an error.
*
* @example
* ```typescript
* const connection = new AgentSideConnection(agent, stream);
*
* // Listen for closure
* connection.signal.addEventListener('abort', () => {
* console.log('Connection closed - performing cleanup');
* });
*
* // Check status
* if (connection.signal.aborted) {
* console.log('Connection is already closed');
* }
*
* // Pass to other APIs
* fetch(url, { signal: connection.signal });
* ```
*/
get signal(): AbortSignal {
return this.#connection.signal;
}

/**
* Promise that resolves when the connection closes.
*
* The connection closes when the underlying stream ends, either normally or due to an error.
* Once closed, the connection cannot send or receive any more messages.
*
* This is useful for async/await style cleanup:
*
* @example
* ```typescript
* const connection = new AgentSideConnection(agent, stream);
* await connection.closed;
* console.log('Connection closed - performing cleanup');
* ```
*/
get closed(): Promise<void> {
return this.#connection.closed;
}
}

/**
Expand Down Expand Up @@ -686,6 +737,57 @@ export class ClientSideConnection implements Agent {
): Promise<void> {
return await this.#connection.sendNotification(`_${method}`, params);
}

/**
* AbortSignal that aborts when the connection closes.
*
* This signal can be used to:
* - Listen for connection closure: `connection.signal.addEventListener('abort', () => {...})`
* - Check connection status synchronously: `if (connection.signal.aborted) {...}`
* - Pass to other APIs (fetch, setTimeout) for automatic cancellation
*
* The connection closes when the underlying stream ends, either normally or due to an error.
*
* @example
* ```typescript
* const connection = new ClientSideConnection(client, stream);
*
* // Listen for closure
* connection.signal.addEventListener('abort', () => {
* console.log('Connection closed - performing cleanup');
* });
*
* // Check status
* if (connection.signal.aborted) {
* console.log('Connection is already closed');
* }
*
* // Pass to other APIs
* fetch(url, { signal: connection.signal });
* ```
*/
get signal(): AbortSignal {
return this.#connection.signal;
}

/**
* Promise that resolves when the connection closes.
*
* The connection closes when the underlying stream ends, either normally or due to an error.
* Once closed, the connection cannot send or receive any more messages.
*
* This is useful for async/await style cleanup:
*
* @example
* ```typescript
* const connection = new ClientSideConnection(client, stream);
* await connection.closed;
* console.log('Connection closed - performing cleanup');
* ```
*/
get closed(): Promise<void> {
return this.#connection.closed;
}
}

export type { AnyMessage } from "./jsonrpc.js";
Expand All @@ -697,6 +799,8 @@ class Connection {
#notificationHandler: NotificationHandler;
#stream: Stream;
#writeQueue: Promise<void> = Promise.resolve();
#abortController = new AbortController();
#closedPromise: Promise<void>;

constructor(
requestHandler: RequestHandler,
Expand All @@ -706,9 +810,42 @@ class Connection {
this.#requestHandler = requestHandler;
this.#notificationHandler = notificationHandler;
this.#stream = stream;
this.#closedPromise = new Promise((resolve) => {
this.#abortController.signal.addEventListener("abort", () => resolve());
});
this.#receive();
}

/**
* AbortSignal that aborts when the connection closes.
*
* This signal can be used to:
* - Listen for connection closure via event listeners
* - Check connection status synchronously with `signal.aborted`
* - Pass to other APIs (fetch, setTimeout) for automatic cancellation
*/
get signal(): AbortSignal {
return this.#abortController.signal;
}

/**
* Promise that resolves when the connection closes.
*
* The connection closes when the underlying stream ends, either normally
* or due to an error. Once closed, the connection cannot send or receive
* any more messages.
*
* @example
* ```typescript
* const connection = new ClientSideConnection(client, stream);
* await connection.closed;
* console.log('Connection closed - performing cleanup');
* ```
*/
get closed(): Promise<void> {
return this.#closedPromise;
}

async #receive() {
const reader = this.#stream.readable.getReader();
try {
Expand Down Expand Up @@ -744,6 +881,7 @@ class Connection {
}
} finally {
reader.releaseLock();
this.#abortController.abort();
}
}

Expand Down