Diffstat (limited to 'src/backend/replication')
-rw-r--r--  src/backend/replication/logical/decode.c        |  66
-rw-r--r--  src/backend/replication/logical/reorderbuffer.c | 159
2 files changed, 172 insertions(+), 53 deletions(-)
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 88424964ef..ea38818269 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -64,6 +64,8 @@ static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_commit *parsed, TransactionId xid);
static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
@@ -414,6 +416,11 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
break;
+ case XLOG_HEAP_CONFIRM:
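+ /* a speculative insertion was confirmed */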
+ if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ DecodeSpecConfirm(ctx, buf);
+ break;
+
case XLOG_HEAP_LOCK:
/* we don't care about row level locks for now */
break;
@@ -564,11 +571,15 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
return;
change = ReorderBufferGetChange(ctx->reorder);
- change->action = REORDER_BUFFER_CHANGE_INSERT;
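+ /*
+ * Speculative insertions are not applied immediately; the reorder
+ * buffer holds them back until the corresponding confirmation
+ * record arrives (see DecodeSpecConfirm and ReorderBufferCommit).
+ */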
+ if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
+ change->action = REORDER_BUFFER_CHANGE_INSERT;
+ else
+ change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
change->origin_id = XLogRecGetOrigin(r);
+
memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
- if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
+ if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE)
{
Size tuplelen;
char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
@@ -615,7 +626,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change->origin_id = XLogRecGetOrigin(r);
memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
- if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
+ if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
{
data = XLogRecGetBlockData(r, 0, &datalen);
@@ -624,7 +635,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
}
- if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD)
+ if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
{
/* caution, remaining data in record is not aligned */
data = XLogRecGetData(r) + SizeOfHeapUpdate;
@@ -660,6 +671,13 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (target_node.dbNode != ctx->slot->data.database)
return;
+ /*
+ * Super deletions are irrelevant for logical decoding; decoding of
+ * speculative insertions is driven by the confirmation records.
+ */
+ if (xlrec->flags & XLH_DELETE_IS_SUPER)
+ return;
+
/* output plugin doesn't look for this origin, no need to queue */
if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
return;
@@ -671,7 +689,7 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
/* old primary key stored */
- if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD)
+ if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
{
Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
@@ -737,7 +755,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* We decode the tuple in pretty much the same way as DecodeXLogTuple,
* but since the layout is slightly different, we can't use it here.
*/
- if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
+ if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE)
{
change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
@@ -775,7 +793,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* xl_multi_insert_tuple record emitted by one heap_multi_insert()
* call.
*/
- if (xlrec->flags & XLOG_HEAP_LAST_MULTI_INSERT &&
+ if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI &&
(i + 1) == xlrec->ntuples)
change->data.tp.clear_toast_afterwards = true;
else
@@ -788,6 +806,40 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
}
/*
+ * Parse XLOG_HEAP_CONFIRM from WAL into a confirmation change.
+ *
+ * This is pretty trivial; all the state is essentially already set up by
+ * the speculative insertion.
+ */
+static void
+DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ XLogReaderState *r = buf->record;
+ ReorderBufferChange *change;
+ RelFileNode target_node;
+
+ /* only interested in our database */
+ XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+ if (target_node.dbNode != ctx->slot->data.database)
+ return;
+
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
+ change = ReorderBufferGetChange(ctx->reorder);
+ change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
+ change->origin_id = XLogRecGetOrigin(r);
+
+ memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
+
+ change->data.tp.clear_toast_afterwards = true;
+
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+
+/*
* Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
* (but not by heap_multi_insert) into a tuplebuf.
*
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c9c1d1036e..57854b0aa5 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -401,6 +401,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
case REORDER_BUFFER_CHANGE_INSERT:
case REORDER_BUFFER_CHANGE_UPDATE:
case REORDER_BUFFER_CHANGE_DELETE:
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
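+ /* speculative inserts carry tuple data just like the cases above */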
if (change->data.tp.newtuple)
{
ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
@@ -420,8 +421,9 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
change->data.snapshot = NULL;
}
break;
+ /* no data in addition to the struct itself */
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
- break;
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
break;
}
@@ -1317,6 +1319,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
PG_TRY();
{
ReorderBufferChange *change;
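+ /* pending speculative insertion, waiting for its confirmation record */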
+ ReorderBufferChange *specinsert = NULL;
if (using_subtxn)
BeginInternalSubTransaction("replay");
@@ -1333,6 +1336,17 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
switch (change->action)
{
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
+ /*
+ * Confirmation for a speculative insertion arrived. Simply
+ * use it as a normal record; it'll be cleaned up at the end
+ * of INSERT processing.
+ */
+ Assert(specinsert->data.tp.oldtuple == NULL);
+ change = specinsert;
+ change->action = REORDER_BUFFER_CHANGE_INSERT;
+
+ /* intentionally fall through */
case REORDER_BUFFER_CHANGE_INSERT:
case REORDER_BUFFER_CHANGE_UPDATE:
case REORDER_BUFFER_CHANGE_DELETE:
@@ -1348,7 +1362,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
if (reloid == InvalidOid &&
change->data.tp.newtuple == NULL &&
change->data.tp.oldtuple == NULL)
- continue;
+ goto change_done;
else if (reloid == InvalidOid)
elog(ERROR, "could not map filenode \"%s\" to relation OID",
relpathperm(change->data.tp.relnode,
@@ -1362,50 +1376,92 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
relpathperm(change->data.tp.relnode,
MAIN_FORKNUM));
- if (RelationIsLogicallyLogged(relation))
+ if (!RelationIsLogicallyLogged(relation))
+ goto change_done;
+
+ /*
+ * For now ignore sequence changes entirely. Most of
+ * the time they don't log changes using records we
+ * understand, so it doesn't make sense to handle the
+ * few cases we do.
+ */
+ if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
+ goto change_done;
+
+ /* user-triggered change */
+ if (!IsToastRelation(relation))
{
+ ReorderBufferToastReplace(rb, txn, relation, change);
+ rb->apply_change(rb, txn, relation, change);
+
/*
- * For now ignore sequence changes entirely. Most of
- * the time they don't log changes using records we
- * understand, so it doesn't make sense to handle the
- * few cases we do.
+ * Only clear reassembled toast chunks if we're
+ * sure they're not required anymore. The creator
+ * of the tuple tells us.
*/
- if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
- {
- }
- /* user-triggered change */
- else if (!IsToastRelation(relation))
- {
- ReorderBufferToastReplace(rb, txn, relation, change);
- rb->apply_change(rb, txn, relation, change);
-
- /*
- * Only clear reassembled toast chunks if we're
- * sure they're not required anymore. The creator
- * of the tuple tells us.
- */
- if (change->data.tp.clear_toast_afterwards)
- ReorderBufferToastReset(rb, txn);
- }
- /* we're not interested in toast deletions */
- else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
- {
- /*
- * Need to reassemble the full toasted Datum in
- * memory, to ensure the chunks don't get reused
- * till we're done remove it from the list of this
- * transaction's changes. Otherwise it will get
- * freed/reused while restoring spooled data from
- * disk.
- */
- dlist_delete(&change->node);
- ReorderBufferToastAppendChunk(rb, txn, relation,
- change);
- }
+ if (change->data.tp.clear_toast_afterwards)
+ ReorderBufferToastReset(rb, txn);
+ }
+ /* we're not interested in toast deletions */
+ else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
+ {
+ /*
+ * Need to reassemble the full toasted Datum in
+ * memory, to ensure the chunks don't get reused till
+ * we're done; remove it from the list of this
+ * transaction's changes. Otherwise it will get
+ * freed/reused while restoring spooled data from
+ * disk.
+ */
+ dlist_delete(&change->node);
+ ReorderBufferToastAppendChunk(rb, txn, relation,
+ change);
+ }
+
+ change_done:
+ /*
+ * Either the speculative insertion was confirmed, or it was
+ * unsuccessful and the record isn't needed anymore.
+ */
+ if (specinsert != NULL)
+ {
+ ReorderBufferReturnChange(rb, specinsert);
+ specinsert = NULL;
+ }
+ if (relation != NULL)
+ {
+ RelationClose(relation);
+ relation = NULL;
}
- RelationClose(relation);
break;
+
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
+ /*
+ * Speculative insertions are dealt with by delaying the
+ * processing of the insert until the confirmation record
+ * arrives. For that we simply unlink the record from the
+ * chain, so it does not get freed/reused while restoring
+ * spooled data from disk.
+ *
+ * This is safe in the face of concurrent catalog changes
+ * because the relevant relation can't be changed between
+ * speculative insertion and confirmation due to
+ * CheckTableNotInUse() and locking.
+ */
+
+ /* clear out a pending (and thus failed) speculation */
+ if (specinsert != NULL)
+ {
+ ReorderBufferReturnChange(rb, specinsert);
+ specinsert = NULL;
+ }
+
+ /* and memorize the pending insertion */
+ dlist_delete(&change->node);
+ specinsert = change;
+ break;
+
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
/* get rid of the old */
TeardownHistoricSnapshot(false);
@@ -1474,6 +1530,17 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
}
}
+ /*
+ * If there's a speculative insertion remaining, just clean it up; it
+ * can't have been successful, otherwise we'd have gotten a
+ * confirmation record.
+ */
+ if (specinsert)
+ {
+ ReorderBufferReturnChange(rb, specinsert);
+ specinsert = NULL;
+ }
+
/* clean up the iterator */
ReorderBufferIterTXNFinish(rb, iterstate);
iterstate = NULL;
@@ -2001,11 +2068,11 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
switch (change->action)
{
+ /* fall through these, they're all similar enough */
case REORDER_BUFFER_CHANGE_INSERT:
- /* fall through */
case REORDER_BUFFER_CHANGE_UPDATE:
- /* fall through */
case REORDER_BUFFER_CHANGE_DELETE:
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
{
char *data;
ReorderBufferTupleBuf *oldtup,
@@ -2083,9 +2150,8 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
}
break;
}
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
- /* ReorderBufferChange contains everything important */
- break;
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
/* ReorderBufferChange contains everything important */
break;
@@ -2256,11 +2322,11 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
/* restore individual stuff */
switch (change->action)
{
+ /* fall through these, they're all similar enough */
case REORDER_BUFFER_CHANGE_INSERT:
- /* fall through */
case REORDER_BUFFER_CHANGE_UPDATE:
- /* fall through */
case REORDER_BUFFER_CHANGE_DELETE:
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
if (change->data.tp.newtuple)
{
Size len = offsetof(ReorderBufferTupleBuf, t_data) +
@@ -2309,6 +2375,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
break;
}
/* the base struct contains all the data, easy peasy */
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
break;
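
For context: none of the new INTERNAL_SPEC_* change kinds ever reach an
output plugin. By the time rb->apply_change runs in ReorderBufferCommit, a
confirmed speculative insertion has already been rewritten into a plain
REORDER_BUFFER_CHANGE_INSERT. A minimal sketch of a change callback using
the standard output-plugin callback signature; the name example_change_cb
is hypothetical, for illustration only:

#include "postgres.h"
#include "replication/logical.h"

static void
example_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                  Relation relation, ReorderBufferChange *change)
{
    switch (change->action)
    {
        case REORDER_BUFFER_CHANGE_INSERT:
            /* includes inserts that started out as speculative */
            break;
        case REORDER_BUFFER_CHANGE_UPDATE:
        case REORDER_BUFFER_CHANGE_DELETE:
            break;
        default:
            /* internal change kinds are consumed before this point */
            break;
    }
}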