Skip to content

Commit 531e887

Browse files
committed
Port changes to postgres storage.
1 parent 049b32a commit 531e887

File tree

2 files changed

+146
-132
lines changed

2 files changed

+146
-132
lines changed

modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,6 @@ export class PostgresSyncRulesStorage
354354
slot_name: this.slot_name,
355355
last_checkpoint_lsn: checkpoint_lsn,
356356
keep_alive_op: syncRules?.keepalive_op,
357-
no_checkpoint_before_lsn: syncRules?.no_checkpoint_before ?? options.zeroLSN,
358357
resumeFromLsn: maxLsn(syncRules?.snapshot_lsn, checkpoint_lsn),
359358
store_current_data: options.storeCurrentData,
360359
skip_existing_rows: options.skipExistingRows ?? false,

modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts

Lines changed: 146 additions & 131 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ export interface PostgresBucketBatchOptions {
2828
group_id: number;
2929
slot_name: string;
3030
last_checkpoint_lsn: string | null;
31-
no_checkpoint_before_lsn: string;
3231
store_current_data: boolean;
3332
keep_alive_op?: InternalOpId | null;
3433
resumeFromLsn: string | null;
@@ -48,6 +47,15 @@ export interface PostgresBucketBatchOptions {
4847
const StatefulCheckpoint = models.ActiveCheckpoint.and(t.object({ state: t.Enum(storage.SyncRuleState) }));
4948
type StatefulCheckpointDecoded = t.Decoded<typeof StatefulCheckpoint>;
5049

50+
const CheckpointWithStatus = StatefulCheckpoint.and(
51+
t.object({
52+
snapshot_done: t.boolean,
53+
no_checkpoint_before: t.string.or(t.Null),
54+
can_checkpoint: t.boolean
55+
})
56+
);
57+
type CheckpointWithStatusDecoded = t.Decoded<typeof CheckpointWithStatus>;
58+
5159
/**
5260
* 15MB. Currently matches MongoDB.
5361
* This could be increased in future.
@@ -67,7 +75,6 @@ export class PostgresBucketBatch
6775
protected db: lib_postgres.DatabaseClient;
6876
protected group_id: number;
6977
protected last_checkpoint_lsn: string | null;
70-
protected no_checkpoint_before_lsn: string;
7178

7279
protected persisted_op: InternalOpId | null;
7380

@@ -84,7 +91,6 @@ export class PostgresBucketBatch
8491
this.db = options.db;
8592
this.group_id = options.group_id;
8693
this.last_checkpoint_lsn = options.last_checkpoint_lsn;
87-
this.no_checkpoint_before_lsn = options.no_checkpoint_before_lsn;
8894
this.resumeFromLsn = options.resumeFromLsn;
8995
this.write_checkpoint_batch = [];
9096
this.sync_rules = options.sync_rules;
@@ -285,136 +291,136 @@ export class PostgresBucketBatch
285291
}
286292

287293
async commit(lsn: string, options?: storage.BucketBatchCommitOptions): Promise<boolean> {
288-
const { createEmptyCheckpoints } = { ...storage.DEFAULT_BUCKET_BATCH_COMMIT_OPTIONS, ...options };
289-
290294
await this.flush();
291295

292-
if (this.last_checkpoint_lsn != null && lsn < this.last_checkpoint_lsn) {
293-
// When re-applying transactions, don't create a new checkpoint until
294-
// we are past the last transaction.
295-
this.logger.info(`Re-applied transaction ${lsn} - skipping checkpoint`);
296-
// Cannot create a checkpoint yet - return false
297-
return false;
298-
}
299-
300-
if (lsn < this.no_checkpoint_before_lsn) {
301-
if (Date.now() - this.lastWaitingLogThrottled > 5_000) {
302-
this.logger.info(
303-
`Waiting until ${this.no_checkpoint_before_lsn} before creating checkpoint, currently at ${lsn}. Persisted op: ${this.persisted_op}`
304-
);
305-
this.lastWaitingLogThrottled = Date.now();
306-
}
307-
308-
// Edge case: During initial replication, we have a no_checkpoint_before_lsn set,
309-
// and don't actually commit the snapshot.
310-
// The first commit can happen from an implicit keepalive message.
311-
// That needs the persisted_op to get an accurate checkpoint, so
312-
// we persist that in keepalive_op.
313-
314-
await this.db.sql`
315-
UPDATE sync_rules
316-
SET
317-
keepalive_op = ${{ type: 'int8', value: this.persisted_op }}
318-
WHERE
319-
id = ${{ type: 'int4', value: this.group_id }}
320-
`.execute();
321-
322-
// Cannot create a checkpoint yet - return false
323-
return false;
324-
}
325-
326-
// Don't create a checkpoint if there were no changes
327-
if (!createEmptyCheckpoints && this.persisted_op == null) {
328-
// Nothing to commit - return true
329-
await this.autoActivate(lsn);
330-
return true;
331-
}
332-
333296
const now = new Date().toISOString();
334-
const update: Partial<models.SyncRules> = {
335-
last_checkpoint_lsn: lsn,
336-
last_checkpoint_ts: now,
337-
last_keepalive_ts: now,
338-
last_fatal_error: null,
339-
keepalive_op: null
340-
};
341297

342-
if (this.persisted_op != null) {
343-
update.last_checkpoint = this.persisted_op.toString();
344-
}
298+
const persisted_op = this.persisted_op ?? null;
345299

346-
const doc = await this.db.sql`
347-
UPDATE sync_rules
348-
SET
349-
keepalive_op = ${{ type: 'int8', value: update.keepalive_op }},
350-
last_fatal_error = ${{ type: 'varchar', value: update.last_fatal_error }},
351-
last_keepalive_ts = ${{ type: 1184, value: update.last_keepalive_ts }},
352-
last_checkpoint = COALESCE(
353-
${{ type: 'int8', value: update.last_checkpoint }},
354-
last_checkpoint
300+
const result = await this.db.sql`
301+
WITH
302+
selected AS (
303+
SELECT
304+
id,
305+
state,
306+
last_checkpoint,
307+
last_checkpoint_lsn,
308+
snapshot_done,
309+
no_checkpoint_before,
310+
keepalive_op,
311+
(
312+
snapshot_done = TRUE
313+
AND (
314+
last_checkpoint_lsn IS NULL
315+
OR last_checkpoint_lsn <= ${{ type: 'varchar', value: lsn }}
316+
)
317+
AND (
318+
no_checkpoint_before IS NULL
319+
OR no_checkpoint_before <= ${{ type: 'varchar', value: lsn }}
320+
)
321+
) AS can_checkpoint
322+
FROM
323+
sync_rules
324+
WHERE
325+
id = ${{ type: 'int4', value: this.group_id }}
326+
FOR UPDATE
355327
),
356-
last_checkpoint_ts = ${{ type: 1184, value: update.last_checkpoint_ts }},
357-
last_checkpoint_lsn = ${{ type: 'varchar', value: update.last_checkpoint_lsn }}
358-
WHERE
359-
id = ${{ type: 'int4', value: this.group_id }}
360-
RETURNING
328+
updated AS (
329+
UPDATE sync_rules AS sr
330+
SET
331+
last_checkpoint_lsn = CASE
332+
WHEN selected.can_checkpoint THEN ${{ type: 'varchar', value: lsn }}
333+
ELSE sr.last_checkpoint_lsn
334+
END,
335+
last_checkpoint_ts = CASE
336+
WHEN selected.can_checkpoint THEN ${{ type: 1184, value: now }}
337+
ELSE sr.last_checkpoint_ts
338+
END,
339+
last_keepalive_ts = ${{ type: 1184, value: now }},
340+
last_fatal_error = CASE
341+
WHEN selected.can_checkpoint THEN NULL
342+
ELSE sr.last_fatal_error
343+
END,
344+
keepalive_op = CASE
345+
WHEN selected.can_checkpoint THEN NULL
346+
ELSE GREATEST(
347+
COALESCE(sr.keepalive_op, 0),
348+
COALESCE(${{ type: 'int8', value: persisted_op }}, 0)
349+
)
350+
END,
351+
last_checkpoint = CASE
352+
WHEN selected.can_checkpoint THEN GREATEST(
353+
COALESCE(sr.last_checkpoint, 0),
354+
COALESCE(${{ type: 'int8', value: persisted_op }}, 0),
355+
COALESCE(sr.keepalive_op, 0)
356+
)
357+
ELSE sr.last_checkpoint
358+
END
359+
FROM
360+
selected
361+
WHERE
362+
sr.id = selected.id
363+
RETURNING
364+
sr.id,
365+
sr.state,
366+
sr.last_checkpoint,
367+
sr.last_checkpoint_lsn,
368+
sr.snapshot_done,
369+
sr.no_checkpoint_before,
370+
selected.can_checkpoint
371+
)
372+
SELECT
361373
id,
362374
state,
363375
last_checkpoint,
364-
last_checkpoint_lsn
376+
last_checkpoint_lsn,
377+
snapshot_done,
378+
no_checkpoint_before,
379+
can_checkpoint
380+
FROM
381+
updated
365382
`
366-
.decoded(StatefulCheckpoint)
383+
.decoded(CheckpointWithStatus)
367384
.first();
368385

369-
await this.autoActivate(lsn);
370-
await notifySyncRulesUpdate(this.db, doc!);
371-
372-
this.persisted_op = null;
373-
this.last_checkpoint_lsn = lsn;
374-
return true;
375-
}
376-
377-
async keepalive(lsn: string): Promise<boolean> {
378-
if (this.last_checkpoint_lsn != null && lsn < this.last_checkpoint_lsn) {
379-
// No-op
380-
return false;
381-
}
382-
383-
if (lsn < this.no_checkpoint_before_lsn) {
384-
return false;
386+
if (result == null) {
387+
throw new ReplicationAssertionError('Failed to update sync_rules during checkpoint');
385388
}
386389

387-
if (this.persisted_op != null) {
388-
// The commit may have been skipped due to "no_checkpoint_before_lsn".
389-
// Apply it now if relevant
390-
this.logger.info(`Commit due to keepalive at ${lsn} / ${this.persisted_op}`);
391-
return await this.commit(lsn);
390+
if (!result.can_checkpoint) {
391+
if (Date.now() - this.lastWaitingLogThrottled > 5_000 || true) {
392+
this.logger.info(
393+
`Waiting before creating checkpoint, currently at ${lsn}. Persisted op: ${this.persisted_op}. Current state: ${JSON.stringify(
394+
{
395+
snapshot_done: result.snapshot_done,
396+
last_checkpoint_lsn: result.last_checkpoint_lsn,
397+
no_checkpoint_before: result.no_checkpoint_before
398+
}
399+
)}`
400+
);
401+
this.lastWaitingLogThrottled = Date.now();
402+
}
403+
return true;
392404
}
393405

394-
const updated = await this.db.sql`
395-
UPDATE sync_rules
396-
SET
397-
last_checkpoint_lsn = ${{ type: 'varchar', value: lsn }},
398-
last_fatal_error = ${{ type: 'varchar', value: null }},
399-
last_keepalive_ts = ${{ type: 1184, value: new Date().toISOString() }}
400-
WHERE
401-
id = ${{ type: 'int4', value: this.group_id }}
402-
RETURNING
403-
id,
404-
state,
405-
last_checkpoint,
406-
last_checkpoint_lsn
407-
`
408-
.decoded(StatefulCheckpoint)
409-
.first();
410-
406+
this.logger.info(`Created checkpoint at ${lsn}. Persisted op: ${this.persisted_op}`);
411407
await this.autoActivate(lsn);
412-
await notifySyncRulesUpdate(this.db, updated!);
408+
await notifySyncRulesUpdate(this.db, {
409+
id: result.id,
410+
state: result.state,
411+
last_checkpoint: result.last_checkpoint,
412+
last_checkpoint_lsn: result.last_checkpoint_lsn
413+
});
413414

415+
this.persisted_op = null;
414416
this.last_checkpoint_lsn = lsn;
415417
return true;
416418
}
417419

420+
async keepalive(lsn: string): Promise<boolean> {
421+
return await this.commit(lsn);
422+
}
423+
418424
async setResumeLsn(lsn: string): Promise<void> {
419425
await this.db.sql`
420426
UPDATE sync_rules
@@ -426,21 +432,25 @@ export class PostgresBucketBatch
426432
}
427433

428434
async markAllSnapshotDone(no_checkpoint_before_lsn: string): Promise<void> {
429-
if (no_checkpoint_before_lsn != null && no_checkpoint_before_lsn > this.no_checkpoint_before_lsn) {
430-
this.no_checkpoint_before_lsn = no_checkpoint_before_lsn;
431-
432-
await this.db.transaction(async (db) => {
433-
await db.sql`
434-
UPDATE sync_rules
435-
SET
436-
no_checkpoint_before = ${{ type: 'varchar', value: no_checkpoint_before_lsn }},
437-
last_keepalive_ts = ${{ type: 1184, value: new Date().toISOString() }},
438-
snapshot_done = TRUE
439-
WHERE
440-
id = ${{ type: 'int4', value: this.group_id }}
441-
`.execute();
442-
});
443-
}
435+
await this.db.transaction(async (db) => {
436+
await db.sql`
437+
UPDATE sync_rules
438+
SET
439+
snapshot_done = TRUE,
440+
last_keepalive_ts = ${{ type: 1184, value: new Date().toISOString() }},
441+
snapshot_lsn = NULL,
442+
no_checkpoint_before = CASE
443+
WHEN no_checkpoint_before IS NULL
444+
OR no_checkpoint_before < ${{ type: 'varchar', value: no_checkpoint_before_lsn }} THEN ${{
445+
type: 'varchar',
446+
value: no_checkpoint_before_lsn
447+
}}
448+
ELSE no_checkpoint_before
449+
END
450+
WHERE
451+
id = ${{ type: 'int4', value: this.group_id }}
452+
`.execute();
453+
});
444454
}
445455

446456
async markTableSnapshotRequired(table: storage.SourceTable): Promise<void> {
@@ -476,14 +486,19 @@ export class PostgresBucketBatch
476486
);
477487
`.execute();
478488

479-
if (no_checkpoint_before_lsn != null && no_checkpoint_before_lsn > this.no_checkpoint_before_lsn) {
480-
this.no_checkpoint_before_lsn = no_checkpoint_before_lsn;
481-
489+
if (no_checkpoint_before_lsn != null) {
482490
await db.sql`
483491
UPDATE sync_rules
484492
SET
485-
no_checkpoint_before = ${{ type: 'varchar', value: no_checkpoint_before_lsn }},
486-
last_keepalive_ts = ${{ type: 1184, value: new Date().toISOString() }}
493+
last_keepalive_ts = ${{ type: 1184, value: new Date().toISOString() }},
494+
no_checkpoint_before = CASE
495+
WHEN no_checkpoint_before IS NULL
496+
OR no_checkpoint_before < ${{ type: 'varchar', value: no_checkpoint_before_lsn }} THEN ${{
497+
type: 'varchar',
498+
value: no_checkpoint_before_lsn
499+
}}
500+
ELSE no_checkpoint_before
501+
END
487502
WHERE
488503
id = ${{ type: 'int4', value: this.group_id }}
489504
`.execute();

0 commit comments

Comments
 (0)