diff options
| author | Andres Freund <andres@anarazel.de> | 2023-04-07 22:40:27 -0700 | 
|---|---|---|
| committer | Andres Freund <andres@anarazel.de> | 2023-04-07 22:40:27 -0700 | 
| commit | be87200efd9308ccfe217ce8828f316e93e370da (patch) | |
| tree | f269cd86fdfebf3a15ef3559904f5863caede055 /src/backend/replication/slot.c | |
| parent | 2ed16aacf1af1e1a26bffb121a19d1ad5f5177f0 (diff) | |
| download | postgresql-be87200efd9308ccfe217ce8828f316e93e370da.tar.gz | |
Support invalidating replication slots due to horizon and wal_level
Needed for logical decoding on a standby. Slots need to be invalidated because
of the horizon if rows required for logical decoding are removed. If the
primary's wal_level is lowered from 'logical', logical slots on the standby
need to be invalidated.
The new invalidation methods will be used in a subsequent commit.
Logical slots that have been invalidated can be identified via the new
pg_replication_slots.conflicting column.
See 6af1793954e for an overall design of logical decoding on a standby.
Bumps catversion for the addition of the new pg_replication_slots column.
Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
Author: Andres Freund <andres@anarazel.de>
Author: Amit Khandekar <amitdkhan.pg@gmail.com> (in an older version)
Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Robert Haas <robertmhaas@gmail.com>
Reviewed-by: Fabrízio de Royes Mello <fabriziomello@gmail.com>
Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
Discussion: https://postgr.es/m/20230407075009.igg7be27ha2htkbt@awork3.anarazel.de
Diffstat (limited to 'src/backend/replication/slot.c')
| -rw-r--r-- | src/backend/replication/slot.c | 151 | 
1 files changed, 125 insertions, 26 deletions
| diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index f969f7c083..4d0421c5ed 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1241,8 +1241,58 @@ ReplicationSlotReserveWal(void)  }  /* - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot - * and mark it invalid, if necessary and possible. + * Report that replication slot needs to be invalidated + */ +static void +ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, +					   bool terminating, +					   int pid, +					   NameData slotname, +					   XLogRecPtr restart_lsn, +					   XLogRecPtr oldestLSN, +					   TransactionId snapshotConflictHorizon) +{ +	StringInfoData err_detail; +	bool		hint = false; + +	initStringInfo(&err_detail); + +	switch (cause) +	{ +		case RS_INVAL_WAL_REMOVED: +			hint = true; +			appendStringInfo(&err_detail, _("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes."), +							 LSN_FORMAT_ARGS(restart_lsn), +							 (unsigned long long) (oldestLSN - restart_lsn)); +			break; +		case RS_INVAL_HORIZON: +			appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."), +							 snapshotConflictHorizon); +			break; + +		case RS_INVAL_WAL_LEVEL: +			appendStringInfo(&err_detail, _("Logical decoding on standby requires wal_level to be at least logical on the primary server")); +			break; +		case RS_INVAL_NONE: +			pg_unreachable(); +	} + +	ereport(LOG, +			terminating ? +			errmsg("terminating process %d to release replication slot \"%s\"", +				   pid, NameStr(slotname)) : +			errmsg("invalidating obsolete replication slot \"%s\"", +				   NameStr(slotname)), +			errdetail_internal("%s", err_detail.data), +			hint ? errhint("You might need to increase max_slot_wal_keep_size.") : 0); + +	pfree(err_detail.data); +} + +/* + * Helper for InvalidateObsoleteReplicationSlots + * + * Acquires the given slot and mark it invalid, if necessary and possible.   
*   * Returns whether ReplicationSlotControlLock was released in the interim (and   * in that case we're not holding the lock at return, otherwise we are). @@ -1253,7 +1303,10 @@ ReplicationSlotReserveWal(void)   * for syscalls, so caller must restart if we return true.   */  static bool -InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, +InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, +							   ReplicationSlot *s, +							   XLogRecPtr oldestLSN, +							   Oid dboid, TransactionId snapshotConflictHorizon,  							   bool *invalidated)  {  	int			last_signaled_pid = 0; @@ -1264,6 +1317,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,  		XLogRecPtr	restart_lsn;  		NameData	slotname;  		int			active_pid = 0; +		ReplicationSlotInvalidationCause conflict = RS_INVAL_NONE;  		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED)); @@ -1286,10 +1340,44 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,  		restart_lsn = s->data.restart_lsn;  		/* -		 * If the slot is already invalid or is fresh enough, we don't need to -		 * do anything. +		 * If the slot is already invalid or is a non conflicting slot, we +		 * don't need to do anything.  		 
*/ -		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN) +		if (s->data.invalidated == RS_INVAL_NONE) +		{ +			switch (cause) +			{ +				case RS_INVAL_WAL_REMOVED: +					if (s->data.restart_lsn != InvalidXLogRecPtr && +						s->data.restart_lsn < oldestLSN) +						conflict = cause; +					break; +				case RS_INVAL_HORIZON: +					if (!SlotIsLogical(s)) +						break; +					/* invalid DB oid signals a shared relation */ +					if (dboid != InvalidOid && dboid != s->data.database) +						break; +					if (TransactionIdIsValid(s->effective_xmin) && +						TransactionIdPrecedesOrEquals(s->effective_xmin, +													  snapshotConflictHorizon)) +						conflict = cause; +					else if (TransactionIdIsValid(s->effective_catalog_xmin) && +							 TransactionIdPrecedesOrEquals(s->effective_catalog_xmin, +														   snapshotConflictHorizon)) +						conflict = cause; +					break; +				case RS_INVAL_WAL_LEVEL: +					if (SlotIsLogical(s)) +						conflict = cause; +					break; +				case RS_INVAL_NONE: +					pg_unreachable(); +			} +		} + +		/* if there's no conflict, we're done */ +		if (conflict == RS_INVAL_NONE)  		{  			SpinLockRelease(&s->mutex);  			if (released_lock) @@ -1309,13 +1397,14 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,  		{  			MyReplicationSlot = s;  			s->active_pid = MyProcPid; -			s->data.invalidated = RS_INVAL_WAL_REMOVED; +			s->data.invalidated = conflict;  			/*  			 * XXX: We should consider not overwriting restart_lsn and instead  			 * just rely on .invalidated.  			 
*/ -			s->data.restart_lsn = InvalidXLogRecPtr; +			if (conflict == RS_INVAL_WAL_REMOVED) +				s->data.restart_lsn = InvalidXLogRecPtr;  			/* Let caller know */  			*invalidated = true; @@ -1349,13 +1438,9 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,  			 */  			if (last_signaled_pid != active_pid)  			{ -				ereport(LOG, -						errmsg("terminating process %d to release replication slot \"%s\"", -							   active_pid, NameStr(slotname)), -						errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.", -								  LSN_FORMAT_ARGS(restart_lsn), -								  (unsigned long long) (oldestLSN - restart_lsn)), -						errhint("You might need to increase max_slot_wal_keep_size.")); +				ReportSlotInvalidation(conflict, true, active_pid, +									   slotname, restart_lsn, +									   oldestLSN, snapshotConflictHorizon);  				(void) kill(active_pid, SIGTERM);  				last_signaled_pid = active_pid; @@ -1390,14 +1475,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,  			ReplicationSlotMarkDirty();  			ReplicationSlotSave();  			ReplicationSlotRelease(); +			pgstat_drop_replslot(s); -			ereport(LOG, -					errmsg("invalidating obsolete replication slot \"%s\"", -						   NameStr(slotname)), -					errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.", -							  LSN_FORMAT_ARGS(restart_lsn), -							  (unsigned long long) (oldestLSN - restart_lsn)), -					errhint("You might need to increase max_slot_wal_keep_size.")); +			ReportSlotInvalidation(conflict, false, active_pid, +								   slotname, restart_lsn, +								   oldestLSN, snapshotConflictHorizon);  			/* done with this slot for now */  			break; @@ -1410,19 +1492,34 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,  }  /* - * Mark any slot that points to an LSN older than the given segment - * as invalid; it requires WAL that's about to be removed. 
+ * Invalidate slots that require resources about to be removed.   *   * Returns true when any slot have got invalidated.   * + * Whether a slot needs to be invalidated depends on the cause. A slot is + * removed if it: + * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment + * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given + *   db; dboid may be InvalidOid for shared relations + * - RS_INVAL_WAL_LEVEL: is logical + *   * NB - this runs as part of checkpoint, so avoid raising errors if possible.   */  bool -InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno) +InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause, +								   XLogSegNo oldestSegno, Oid dboid, +								   TransactionId snapshotConflictHorizon)  {  	XLogRecPtr	oldestLSN;  	bool		invalidated = false; +	Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon)); +	Assert(cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0); +	Assert(cause != RS_INVAL_NONE); + +	if (max_replication_slots == 0) +		return invalidated; +  	XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);  restart: @@ -1434,7 +1531,9 @@ restart:  		if (!s->in_use)  			continue; -		if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated)) +		if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid, +										   snapshotConflictHorizon, +										   &invalidated))  		{  			/* if the lock was released, start from scratch */  			goto restart; | 
