Skip to content

Commit 483c47e

Browse files
committed
Mark connected before first line
1 parent 40866f9 commit 483c47e

File tree

4 files changed

+35
-26
lines changed

4 files changed

+35
-26
lines changed

packages/powersync_core/lib/src/sync/stream_utils.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ Future<void> cancelAll(List<StreamSubscription<void>> subscriptions) async {
115115
/// await the original future and report errors.
116116
///
117117
/// When using the regular [Stream.fromFuture], cancelling the subscription
118-
/// before the future completes with an error could cause an handled error to
118+
/// before the future completes with an error could cause an unhandled error to
119119
/// be reported.
120120
/// Further, it could cause concurrency issues in the stream client because it
121121
/// was possible for us to:

packages/powersync_core/lib/src/sync/streaming_sync.dart

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,7 @@ class StreamingSyncImplementation implements StreamingSync {
470470
await handleLine(line as StreamingSyncLine);
471471
case UploadCompleted():
472472
case HandleChangedSubscriptions():
473+
case ConnectionEvent():
473474
// Only relevant for the Rust sync implementation.
474475
break;
475476
case AbortCurrentIteration():
@@ -647,20 +648,24 @@ final class _ActiveRustStreamingIteration {
647648
}
648649
}
649650

650-
Stream<ReceivedLine> _receiveLines(Object? data,
651+
Stream<SyncEvent> _receiveLines(Object? data,
651652
{required Future<void> onAbort}) {
652653
return streamFromFutureAwaitInCancellation(
653654
sync._postStreamRequest(data, true, onAbort: onAbort))
654-
.asyncExpand<Object /* Uint8List | String */ >((response) {
655+
.asyncExpand<SyncEvent>((response) async* {
655656
if (response == null) {
656-
return null;
657+
return;
657658
} else {
659+
yield ConnectionEvent.established;
660+
658661
final contentType = response.headers['content-type'];
659662
final isBson = contentType == 'application/vnd.powersync.bson-stream';
660663

661-
return isBson ? response.stream.bsonDocuments : response.stream.lines;
664+
yield* (isBson ? response.stream.bsonDocuments : response.stream.lines)
665+
.map(ReceivedLine.new);
666+
yield ConnectionEvent.end;
662667
}
663-
}).map(ReceivedLine.new);
668+
});
664669
}
665670

666671
Future<RustSyncIterationResult> _handleLines(
@@ -696,6 +701,8 @@ final class _ActiveRustStreamingIteration {
696701
}
697702

698703
switch (event) {
704+
case ConnectionEvent():
705+
await _control('connection', event.name);
699706
case ReceivedLine(line: final Uint8List line):
700707
_triggerCrudUploadOnFirstLine();
701708
await _control('line_binary', line);
@@ -802,6 +809,11 @@ typedef RustSyncIterationResult = ({bool immediateRestart});
802809

803810
sealed class SyncEvent {}
804811

812+
enum ConnectionEvent implements SyncEvent {
813+
established,
814+
end,
815+
}
816+
805817
final class ReceivedLine implements SyncEvent {
806818
final Object /* String|Uint8List|StreamingSyncLine */ line;
807819

packages/powersync_core/test/connected_test.dart

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -98,23 +98,9 @@ void main() {
9898
// Create a new completer to await the next upload
9999
uploadTriggeredCompleter = Completer();
100100

101-
// Connect the PowerSync instance
102-
final connectedCompleter = Completer<void>();
103-
// The first connection attempt will fail
104-
final connectedErroredCompleter = Completer<void>();
105-
106-
db.statusStream.listen((status) {
107-
if (status.connected && !connectedCompleter.isCompleted) {
108-
connectedCompleter.complete();
109-
}
110-
if (status.downloadError != null &&
111-
!connectedErroredCompleter.isCompleted) {
112-
connectedErroredCompleter.complete();
113-
}
114-
});
115-
116101
// The first command will not be valid, this simulates a failed connection
117102
testServer.addEvent('asdf\n');
103+
// Connect the PowerSync instance. The first connection attempt will fail
118104
await db.connect(connector: connector);
119105

120106
// The connect operation should have triggered an upload (even though it fails to connect)
@@ -124,14 +110,20 @@ void main() {
124110
uploadTriggeredCompleter = Completer();
125111

126112
// Connection attempt should initially fail
127-
await connectedErroredCompleter.future;
128-
expect(db.currentStatus.anyError, isNotNull);
113+
await expectLater(
114+
db.statusStream,
115+
emitsThrough(isA<SyncStatus>()
116+
.having((e) => e.downloadError, 'downloadError', isNotNull)),
117+
);
129118

130119
// Now send a valid command. Which will result in successful connection
131120
await testServer.clearEvents();
132121
testServer.addEvent('{"token_expires_in": 3600}\n');
133-
await connectedCompleter.future;
134-
expect(db.connected, isTrue);
122+
await expectLater(
123+
db.statusStream,
124+
emitsThrough(
125+
isA<SyncStatus>().having((e) => e.connected, 'connected', isTrue)),
126+
);
135127

136128
await uploadTriggeredCompleter.future;
137129
expect(uploadCounter, equals(2));

packages/powersync_core/test/sync/in_memory_sync_test.dart

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ void _declareTests(String name, SyncOptions options, bool bson) {
9494
});
9595

9696
Future<StreamQueue<SyncStatus>> waitForConnection(
97-
{bool expectNoWarnings = true}) async {
97+
{bool expectNoWarnings = true, bool addKeepLive = true}) async {
9898
if (expectNoWarnings) {
9999
logger.onRecord.listen((e) {
100100
if (e.level >= Level.WARNING) {
@@ -311,6 +311,11 @@ void _declareTests(String name, SyncOptions options, bool bson) {
311311

312312
await expectLater(query, emits(isEmpty));
313313
});
314+
315+
test('marks as connected even without sync line', () async {
316+
await waitForConnection(addKeepLive: false);
317+
expect(database.currentStatus.connected, isTrue);
318+
});
314319
}
315320

316321
group('partial sync', () {

0 commit comments

Comments
 (0)