Skip to content

Commit 186846b

Browse files
committed
Fix streams.append for s2, and a bunch of other little things, plus tests
1 parent 55ed015 commit 186846b

File tree

13 files changed

+962
-106
lines changed

13 files changed

+962
-106
lines changed

apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.append.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { json } from "@remix-run/server-runtime";
22
import { tryCatch } from "@trigger.dev/core/utils";
3+
import { nanoid } from "nanoid";
34
import { z } from "zod";
45
import { $replica, prisma } from "~/db.server";
56
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
@@ -96,8 +97,10 @@ const { action } = createActionApiRoute(
9697
targetRun.realtimeStreamsVersion
9798
);
9899

100+
const partId = request.headers.get("X-Part-Id") ?? nanoid(7);
101+
99102
const [appendError] = await tryCatch(
100-
realtimeStream.appendPart(part, targetId, params.streamId)
103+
realtimeStream.appendPart(part, partId, targetId, params.streamId)
101104
);
102105

103106
if (appendError) {

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,6 @@ export function RealtimeStreamViewer({
118118
const [mouseOver, setMouseOver] = useState(false);
119119
const [copied, setCopied] = useState(false);
120120

121-
console.log("chunks.length", chunks.length);
122-
123121
const getCompactText = useCallback(() => {
124122
return chunks
125123
.map((chunk) => {

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
334334
}
335335
}
336336

337-
async appendPart(part: string, runId: string, streamId: string): Promise<void> {
337+
async appendPart(part: string, partId: string, runId: string, streamId: string): Promise<void> {
338338
const redis = new Redis(this.options.redis ?? {});
339339
const streamKey = `stream:${runId}:${streamId}`;
340340

apps/webapp/app/services/realtime/s2realtimeStreams.server.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,16 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
9292
throw new Error("S2 streams are written to S2 via the client, not from the server");
9393
}
9494

95-
async appendPart(part: string, runId: string, streamId: string): Promise<void> {
95+
async appendPart(part: string, partId: string, runId: string, streamId: string): Promise<void> {
9696
const s2Stream = this.toStreamName(runId, streamId);
9797

98-
await this.s2Append(s2Stream, {
99-
records: [{ body: part }],
98+
this.logger.info(`S2 appending to stream`, { part, stream: s2Stream });
99+
100+
const result = await this.s2Append(s2Stream, {
101+
records: [{ body: JSON.stringify({ data: part, id: partId }) }],
100102
});
103+
104+
this.logger.info(`S2 append result`, { result });
101105
}
102106

103107
getLastChunkIndex(runId: string, streamId: string, clientId: string): Promise<number> {
@@ -116,6 +120,8 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
116120
const s2Stream = this.toStreamName(runId, streamId);
117121
const startSeq = this.parseLastEventId(options?.lastEventId);
118122

123+
this.logger.info(`S2 streaming records from stream`, { stream: s2Stream, startSeq });
124+
119125
// Request SSE stream from S2 and return it directly
120126
const s2Response = await this.s2StreamRecords(s2Stream, {
121127
seq_num: startSeq ?? 0,

apps/webapp/app/services/realtime/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ export interface StreamIngestor {
1313
resumeFromChunk?: number
1414
): Promise<Response>;
1515

16-
appendPart(part: string, runId: string, streamId: string): Promise<void>;
16+
appendPart(part: string, partId: string, runId: string, streamId: string): Promise<void>;
1717

1818
getLastChunkIndex(runId: string, streamId: string, clientId: string): Promise<number>;
1919
}

packages/core/src/v3/apiClient/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ const DEFAULT_ZOD_FETCH_OPTIONS: ZodFetchOptions = {
132132
};
133133

134134
export type ApiClientFutureFlags = {
135-
unstable_v2RealtimeStreams?: boolean;
135+
v2RealtimeStreams?: boolean;
136136
};
137137

138138
export { isRequestOptions, SSEStreamSubscription };
@@ -1214,7 +1214,7 @@ export class ApiClient {
12141214
headers[API_VERSION_HEADER_NAME] = API_VERSION;
12151215

12161216
if (
1217-
this.futureFlags.unstable_v2RealtimeStreams ||
1217+
this.futureFlags.v2RealtimeStreams ||
12181218
getEnvVar("TRIGGER_V2_REALTIME_STREAMS") === "1" ||
12191219
getEnvVar("TRIGGER_V2_REALTIME_STREAMS") === "true" ||
12201220
getEnvVar("TRIGGER_REALTIME_STREAMS_V2") === "1" ||

packages/core/src/v3/realtimeStreams/manager.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
8787

8888
const result = await this.apiClient.appendToStream(
8989
runId,
90-
options?.target ?? "self",
90+
"self",
9191
key,
9292
part,
9393
options?.requestOptions
@@ -141,11 +141,11 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
141141
function getRunIdForOptions(options?: RealtimeStreamOperationOptions): string | undefined {
142142
if (options?.target) {
143143
if (options.target === "parent") {
144-
return taskContext.ctx?.run?.parentTaskRunId;
144+
return taskContext.ctx?.run?.parentTaskRunId ?? taskContext.ctx?.run?.id;
145145
}
146146

147147
if (options.target === "root") {
148-
return taskContext.ctx?.run?.rootTaskRunId;
148+
return taskContext.ctx?.run?.rootTaskRunId ?? taskContext.ctx?.run?.id;
149149
}
150150

151151
if (options.target === "self") {

packages/core/src/v3/realtimeStreams/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ export type RealtimeDefinedStream<TPart> = {
3939
) => PipeStreamResult<TPart>;
4040
read: (runId: string, options?: ReadStreamOptions) => Promise<AsyncIterableStream<TPart>>;
4141
append: (value: TPart, options?: AppendStreamOptions) => Promise<void>;
42-
writer: (options: WriterStreamOptions<TPart>) => void;
42+
writer: (options: WriterStreamOptions<TPart>) => PipeStreamResult<TPart>;
4343
};
4444

4545
export type InferStreamType<T> = T extends RealtimeDefinedStream<infer TPart> ? TPart : unknown;

packages/core/src/v3/types/tasks.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -593,7 +593,8 @@ export interface Task<TIdentifier extends string, TInput = void, TOutput = any>
593593
*/
594594
triggerAndWait: (
595595
payload: TInput,
596-
options?: TriggerAndWaitOptions
596+
options?: TriggerAndWaitOptions,
597+
requestOptions?: TriggerApiRequestOptions
597598
) => TaskRunPromise<TIdentifier, TOutput>;
598599

599600
/**

packages/trigger-sdk/src/v3/shared.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ export function createTask<
185185
params.queue?.name
186186
);
187187
},
188-
triggerAndWait: (payload, options) => {
188+
triggerAndWait: (payload, options, requestOptions) => {
189189
return new TaskRunPromise<TIdentifier, TOutput>((resolve, reject) => {
190190
triggerAndWait_internal<TIdentifier, TInput, TOutput>(
191191
"triggerAndWait()",
@@ -195,7 +195,8 @@ export function createTask<
195195
{
196196
queue: params.queue?.name,
197197
...options,
198-
}
198+
},
199+
requestOptions
199200
)
200201
.then((result) => {
201202
resolve(result);

0 commit comments

Comments
 (0)