diff --git a/packages/powersync_core/lib/src/sync/options.dart b/packages/powersync_core/lib/src/sync/options.dart index ee8b3c63..12e4c331 100644 --- a/packages/powersync_core/lib/src/sync/options.dart +++ b/packages/powersync_core/lib/src/sync/options.dart @@ -57,8 +57,11 @@ final class SyncOptions { } /// The PowerSync SDK offers two different implementations for receiving sync -/// lines: One handling most logic in Dart, and a newer one offloading that work -/// to the native PowerSync extension. +/// lines. +/// +/// The recommended option is [rust], which can offload most work of the sync +/// client to the native PowerSync extension. `.dart` is a legacy implementation +/// that will be removed in a future release of the PowerSync Dart SDK. enum SyncClientImplementation { /// A sync implementation that decodes and handles sync lines in Dart. @Deprecated( @@ -72,14 +75,10 @@ enum SyncClientImplementation { /// /// This implementation can be more performant than the Dart implementation, /// and supports receiving sync lines in a more efficient format. - /// - /// Note that this option is currently experimental. - @experimental rust; /// The default sync client implementation to use. - // ignore: deprecated_member_use_from_same_package - static const defaultClient = dart; + static const defaultClient = rust; } @internal diff --git a/packages/powersync_core/lib/src/sync/stream_utils.dart b/packages/powersync_core/lib/src/sync/stream_utils.dart index d531e783..65b6f891 100644 --- a/packages/powersync_core/lib/src/sync/stream_utils.dart +++ b/packages/powersync_core/lib/src/sync/stream_utils.dart @@ -115,7 +115,7 @@ Future cancelAll(List> subscriptions) async { /// await the original future and report errors. /// /// When using the regular [Stream.fromFuture], cancelling the subscription -/// before the future completes with an error could cause an handled error to +/// before the future completes with an error could cause an unhandled error to /// be reported. /// Further, it could cause concurrency issues in the stream client because it /// was possible for us to: diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index 60deef12..0d7ec7a7 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -470,6 +470,7 @@ class StreamingSyncImplementation implements StreamingSync { await handleLine(line as StreamingSyncLine); case UploadCompleted(): case HandleChangedSubscriptions(): + case ConnectionEvent(): // Only relevant for the Rust sync implementation. break; case AbortCurrentIteration(): @@ -647,20 +648,24 @@ final class _ActiveRustStreamingIteration { } } - Stream _receiveLines(Object? data, + Stream _receiveLines(Object? data, {required Future onAbort}) { return streamFromFutureAwaitInCancellation( sync._postStreamRequest(data, true, onAbort: onAbort)) - .asyncExpand((response) { + .asyncExpand((response) async* { if (response == null) { - return null; + return; } else { + yield ConnectionEvent.established; + final contentType = response.headers['content-type']; final isBson = contentType == 'application/vnd.powersync.bson-stream'; - return isBson ? response.stream.bsonDocuments : response.stream.lines; + yield* (isBson ? response.stream.bsonDocuments : response.stream.lines) + .map(ReceivedLine.new); + yield ConnectionEvent.end; } - }).map(ReceivedLine.new); + }); } Future _handleLines( @@ -696,6 +701,8 @@ final class _ActiveRustStreamingIteration { } switch (event) { + case ConnectionEvent(): + await _control('connection', event.name); case ReceivedLine(line: final Uint8List line): _triggerCrudUploadOnFirstLine(); await _control('line_binary', line); @@ -802,6 +809,11 @@ typedef RustSyncIterationResult = ({bool immediateRestart}); sealed class SyncEvent {} +enum ConnectionEvent implements SyncEvent { + established, + end, +} + final class ReceivedLine implements SyncEvent { final Object /* String|Uint8List|StreamingSyncLine */ line; diff --git a/packages/powersync_core/test/connected_test.dart b/packages/powersync_core/test/connected_test.dart index a2188958..429130b7 100644 --- a/packages/powersync_core/test/connected_test.dart +++ b/packages/powersync_core/test/connected_test.dart @@ -98,23 +98,9 @@ void main() { // Create a new completer to await the next upload uploadTriggeredCompleter = Completer(); - // Connect the PowerSync instance - final connectedCompleter = Completer(); - // The first connection attempt will fail - final connectedErroredCompleter = Completer(); - - db.statusStream.listen((status) { - if (status.connected && !connectedCompleter.isCompleted) { - connectedCompleter.complete(); - } - if (status.downloadError != null && - !connectedErroredCompleter.isCompleted) { - connectedErroredCompleter.complete(); - } - }); - // The first command will not be valid, this simulates a failed connection testServer.addEvent('asdf\n'); + // Connect the PowerSync instance. The first connection attempt will fail await db.connect(connector: connector); // The connect operation should have triggered an upload (even though it fails to connect) @@ -124,14 +110,20 @@ void main() { uploadTriggeredCompleter = Completer(); // Connection attempt should initially fail - await connectedErroredCompleter.future; - expect(db.currentStatus.anyError, isNotNull); + await expectLater( + db.statusStream, + emitsThrough(isA() + .having((e) => e.downloadError, 'downloadError', isNotNull)), + ); // Now send a valid command. Which will result in successful connection await testServer.clearEvents(); testServer.addEvent('{"token_expires_in": 3600}\n'); - await connectedCompleter.future; - expect(db.connected, isTrue); + await expectLater( + db.statusStream, + emitsThrough( + isA().having((e) => e.connected, 'connected', isTrue)), + ); await uploadTriggeredCompleter.future; expect(uploadCounter, equals(2)); diff --git a/packages/powersync_core/test/sync/in_memory_sync_test.dart b/packages/powersync_core/test/sync/in_memory_sync_test.dart index 87c18d82..6a465dab 100644 --- a/packages/powersync_core/test/sync/in_memory_sync_test.dart +++ b/packages/powersync_core/test/sync/in_memory_sync_test.dart @@ -30,17 +30,13 @@ void main() { group('rust sync client', () { _declareTests( 'json', - SyncOptions( - syncImplementation: SyncClientImplementation.rust, - retryDelay: Duration(milliseconds: 200)), + SyncOptions(retryDelay: Duration(milliseconds: 200)), false, ); _declareTests( 'bson', - SyncOptions( - syncImplementation: SyncClientImplementation.rust, - retryDelay: Duration(milliseconds: 200)), + SyncOptions(retryDelay: Duration(milliseconds: 200)), true, ); }); @@ -98,7 +94,7 @@ void _declareTests(String name, SyncOptions options, bool bson) { }); Future> waitForConnection( - {bool expectNoWarnings = true}) async { + {bool expectNoWarnings = true, bool addKeepLive = true}) async { if (expectNoWarnings) { logger.onRecord.listen((e) { if (e.level >= Level.WARNING) { @@ -114,7 +110,10 @@ void _declareTests(String name, SyncOptions options, bool bson) { final status = StreamQueue(database.statusStream); addTearDown(status.cancel); - syncService.addKeepAlive(); + if (addKeepLive) { + syncService.addKeepAlive(); + } + await expectLater(status, emitsThrough(isSyncStatus(connected: true, hasSynced: false))); return status; @@ -315,6 +314,11 @@ void _declareTests(String name, SyncOptions options, bool bson) { await expectLater(query, emits(isEmpty)); }); + + test('marks as connected even without sync line', () async { + await waitForConnection(addKeepLive: false); + expect(database.currentStatus.connected, isTrue); + }); } group('partial sync', () { diff --git a/packages/powersync_core/test/test_server.dart b/packages/powersync_core/test/test_server.dart index 372da797..f1f38f5c 100644 --- a/packages/powersync_core/test/test_server.dart +++ b/packages/powersync_core/test/test_server.dart @@ -44,9 +44,8 @@ final class TestServer { maxConnectionCount = max(connectionCount, maxConnectionCount); stream() async* { - var blob = "*" * 5000; for (var i = 0; i < 50; i++) { - yield {"token_expires_in": tokenExpiresIn, "blob": blob}; + yield {"token_expires_in": tokenExpiresIn}; await Future.delayed(Duration(microseconds: 1)); } }