Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ jobs:
strategy:
fail-fast: false
matrix:
postgres-version: [11, 12, 13, 14, 15, 16, 17]
postgres-version: [11, 12, 13, 14, 15, 16, 17, 18]

steps:
- uses: actions/checkout@v5
Expand Down Expand Up @@ -210,7 +210,7 @@ jobs:
-e POSTGRES_PASSWORD=postgres \
-e POSTGRES_DB=powersync_storage_test \
-p 5431:5432 \
-d postgres:16
-d postgres:18

- name: Enable Corepack
run: corepack enable
Expand Down Expand Up @@ -270,7 +270,7 @@ jobs:
-e POSTGRES_PASSWORD=postgres \
-e POSTGRES_DB=powersync_storage_test \
-p 5431:5432 \
-d postgres:16
-d postgres:18

- name: Enable Corepack
run: corepack enable
Expand Down
7 changes: 6 additions & 1 deletion libs/lib-postgres/src/db/connection/DatabaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,12 @@ export class DatabaseClient extends AbstractPostgresConnection<DatabaseClientLis
}

async [Symbol.asyncDispose]() {
await this.initialized;
try {
await this.initialized;
} catch (e) {
// Error was already reported when initializing - ignore here.
// If if throw this, we typically get a SuppressedError, which is difficult to debug.
}
this.closed = true;

for (const c of this.connections) {
Expand Down
65 changes: 63 additions & 2 deletions modules/module-postgres/test/src/wal_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,13 +315,74 @@ bucket_definitions:
await pool.query(`UPDATE test_data SET description = 'updated'`);
await pool.query('CREATE PUBLICATION powersync FOR ALL TABLES');

const serverVersion = await context.connectionManager.getServerVersion();

await context.loadActiveSyncRules();
await expect(async () => {

if (serverVersion!.compareMain('18.0.0') >= 0) {
await context.replicateSnapshot();
}).rejects.toThrowError(MissingReplicationSlotError);
// No error expected in Postres 18
// TODO: introduce new test scenario for Postgres 18 that _does_ invalidate the replication slot.
} else {
// Postgres < 18 invalidates the replication slot when the publication is re-created.
// The error is handled on a higher level, which triggers
// creating a new replication slot.
await expect(async () => {
await context.replicateSnapshot();
}).rejects.toThrowError(MissingReplicationSlotError);
}
}
});

test('dropped replication slot', async () => {
{
await using context = await WalStreamTestContext.open(factory);
const { pool } = context;
await context.updateSyncRules(`
bucket_definitions:
global:
data:
- SELECT id, description FROM "test_data"`);

await pool.query(
`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text, num int8)`
);
await pool.query(
`INSERT INTO test_data(id, description) VALUES('8133cd37-903b-4937-a022-7c8294015a3a', 'test1') returning id as test_id`
);
await context.replicateSnapshot();
await context.startStreaming();

const data = await context.getBucketData('global[]');

expect(data).toMatchObject([
putOp('test_data', {
id: '8133cd37-903b-4937-a022-7c8294015a3a',
description: 'test1'
})
]);

expect(await context.storage!.getStatus()).toMatchObject({ active: true, snapshot_done: true });
}

{
await using context = await WalStreamTestContext.open(factory, { doNotClear: true });
const { pool } = context;
const storage = await context.factory.getActiveStorage();

// Here we explicitly drop the replication slot, which should always be handled.
await pool.query({
statement: `SELECT pg_drop_replication_slot($1)`,
params: [{ type: 'varchar', value: storage?.slot_name! }]
});

await context.loadActiveSyncRules();

// The error is handled on a higher level, which triggers
// creating a new replication slot.
await expect(async () => {
await context.replicateSnapshot();
}).rejects.toThrowError(MissingReplicationSlotError);
}
});

Expand Down