summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/src/sgml/func.sgml15
-rw-r--r--doc/src/sgml/logicaldecoding.sgml27
-rw-r--r--src/backend/access/transam/xlog.c11
-rw-r--r--src/backend/access/transam/xlogfuncs.c31
-rw-r--r--src/backend/catalog/system_functions.sql2
-rw-r--r--src/backend/replication/logical/decode.c30
-rw-r--r--src/backend/replication/logical/logical.c36
-rw-r--r--src/backend/replication/slot.c57
-rw-r--r--src/backend/replication/walsender.c48
-rw-r--r--src/include/access/xlog.h1
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/pg_proc.dat3
12 files changed, 202 insertions, 61 deletions
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 4211d31f30..bf4c61ccfb 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -27074,6 +27074,21 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
prepared with <xref linkend="sql-prepare-transaction"/>.
</para></entry>
</row>
+ <row>
+ <entry role="func_table_entry"><para role="func_signature">
+ <indexterm>
+ <primary>pg_log_standby_snapshot</primary>
+ </indexterm>
+ <function>pg_log_standby_snapshot</function> ()
+ <returnvalue>pg_lsn</returnvalue>
+ </para>
+ <para>
+ Take a snapshot of running transactions and write it to WAL, without
+ having to wait bgwriter or checkpointer to log one. This is useful for
+ logical decoding on standby, as logical slot creation has to wait
+ until such a record is replayed on the standby.
+ </para></entry>
+ </row>
</tbody>
</tgroup>
</table>
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 4e912b4bd4..ebe0376e3e 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -316,6 +316,33 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
may consume changes from a slot at any given time.
</para>
+ <para>
+ A logical replication slot can also be created on a hot standby. To prevent
+ <command>VACUUM</command> from removing required rows from the system
+ catalogs, <varname>hot_standby_feedback</varname> should be set on the
+ standby. In spite of that, if any required rows get removed, the slot gets
+ invalidated. It's highly recommended to use a physical slot between the primary
+ and the standby. Otherwise, hot_standby_feedback will work, but only while the
+ connection is alive (for example a node restart would break it). Then, the
+ primary may delete system catalog rows that could be needed by the logical
+ decoding on the standby (as it does not know about the catalog_xmin on the
+ standby). Existing logical slots on standby also get invalidated if wal_level
+ on primary is reduced to less than 'logical'. This is done as soon as the
+ standby detects such a change in the WAL stream. It means, that for walsenders
+ that are lagging (if any), some WAL records up to the wal_level parameter change
+ on the primary won't be decoded.
+ </para>
+
+ <para>
+ Creation of a logical slot requires information about all the currently
+ running transactions. On the primary, this information is available
+ directly, but on a standby, this information has to be obtained from
+ primary. Thus, slot creation may need to wait for some activity to happen
+ on the primary. If the primary is idle, creating a logical slot on
+ standby may take noticeable time. This can be sped up by calling the
+ <function>pg_log_standby_snapshot</function> on the primary.
+ </para>
+
<caution>
<para>
Replication slots persist across crashes and know nothing about the state
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 1b7c2f23a4..b540ee293b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4470,6 +4470,17 @@ LocalProcessControlFile(bool reset)
}
/*
+ * Get the wal_level from the control file. For a standby, this value should be
+ * considered as its active wal_level, because it may be different from what
+ * was originally configured on standby.
+ */
+WalLevel
+GetActiveWalLevelOnStandby(void)
+{
+ return ControlFile->wal_level;
+}
+
+/*
* Initialization of shared memory for XLOG
*/
Size
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index c07daa874f..52f827a902 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -31,6 +31,7 @@
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/smgr.h"
+#include "storage/standby.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
@@ -197,6 +198,36 @@ pg_switch_wal(PG_FUNCTION_ARGS)
}
/*
+ * pg_log_standby_snapshot: call LogStandbySnapshot()
+ *
+ * Permission checking for this function is managed through the normal
+ * GRANT system.
+ */
+Datum
+pg_log_standby_snapshot(PG_FUNCTION_ARGS)
+{
+ XLogRecPtr recptr;
+
+ if (RecoveryInProgress())
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("recovery is in progress"),
+ errhint("pg_log_standby_snapshot() cannot be executed during recovery.")));
+
+ if (!XLogStandbyInfoActive())
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("pg_log_standby_snapshot() can only be used if wal_level >= replica")));
+
+ recptr = LogStandbySnapshot();
+
+ /*
+ * As a convenience, return the WAL location of the last inserted record
+ */
+ PG_RETURN_LSN(recptr);
+}
+
+/*
* pg_create_restore_point: a named point for restore
*
* Permission checking for this function is managed through the normal
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 83ca893444..b7c65ea37d 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -644,6 +644,8 @@ REVOKE EXECUTE ON FUNCTION pg_create_restore_point(text) FROM public;
REVOKE EXECUTE ON FUNCTION pg_switch_wal() FROM public;
+REVOKE EXECUTE ON FUNCTION pg_log_standby_snapshot() FROM public;
+
REVOKE EXECUTE ON FUNCTION pg_wal_replay_pause() FROM public;
REVOKE EXECUTE ON FUNCTION pg_wal_replay_resume() FROM public;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8fe7bb65f1..5508cc2177 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -152,11 +152,39 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* can restart from there.
*/
break;
+ case XLOG_PARAMETER_CHANGE:
+ {
+ xl_parameter_change *xlrec =
+ (xl_parameter_change *) XLogRecGetData(buf->record);
+
+ /*
+ * If wal_level on the primary is reduced to less than
+ * logical, we want to prevent existing logical slots from
+ * being used. Existing logical slots on the standby get
+ * invalidated when this WAL record is replayed; and further,
+ * slot creation fails when wal_level is not sufficient; but
+ * all these operations are not synchronized, so a logical
+ * slot may creep in while the wal_level is being
+ * reduced. Hence this extra check.
+ */
+ if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
+ {
+ /*
+ * This can occur only on a standby, as a primary would
+ * not allow to restart after changing wal_level < logical
+ * if there is pre-existing logical slot.
+ */
+ Assert(RecoveryInProgress());
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical decoding on a standby requires wal_level to be at least logical on the primary")));
+ }
+ break;
+ }
case XLOG_NOOP:
case XLOG_NEXTOID:
case XLOG_SWITCH:
case XLOG_BACKUP_END:
- case XLOG_PARAMETER_CHANGE:
case XLOG_RESTORE_POINT:
case XLOG_FPW_CHANGE:
case XLOG_FPI_FOR_HINT:
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 6ecea3c49c..82dae95080 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -124,23 +124,21 @@ CheckLogicalDecodingRequirements(void)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical decoding requires a database connection")));
- /* ----
- * TODO: We got to change that someday soon...
- *
- * There's basically three things missing to allow this:
- * 1) We need to be able to correctly and quickly identify the timeline a
- * LSN belongs to
- * 2) We need to force hot_standby_feedback to be enabled at all times so
- * the primary cannot remove rows we need.
- * 3) support dropping replication slots referring to a database, in
- * dbase_redo. There can't be any active ones due to HS recovery
- * conflicts, so that should be relatively easy.
- * ----
- */
if (RecoveryInProgress())
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("logical decoding cannot be used while in recovery")));
+ {
+ /*
+ * This check may have race conditions, but whenever
+ * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
+ * verify that there are no existing logical replication slots. And to
+ * avoid races around creating a new slot,
+ * CheckLogicalDecodingRequirements() is called once before creating
+ * the slot, and once when logical decoding is initially starting up.
+ */
+ if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical decoding on a standby requires wal_level to be at least logical on the primary")));
+ }
}
/*
@@ -342,6 +340,12 @@ CreateInitDecodingContext(const char *plugin,
LogicalDecodingContext *ctx;
MemoryContext old_context;
+ /*
+ * On a standby, this check is also required while creating the
+ * slot. Check the comments in the function.
+ */
+ CheckLogicalDecodingRequirements();
+
/* shorter lines... */
slot = MyReplicationSlot;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index cc79d6713b..41848f0ac6 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -41,6 +41,7 @@
#include "access/transam.h"
#include "access/xlog_internal.h"
+#include "access/xlogrecovery.h"
#include "common/file_utils.h"
#include "common/string.h"
#include "miscadmin.h"
@@ -1192,37 +1193,28 @@ ReplicationSlotReserveWal(void)
/*
* For logical slots log a standby snapshot and start logical decoding
* at exactly that position. That allows the slot to start up more
- * quickly.
+ * quickly. But on a standby we cannot do WAL writes, so just use the
+ * replay pointer; effectively, an attempt to create a logical slot on
+ * standby will cause it to wait for an xl_running_xact record to be
+ * logged independently on the primary, so that a snapshot can be
+ * built using the record.
*
- * That's not needed (or indeed helpful) for physical slots as they'll
- * start replay at the last logged checkpoint anyway. Instead return
- * the location of the last redo LSN. While that slightly increases
- * the chance that we have to retry, it's where a base backup has to
- * start replay at.
+ * None of this is needed (or indeed helpful) for physical slots as
+ * they'll start replay at the last logged checkpoint anyway. Instead
+ * return the location of the last redo LSN. While that slightly
+ * increases the chance that we have to retry, it's where a base
+ * backup has to start replay at.
*/
- if (!RecoveryInProgress() && SlotIsLogical(slot))
- {
- XLogRecPtr flushptr;
-
- /* start at current insert position */
+ if (SlotIsPhysical(slot))
+ restart_lsn = GetRedoRecPtr();
+ else if (RecoveryInProgress())
+ restart_lsn = GetXLogReplayRecPtr(NULL);
+ else
restart_lsn = GetXLogInsertRecPtr();
- SpinLockAcquire(&slot->mutex);
- slot->data.restart_lsn = restart_lsn;
- SpinLockRelease(&slot->mutex);
-
- /* make sure we have enough information to start */
- flushptr = LogStandbySnapshot();
- /* and make sure it's fsynced to disk */
- XLogFlush(flushptr);
- }
- else
- {
- restart_lsn = GetRedoRecPtr();
- SpinLockAcquire(&slot->mutex);
- slot->data.restart_lsn = restart_lsn;
- SpinLockRelease(&slot->mutex);
- }
+ SpinLockAcquire(&slot->mutex);
+ slot->data.restart_lsn = restart_lsn;
+ SpinLockRelease(&slot->mutex);
/* prevent WAL removal as fast as possible */
ReplicationSlotsComputeRequiredLSN();
@@ -1238,6 +1230,17 @@ ReplicationSlotReserveWal(void)
if (XLogGetLastRemovedSegno() < segno)
break;
}
+
+ if (!RecoveryInProgress() && SlotIsLogical(slot))
+ {
+ XLogRecPtr flushptr;
+
+ /* make sure we have enough information to start */
+ flushptr = LogStandbySnapshot();
+
+ /* and make sure it's fsynced to disk */
+ XLogFlush(flushptr);
+ }
}
/*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 5423cf0a17..45b8b3684f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -906,23 +906,34 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
int count;
WALReadError errinfo;
XLogSegNo segno;
- TimeLineID currTLI = GetWALInsertionTimeLine();
+ TimeLineID currTLI;
+
+ /*
+ * Make sure we have enough WAL available before retrieving the current
+ * timeline. This is needed to determine am_cascading_walsender accurately
+ * which is needed to determine the current timeline.
+ */
+ flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
/*
- * Since logical decoding is only permitted on a primary server, we know
- * that the current timeline ID can't be changing any more. If we did this
- * on a standby, we'd have to worry about the values we compute here
- * becoming invalid due to a promotion or timeline change.
+ * Since logical decoding is also permitted on a standby server, we need
+ * to check if the server is in recovery to decide how to get the current
+ * timeline ID (so that it also cover the promotion or timeline change
+ * cases).
*/
+ am_cascading_walsender = RecoveryInProgress();
+
+ if (am_cascading_walsender)
+ GetXLogReplayRecPtr(&currTLI);
+ else
+ currTLI = GetWALInsertionTimeLine();
+
XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
sendTimeLineIsHistoric = (state->currTLI != currTLI);
sendTimeLine = state->currTLI;
sendTimeLineValidUpto = state->currTLIValidUntil;
sendTimeLineNextTLI = state->nextTLI;
- /* make sure we have enough WAL available */
- flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
-
/* fail if not (implies we are going to shut down) */
if (flushptr < targetPagePtr + reqLen)
return -1;
@@ -937,9 +948,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
cur_page,
targetPagePtr,
XLOG_BLCKSZ,
- state->seg.ws_tli, /* Pass the current TLI because only
- * WalSndSegmentOpen controls whether new
- * TLI is needed. */
+ currTLI, /* Pass the current TLI because only
+ * WalSndSegmentOpen controls whether new TLI
+ * is needed. */
&errinfo))
WALReadRaiseError(&errinfo);
@@ -3076,10 +3087,14 @@ XLogSendLogical(void)
* If first time through in this session, initialize flushPtr. Otherwise,
* we only need to update flushPtr if EndRecPtr is past it.
*/
- if (flushPtr == InvalidXLogRecPtr)
- flushPtr = GetFlushRecPtr(NULL);
- else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
- flushPtr = GetFlushRecPtr(NULL);
+ if (flushPtr == InvalidXLogRecPtr ||
+ logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
+ {
+ if (am_cascading_walsender)
+ flushPtr = GetStandbyFlushRecPtr(NULL);
+ else
+ flushPtr = GetFlushRecPtr(NULL);
+ }
/* If EndRecPtr is still past our flushPtr, it means we caught up. */
if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
@@ -3170,7 +3185,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli)
receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
replayPtr = GetXLogReplayRecPtr(&replayTLI);
- *tli = replayTLI;
+ if (tli)
+ *tli = replayTLI;
result = replayPtr;
if (receiveTLI == replayTLI && receivePtr > replayPtr)
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index cfe5409738..48ca852381 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -230,6 +230,7 @@ extern void XLOGShmemInit(void);
extern void BootStrapXLOG(void);
extern void InitializeWalConsistencyChecking(void);
extern void LocalProcessControlFile(bool reset);
+extern WalLevel GetActiveWalLevelOnStandby(void);
extern void StartupXLOG(void);
extern void ShutdownXLOG(int code, Datum arg);
extern void CreateCheckPoint(int flags);
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 33a77fe6ae..54f3ddcd97 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -57,6 +57,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202304074
+#define CATALOG_VERSION_NO 202304075
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 067bee8198..b516cee8bd 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6432,6 +6432,9 @@
{ oid => '2848', descr => 'switch to new wal file',
proname => 'pg_switch_wal', provolatile => 'v', prorettype => 'pg_lsn',
proargtypes => '', prosrc => 'pg_switch_wal' },
+{ oid => '9658', descr => 'log details of the current snapshot to WAL',
+ proname => 'pg_log_standby_snapshot', provolatile => 'v', prorettype => 'pg_lsn',
+ proargtypes => '', prosrc => 'pg_log_standby_snapshot' },
{ oid => '3098', descr => 'create a named restore point',
proname => 'pg_create_restore_point', provolatile => 'v',
prorettype => 'pg_lsn', proargtypes => 'text',