summaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/logical/logical.c7
-rw-r--r--src/backend/replication/logical/logicalfuncs.c14
-rw-r--r--src/backend/replication/slotfuncs.c13
-rw-r--r--src/backend/replication/walsender.c8
4 files changed, 11 insertions, 31 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index bdf4389a57..cf93200618 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -461,11 +461,10 @@ DecodingContextReady(LogicalDecodingContext *ctx)
void
DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
{
- XLogRecPtr startptr;
ReplicationSlot *slot = ctx->slot;
/* Initialize from where to start reading WAL. */
- startptr = slot->data.restart_lsn;
+ XLogBeginRead(ctx->reader, slot->data.restart_lsn);
elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
(uint32) (slot->data.restart_lsn >> 32),
@@ -478,14 +477,12 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
char *err = NULL;
/* the read_page callback waits for new WAL */
- record = XLogReadRecord(ctx->reader, startptr, &err);
+ record = XLogReadRecord(ctx->reader, &err);
if (err)
elog(ERROR, "%s", err);
if (!record)
elog(ERROR, "no record found"); /* shouldn't happen */
- startptr = InvalidXLogRecPtr;
-
LogicalDecodingProcessRecord(ctx, ctx->reader);
/* only continue till we found a consistent spot */
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 7693c98949..25b89e5616 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -127,7 +127,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
MemoryContext per_query_ctx;
MemoryContext oldcontext;
XLogRecPtr end_of_wal;
- XLogRecPtr startptr;
LogicalDecodingContext *ctx;
ResourceOwner old_resowner = CurrentResourceOwner;
ArrayType *arr;
@@ -269,29 +268,22 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
* xacts that committed after the slot's confirmed_flush can be
* accumulated into reorder buffers.
*/
- startptr = MyReplicationSlot->data.restart_lsn;
+ XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
/* invalidate non-timetravel entries */
InvalidateSystemCaches();
/* Decode until we run out of records */
- while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
- (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
+ while (ctx->reader->EndRecPtr < end_of_wal)
{
XLogRecord *record;
char *errm = NULL;
- record = XLogReadRecord(ctx->reader, startptr, &errm);
+ record = XLogReadRecord(ctx->reader, &errm);
if (errm)
elog(ERROR, "%s", errm);
/*
- * Now that we've set up the xlog reader state, subsequent calls
- * pass InvalidXLogRecPtr to say "continue from last record"
- */
- startptr = InvalidXLogRecPtr;
-
- /*
* The {begin_txn,change,commit_txn}_wrapper callbacks above will
* store the description into our tuplestore.
*/
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index bb69683e2a..7c89694611 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -391,7 +391,6 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
{
LogicalDecodingContext *ctx;
ResourceOwner old_resowner = CurrentResourceOwner;
- XLogRecPtr startlsn;
XLogRecPtr retlsn;
PG_TRY();
@@ -411,7 +410,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
* Start reading at the slot's restart_lsn, which we know to point to
* a valid record.
*/
- startlsn = MyReplicationSlot->data.restart_lsn;
+ XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
/* Initialize our return value in case we don't do anything */
retlsn = MyReplicationSlot->data.confirmed_flush;
@@ -420,10 +419,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
InvalidateSystemCaches();
/* Decode at least one record, until we run out of records */
- while ((!XLogRecPtrIsInvalid(startlsn) &&
- startlsn < moveto) ||
- (!XLogRecPtrIsInvalid(ctx->reader->EndRecPtr) &&
- ctx->reader->EndRecPtr < moveto))
+ while (ctx->reader->EndRecPtr < moveto)
{
char *errm = NULL;
XLogRecord *record;
@@ -432,13 +428,10 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
* Read records. No changes are generated in fast_forward mode,
* but snapbuilder/slot statuses are updated properly.
*/
- record = XLogReadRecord(ctx->reader, startlsn, &errm);
+ record = XLogReadRecord(ctx->reader, &errm);
if (errm)
elog(ERROR, "%s", errm);
- /* Read sequentially from now on */
- startlsn = InvalidXLogRecPtr;
-
/*
* Process the record. Storage-level changes are ignored in
* fast_forward mode, but other modules (such as snapbuilder)
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9c063749b6..0c65f1660b 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -191,7 +191,6 @@ static volatile sig_atomic_t got_STOPPING = false;
static volatile sig_atomic_t replication_active = false;
static LogicalDecodingContext *logical_decoding_ctx = NULL;
-static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
/* A sample associating a WAL location with the time it was written. */
typedef struct
@@ -1130,9 +1129,9 @@ StartLogicalReplication(StartReplicationCmd *cmd)
pq_endmessage(&buf);
pq_flush();
-
/* Start reading WAL from the oldest required WAL. */
- logical_startptr = MyReplicationSlot->data.restart_lsn;
+ XLogBeginRead(logical_decoding_ctx->reader,
+ MyReplicationSlot->data.restart_lsn);
/*
* Report the location after which we'll send out further commits as the
@@ -2791,8 +2790,7 @@ XLogSendLogical(void)
*/
WalSndCaughtUp = false;
- record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
- logical_startptr = InvalidXLogRecPtr;
+ record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
/* xlog record was invalid */
if (errm != NULL)