Skip to content

Commit 5c7b1b4

Browse files
committed
feat: make stabilization delay configurable
1 parent 1da476a commit 5c7b1b4

File tree

3 files changed

+160
-5
lines changed

3 files changed

+160
-5
lines changed
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
import { describe, test, expect, vi, beforeEach, afterEach } from 'vitest';
2+
import { SupabaseBroadcastAdapter } from '../src/lib/SupabaseBroadcastAdapter';
3+
import {
4+
createMockClient,
5+
} from './helpers/test-utils';
6+
import { RUN_ID } from './fixtures';
7+
import { mockChannelSubscription } from './mocks';
8+
9+
/**
10+
* Tests for configurable stabilization delay
11+
* Uses fake timers to verify the delay behavior without actual waiting
12+
*/
13+
describe('SupabaseBroadcastAdapter - Configurable Stabilization Delay', () => {
14+
beforeEach(() => {
15+
vi.useFakeTimers();
16+
// Silence console logs/errors in tests
17+
vi.spyOn(console, 'error').mockImplementation(() => { /* intentionally empty */ });
18+
vi.spyOn(console, 'log').mockImplementation(() => { /* intentionally empty */ });
19+
});
20+
21+
afterEach(() => {
22+
vi.useRealTimers();
23+
vi.restoreAllMocks();
24+
});
25+
26+
test('should wait for custom delay before subscription completes', async () => {
27+
const customDelay = 500;
28+
const { client, mocks } = createMockClient();
29+
30+
const adapter = new SupabaseBroadcastAdapter(client, {
31+
stabilizationDelayMs: customDelay
32+
});
33+
34+
// Setup channel subscription that emits SUBSCRIBED immediately
35+
mockChannelSubscription(mocks);
36+
37+
// Start subscription (returns promise)
38+
const subscribePromise = adapter.subscribeToRun(RUN_ID);
39+
40+
// Track whether promise has resolved
41+
let isResolved = false;
42+
subscribePromise.then(() => { isResolved = true; });
43+
44+
// Flush only microtasks (not timers) to process the SUBSCRIBED event
45+
await Promise.resolve();
46+
47+
// At this point, SUBSCRIBED has been received but we should still be waiting
48+
// for the stabilization delay
49+
expect(isResolved).toBe(false);
50+
51+
// Advance time by less than custom delay
52+
await vi.advanceTimersByTimeAsync(customDelay - 100);
53+
expect(isResolved).toBe(false); // Still waiting
54+
55+
// Advance past the custom delay
56+
await vi.advanceTimersByTimeAsync(100);
57+
expect(isResolved).toBe(true); // Now it's ready!
58+
});
59+
60+
test('should use default 300ms delay when not configured', async () => {
61+
const { client, mocks } = createMockClient();
62+
63+
const adapter = new SupabaseBroadcastAdapter(client);
64+
65+
mockChannelSubscription(mocks);
66+
67+
const subscribePromise = adapter.subscribeToRun(RUN_ID);
68+
69+
let isResolved = false;
70+
subscribePromise.then(() => { isResolved = true; });
71+
72+
// Flush only microtasks
73+
await Promise.resolve();
74+
75+
// Should NOT be ready before 300ms
76+
await vi.advanceTimersByTimeAsync(299);
77+
expect(isResolved).toBe(false);
78+
79+
// Should be ready after 300ms
80+
await vi.advanceTimersByTimeAsync(1);
81+
expect(isResolved).toBe(true);
82+
});
83+
84+
test('should be immediately ready when delay is 0', async () => {
85+
const { client, mocks } = createMockClient();
86+
87+
const adapter = new SupabaseBroadcastAdapter(client, {
88+
stabilizationDelayMs: 0
89+
});
90+
91+
mockChannelSubscription(mocks);
92+
93+
const subscribePromise = adapter.subscribeToRun(RUN_ID);
94+
95+
let isResolved = false;
96+
subscribePromise.then(() => { isResolved = true; });
97+
98+
// Flush microtasks and timers
99+
await vi.runAllTimersAsync();
100+
101+
// Should be ready immediately
102+
expect(isResolved).toBe(true);
103+
});
104+
105+
test('should allow different delays for different adapter instances', async () => {
106+
const { client: client1, mocks: mocks1 } = createMockClient();
107+
const { client: client2, mocks: mocks2 } = createMockClient();
108+
109+
const adapter1 = new SupabaseBroadcastAdapter(client1, {
110+
stabilizationDelayMs: 200
111+
});
112+
113+
const adapter2 = new SupabaseBroadcastAdapter(client2, {
114+
stabilizationDelayMs: 400
115+
});
116+
117+
mockChannelSubscription(mocks1);
118+
mockChannelSubscription(mocks2);
119+
120+
const promise1 = adapter1.subscribeToRun('run-1');
121+
const promise2 = adapter2.subscribeToRun('run-2');
122+
123+
let resolved1 = false;
124+
let resolved2 = false;
125+
promise1.then(() => { resolved1 = true; });
126+
promise2.then(() => { resolved2 = true; });
127+
128+
// Flush microtasks only
129+
await Promise.resolve();
130+
131+
// After 200ms, adapter1 should be ready but adapter2 should not
132+
await vi.advanceTimersByTimeAsync(200);
133+
expect(resolved1).toBe(true);
134+
expect(resolved2).toBe(false);
135+
136+
// After 400ms total, both should be ready
137+
await vi.advanceTimersByTimeAsync(200);
138+
expect(resolved1).toBe(true);
139+
expect(resolved2).toBe(true);
140+
});
141+
});

pkgs/client/src/lib/PgflowClient.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,18 @@ export class PgflowClient<TFlow extends AnyFlow = AnyFlow> implements IFlowClien
2828
* Creates a new PgflowClient instance
2929
*
3030
* @param supabaseClient - Supabase client instance
31+
* @param opts - Optional configuration
3132
*/
32-
constructor(supabaseClient: SupabaseClient) {
33+
constructor(
34+
supabaseClient: SupabaseClient,
35+
opts: {
36+
realtimeStabilizationDelayMs?: number;
37+
} = {}
38+
) {
3339
this.#supabase = supabaseClient;
34-
this.#realtimeAdapter = new SupabaseBroadcastAdapter(supabaseClient);
40+
this.#realtimeAdapter = new SupabaseBroadcastAdapter(supabaseClient, {
41+
stabilizationDelayMs: opts.realtimeStabilizationDelayMs,
42+
});
3543

3644
// Set up global event listeners - properly typed
3745
this.#realtimeAdapter.onRunEvent((event: BroadcastRunEvent) => {

pkgs/client/src/lib/SupabaseBroadcastAdapter.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ export class SupabaseBroadcastAdapter implements IFlowRealtime {
2222
#channels: Map<string, RealtimeChannel> = new Map();
2323
#emitter = createNanoEvents<AdapterEvents>();
2424
#reconnectionDelay: number;
25+
#stabilizationDelay: number;
2526
#schedule: typeof setTimeout;
26-
27+
2728

2829
/**
2930
* Creates a new instance of SupabaseBroadcastAdapter
@@ -32,10 +33,15 @@ export class SupabaseBroadcastAdapter implements IFlowRealtime {
3233
*/
3334
constructor(
3435
supabase: SupabaseClient,
35-
opts: { reconnectDelayMs?: number; schedule?: typeof setTimeout } = {}
36+
opts: {
37+
reconnectDelayMs?: number;
38+
stabilizationDelayMs?: number;
39+
schedule?: typeof setTimeout;
40+
} = {}
3641
) {
3742
this.#supabase = supabase;
3843
this.#reconnectionDelay = opts.reconnectDelayMs ?? 2000;
44+
this.#stabilizationDelay = opts.stabilizationDelayMs ?? 300;
3945
this.#schedule = opts.schedule ?? setTimeout;
4046
}
4147

@@ -351,7 +357,7 @@ export class SupabaseBroadcastAdapter implements IFlowRealtime {
351357
// The SUBSCRIBED event is emitted before backend routing is fully established.
352358
// This delay ensures the backend can receive messages sent immediately after subscription.
353359
// See: https://github.com/supabase/supabase-js/issues/1599
354-
await new Promise(resolve => setTimeout(resolve, 300));
360+
await new Promise(resolve => this.#schedule(resolve, this.#stabilizationDelay));
355361

356362
this.#channels.set(run_id, channel);
357363

0 commit comments

Comments
 (0)