Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions packages/powersync_core/lib/src/sync/options.dart
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,11 @@ final class SyncOptions {
}

/// The PowerSync SDK offers two different implementations for receiving sync
/// lines: One handling most logic in Dart, and a newer one offloading that work
/// to the native PowerSync extension.
/// lines.
///
/// The recommended option is [rust], which can offload most work of the sync
/// client to the native PowerSync extension. `.dart` is a legacy implementation
/// that will be removed in a future release of the PowerSync Dart SDK.
enum SyncClientImplementation {
/// A sync implementation that decodes and handles sync lines in Dart.
@Deprecated(
Expand All @@ -72,14 +75,10 @@ enum SyncClientImplementation {
///
/// This implementation can be more performant than the Dart implementation,
/// and supports receiving sync lines in a more efficient format.
///
/// Note that this option is currently experimental.
@experimental
rust;

/// The default sync client implementation to use.
// ignore: deprecated_member_use_from_same_package
static const defaultClient = dart;
static const defaultClient = rust;
}

@internal
Expand Down
2 changes: 1 addition & 1 deletion packages/powersync_core/lib/src/sync/stream_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ Future<void> cancelAll(List<StreamSubscription<void>> subscriptions) async {
/// await the original future and report errors.
///
/// When using the regular [Stream.fromFuture], cancelling the subscription
/// before the future completes with an error could cause an handled error to
/// before the future completes with an error could cause an unhandled error to
/// be reported.
/// Further, it could cause concurrency issues in the stream client because it
/// was possible for us to:
Expand Down
22 changes: 17 additions & 5 deletions packages/powersync_core/lib/src/sync/streaming_sync.dart
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ class StreamingSyncImplementation implements StreamingSync {
await handleLine(line as StreamingSyncLine);
case UploadCompleted():
case HandleChangedSubscriptions():
case ConnectionEvent():
// Only relevant for the Rust sync implementation.
break;
case AbortCurrentIteration():
Expand Down Expand Up @@ -647,20 +648,24 @@ final class _ActiveRustStreamingIteration {
}
}

Stream<ReceivedLine> _receiveLines(Object? data,
Stream<SyncEvent> _receiveLines(Object? data,
{required Future<void> onAbort}) {
return streamFromFutureAwaitInCancellation(
sync._postStreamRequest(data, true, onAbort: onAbort))
.asyncExpand<Object /* Uint8List | String */ >((response) {
.asyncExpand<SyncEvent>((response) async* {
if (response == null) {
return null;
return;
} else {
yield ConnectionEvent.established;

final contentType = response.headers['content-type'];
final isBson = contentType == 'application/vnd.powersync.bson-stream';

return isBson ? response.stream.bsonDocuments : response.stream.lines;
yield* (isBson ? response.stream.bsonDocuments : response.stream.lines)
.map(ReceivedLine.new);
yield ConnectionEvent.end;
}
}).map(ReceivedLine.new);
});
}

Future<RustSyncIterationResult> _handleLines(
Expand Down Expand Up @@ -696,6 +701,8 @@ final class _ActiveRustStreamingIteration {
}

switch (event) {
case ConnectionEvent():
await _control('connection', event.name);
case ReceivedLine(line: final Uint8List line):
_triggerCrudUploadOnFirstLine();
await _control('line_binary', line);
Expand Down Expand Up @@ -802,6 +809,11 @@ typedef RustSyncIterationResult = ({bool immediateRestart});

sealed class SyncEvent {}

enum ConnectionEvent implements SyncEvent {
established,
end,
}

final class ReceivedLine implements SyncEvent {
final Object /* String|Uint8List|StreamingSyncLine */ line;

Expand Down
30 changes: 11 additions & 19 deletions packages/powersync_core/test/connected_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -98,23 +98,9 @@ void main() {
// Create a new completer to await the next upload
uploadTriggeredCompleter = Completer();

// Connect the PowerSync instance
final connectedCompleter = Completer<void>();
// The first connection attempt will fail
final connectedErroredCompleter = Completer<void>();

db.statusStream.listen((status) {
if (status.connected && !connectedCompleter.isCompleted) {
connectedCompleter.complete();
}
if (status.downloadError != null &&
!connectedErroredCompleter.isCompleted) {
connectedErroredCompleter.complete();
}
});

// The first command will not be valid, this simulates a failed connection
testServer.addEvent('asdf\n');
// Connect the PowerSync instance. The first connection attempt will fail
await db.connect(connector: connector);

// The connect operation should have triggered an upload (even though it fails to connect)
Expand All @@ -124,14 +110,20 @@ void main() {
uploadTriggeredCompleter = Completer();

// Connection attempt should initially fail
await connectedErroredCompleter.future;
expect(db.currentStatus.anyError, isNotNull);
await expectLater(
db.statusStream,
emitsThrough(isA<SyncStatus>()
.having((e) => e.downloadError, 'downloadError', isNotNull)),
);

// Now send a valid command. Which will result in successful connection
await testServer.clearEvents();
testServer.addEvent('{"token_expires_in": 3600}\n');
await connectedCompleter.future;
expect(db.connected, isTrue);
await expectLater(
db.statusStream,
emitsThrough(
isA<SyncStatus>().having((e) => e.connected, 'connected', isTrue)),
);

await uploadTriggeredCompleter.future;
expect(uploadCounter, equals(2));
Expand Down
20 changes: 12 additions & 8 deletions packages/powersync_core/test/sync/in_memory_sync_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,13 @@ void main() {
group('rust sync client', () {
_declareTests(
'json',
SyncOptions(
syncImplementation: SyncClientImplementation.rust,
retryDelay: Duration(milliseconds: 200)),
SyncOptions(retryDelay: Duration(milliseconds: 200)),
false,
);

_declareTests(
'bson',
SyncOptions(
syncImplementation: SyncClientImplementation.rust,
retryDelay: Duration(milliseconds: 200)),
SyncOptions(retryDelay: Duration(milliseconds: 200)),
true,
);
});
Expand Down Expand Up @@ -98,7 +94,7 @@ void _declareTests(String name, SyncOptions options, bool bson) {
});

Future<StreamQueue<SyncStatus>> waitForConnection(
{bool expectNoWarnings = true}) async {
{bool expectNoWarnings = true, bool addKeepLive = true}) async {
if (expectNoWarnings) {
logger.onRecord.listen((e) {
if (e.level >= Level.WARNING) {
Expand All @@ -114,7 +110,10 @@ void _declareTests(String name, SyncOptions options, bool bson) {
final status = StreamQueue(database.statusStream);
addTearDown(status.cancel);

syncService.addKeepAlive();
if (addKeepLive) {
syncService.addKeepAlive();
}

await expectLater(status,
emitsThrough(isSyncStatus(connected: true, hasSynced: false)));
return status;
Expand Down Expand Up @@ -315,6 +314,11 @@ void _declareTests(String name, SyncOptions options, bool bson) {

await expectLater(query, emits(isEmpty));
});

test('marks as connected even without sync line', () async {
await waitForConnection(addKeepLive: false);
expect(database.currentStatus.connected, isTrue);
});
}

group('partial sync', () {
Expand Down
3 changes: 1 addition & 2 deletions packages/powersync_core/test/test_server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ final class TestServer {
maxConnectionCount = max(connectionCount, maxConnectionCount);

stream() async* {
var blob = "*" * 5000;
for (var i = 0; i < 50; i++) {
yield {"token_expires_in": tokenExpiresIn, "blob": blob};
yield {"token_expires_in": tokenExpiresIn};
await Future<void>.delayed(Duration(microseconds: 1));
}
}
Expand Down