diff options
Diffstat (limited to 'src/backend/replication')
-rw-r--r-- | src/backend/replication/logical/logical.c | 7 | ||||
-rw-r--r-- | src/backend/replication/logical/logicalfuncs.c | 14 | ||||
-rw-r--r-- | src/backend/replication/slotfuncs.c | 13 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 8 |
4 files changed, 11 insertions, 31 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index bdf4389a57..cf93200618 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -461,11 +461,10 @@ DecodingContextReady(LogicalDecodingContext *ctx) void DecodingContextFindStartpoint(LogicalDecodingContext *ctx) { - XLogRecPtr startptr; ReplicationSlot *slot = ctx->slot; /* Initialize from where to start reading WAL. */ - startptr = slot->data.restart_lsn; + XLogBeginRead(ctx->reader, slot->data.restart_lsn); elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X", (uint32) (slot->data.restart_lsn >> 32), @@ -478,14 +477,12 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) char *err = NULL; /* the read_page callback waits for new WAL */ - record = XLogReadRecord(ctx->reader, startptr, &err); + record = XLogReadRecord(ctx->reader, &err); if (err) elog(ERROR, "%s", err); if (!record) elog(ERROR, "no record found"); /* shouldn't happen */ - startptr = InvalidXLogRecPtr; - LogicalDecodingProcessRecord(ctx, ctx->reader); /* only continue till we found a consistent spot */ diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 7693c98949..25b89e5616 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -127,7 +127,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin MemoryContext per_query_ctx; MemoryContext oldcontext; XLogRecPtr end_of_wal; - XLogRecPtr startptr; LogicalDecodingContext *ctx; ResourceOwner old_resowner = CurrentResourceOwner; ArrayType *arr; @@ -269,29 +268,22 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin * xacts that committed after the slot's confirmed_flush can be * accumulated into reorder buffers. */ - startptr = MyReplicationSlot->data.restart_lsn; + XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn); /* invalidate non-timetravel entries */ InvalidateSystemCaches(); /* Decode until we run out of records */ - while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) || - (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal)) + while (ctx->reader->EndRecPtr < end_of_wal) { XLogRecord *record; char *errm = NULL; - record = XLogReadRecord(ctx->reader, startptr, &errm); + record = XLogReadRecord(ctx->reader, &errm); if (errm) elog(ERROR, "%s", errm); /* - * Now that we've set up the xlog reader state, subsequent calls - * pass InvalidXLogRecPtr to say "continue from last record" - */ - startptr = InvalidXLogRecPtr; - - /* * The {begin_txn,change,commit_txn}_wrapper callbacks above will * store the description into our tuplestore. */ diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index bb69683e2a..7c89694611 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -391,7 +391,6 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) { LogicalDecodingContext *ctx; ResourceOwner old_resowner = CurrentResourceOwner; - XLogRecPtr startlsn; XLogRecPtr retlsn; PG_TRY(); @@ -411,7 +410,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) * Start reading at the slot's restart_lsn, which we know to point to * a valid record. */ - startlsn = MyReplicationSlot->data.restart_lsn; + XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn); /* Initialize our return value in case we don't do anything */ retlsn = MyReplicationSlot->data.confirmed_flush; @@ -420,10 +419,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) InvalidateSystemCaches(); /* Decode at least one record, until we run out of records */ - while ((!XLogRecPtrIsInvalid(startlsn) && - startlsn < moveto) || - (!XLogRecPtrIsInvalid(ctx->reader->EndRecPtr) && - ctx->reader->EndRecPtr < moveto)) + while (ctx->reader->EndRecPtr < moveto) { char *errm = NULL; XLogRecord *record; @@ -432,13 +428,10 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) * Read records. No changes are generated in fast_forward mode, * but snapbuilder/slot statuses are updated properly. */ - record = XLogReadRecord(ctx->reader, startlsn, &errm); + record = XLogReadRecord(ctx->reader, &errm); if (errm) elog(ERROR, "%s", errm); - /* Read sequentially from now on */ - startlsn = InvalidXLogRecPtr; - /* * Process the record. Storage-level changes are ignored in * fast_forward mode, but other modules (such as snapbuilder) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 9c063749b6..0c65f1660b 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -191,7 +191,6 @@ static volatile sig_atomic_t got_STOPPING = false; static volatile sig_atomic_t replication_active = false; static LogicalDecodingContext *logical_decoding_ctx = NULL; -static XLogRecPtr logical_startptr = InvalidXLogRecPtr; /* A sample associating a WAL location with the time it was written. */ typedef struct @@ -1130,9 +1129,9 @@ StartLogicalReplication(StartReplicationCmd *cmd) pq_endmessage(&buf); pq_flush(); - /* Start reading WAL from the oldest required WAL. */ - logical_startptr = MyReplicationSlot->data.restart_lsn; + XLogBeginRead(logical_decoding_ctx->reader, + MyReplicationSlot->data.restart_lsn); /* * Report the location after which we'll send out further commits as the @@ -2791,8 +2790,7 @@ XLogSendLogical(void) */ WalSndCaughtUp = false; - record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm); - logical_startptr = InvalidXLogRecPtr; + record = XLogReadRecord(logical_decoding_ctx->reader, &errm); /* xlog record was invalid */ if (errm != NULL) |