Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r--	src/backend/access/heap/heapam.c	377
1 file changed, 338 insertions, 39 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index e84c1743f4..7ea9a77e7e 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2061,8 +2061,17 @@ FreeBulkInsertState(BulkInsertState bistate)
* This causes rows to be frozen, which is an MVCC violation and
* requires explicit options chosen by user.
*
+ * HEAP_INSERT_IS_SPECULATIVE is used on so-called "speculative insertions",
+ * which can be backed out afterwards without aborting the whole transaction.
+ * Other sessions can wait for the speculative insertion to be confirmed,
+ * turning it into a regular tuple, or aborted, as if it never existed.
+ * Speculatively inserted tuples behave as "value locks" of short duration,
+ * used to implement INSERT .. ON CONFLICT.
+ *
* Note that these options will be applied when inserting into the heap's
* TOAST table, too, if the tuple requires any out-of-line data.
+ * FIXME: Do we mark TOAST tuples as speculative too? What about confirming
+ * or aborting them?
*
* The BulkInsertState object (if any; bistate can be NULL for default
* behavior) is also just passed through to RelationGetBufferForTuple.
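
For illustration only (not part of this patch), a caller could drive the new
speculative-insertion API roughly as follows. The wrapper function and its
conflict_found argument are invented for this sketch; acquiring a speculative
token, stamping it into the tuple header beforehand, and inserting index
entries are all omitted.

static void
speculative_insert_sketch(Relation rel, HeapTuple tup, CommandId cid,
						  bool conflict_found)
{
	/* Insert the tuple; with HEAP_INSERT_SPECULATIVE its t_ctid keeps the token */
	heap_insert(rel, tup, cid, HEAP_INSERT_SPECULATIVE, NULL);

	if (!conflict_found)
		heap_finish_speculative(rel, tup);	/* promote to an ordinary tuple */
	else
		heap_abort_speculative(rel, tup);	/* back it out; xact keeps running */
}
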
@@ -2115,7 +2124,8 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
/* NO EREPORT(ERROR) from here till changes are logged */
START_CRIT_SECTION();
- RelationPutHeapTuple(relation, buffer, heaptup);
+ RelationPutHeapTuple(relation, buffer, heaptup,
+ (options & HEAP_INSERT_SPECULATIVE) != 0);
if (PageIsAllVisible(BufferGetPage(buffer)))
{
@@ -2169,7 +2179,11 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
}
xlrec.offnum = ItemPointerGetOffsetNumber(&heaptup->t_self);
- xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
+ xlrec.flags = 0;
+ if (all_visible_cleared)
+ xlrec.flags |= XLH_INSERT_ALL_VISIBLE_CLEARED;
+ if (options & HEAP_INSERT_SPECULATIVE)
+ xlrec.flags |= XLH_INSERT_IS_SPECULATIVE;
Assert(ItemPointerGetBlockNumber(&heaptup->t_self) == BufferGetBlockNumber(buffer));
/*
@@ -2179,7 +2193,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
*/
if (RelationIsLogicallyLogged(relation))
{
- xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
+ xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE;
bufflags |= REGBUF_KEEP_DATA;
}
@@ -2224,6 +2238,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
*/
CacheInvalidateHeapTuple(relation, heaptup, NULL);
+ /* Note: speculative insertions are counted too, even if aborted later */
pgstat_count_heap_insert(relation, 1);
/*
@@ -2395,7 +2410,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
* RelationGetBufferForTuple has ensured that the first tuple fits.
* Put that on the page, and then as many other tuples as fit.
*/
- RelationPutHeapTuple(relation, buffer, heaptuples[ndone]);
+ RelationPutHeapTuple(relation, buffer, heaptuples[ndone], false);
for (nthispage = 1; ndone + nthispage < ntuples; nthispage++)
{
HeapTuple heaptup = heaptuples[ndone + nthispage];
@@ -2403,7 +2418,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
if (PageGetHeapFreeSpace(page) < MAXALIGN(heaptup->t_len) + saveFreeSpace)
break;
- RelationPutHeapTuple(relation, buffer, heaptup);
+ RelationPutHeapTuple(relation, buffer, heaptup, false);
/*
* We don't use heap_multi_insert for catalog tuples yet, but
@@ -2463,7 +2478,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
/* the rest of the scratch space is used for tuple data */
tupledata = scratchptr;
- xlrec->flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
+ xlrec->flags = all_visible_cleared ? XLH_INSERT_ALL_VISIBLE_CLEARED : 0;
xlrec->ntuples = nthispage;
/*
@@ -2498,7 +2513,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
Assert((scratchptr - scratch) < BLCKSZ);
if (need_tuple_data)
- xlrec->flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
+ xlrec->flags |= XLH_INSERT_CONTAINS_NEW_TUPLE;
/*
* Signal that this is the last xl_heap_multi_insert record
@@ -2506,7 +2521,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
* decoding so it knows when to cleanup temporary data.
*/
if (ndone + nthispage == ntuples)
- xlrec->flags |= XLOG_HEAP_LAST_MULTI_INSERT;
+ xlrec->flags |= XLH_INSERT_LAST_IN_MULTI;
if (init)
{
@@ -2914,7 +2929,12 @@ l1:
MarkBufferDirty(buffer);
- /* XLOG stuff */
+ /*
+ * XLOG stuff
+ *
+ * NB: heap_abort_speculative() uses the same xlog record and replay
+ * routines.
+ */
if (RelationNeedsWAL(relation))
{
xl_heap_delete xlrec;
@@ -2924,7 +2944,7 @@ l1:
if (RelationIsAccessibleInLogicalDecoding(relation))
log_heap_new_cid(relation, &tp);
- xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
+ xlrec.flags = all_visible_cleared ? XLH_DELETE_ALL_VISIBLE_CLEARED : 0;
xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask,
tp.t_data->t_infomask2);
xlrec.offnum = ItemPointerGetOffsetNumber(&tp.t_self);
@@ -2933,9 +2953,9 @@ l1:
if (old_key_tuple != NULL)
{
if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
- xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE;
+ xlrec.flags |= XLH_DELETE_CONTAINS_OLD_TUPLE;
else
- xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY;
+ xlrec.flags |= XLH_DELETE_CONTAINS_OLD_KEY;
}
XLogBeginInsert();
@@ -3742,7 +3762,7 @@ l2:
HeapTupleClearHeapOnly(newtup);
}
- RelationPutHeapTuple(relation, newbuf, heaptup); /* insert new tuple */
+ RelationPutHeapTuple(relation, newbuf, heaptup, false); /* insert new tuple */
if (!already_marked)
{
@@ -4133,14 +4153,16 @@ get_mxact_status_for_lock(LockTupleMode mode, bool is_update)
*
* Function result may be:
* HeapTupleMayBeUpdated: lock was successfully acquired
+ * HeapTupleInvisible: lock failed because tuple was never visible to us
* HeapTupleSelfUpdated: lock failed because tuple updated by self
* HeapTupleUpdated: lock failed because tuple updated by other xact
* HeapTupleWouldBlock: lock couldn't be acquired and wait_policy is skip
*
- * In the failure cases, the routine fills *hufd with the tuple's t_ctid,
- * t_xmax (resolving a possible MultiXact, if necessary), and t_cmax
- * (the last only for HeapTupleSelfUpdated, since we
- * cannot obtain cmax from a combocid generated by another transaction).
+ * In the failure cases other than HeapTupleInvisible, the routine fills
+ * *hufd with the tuple's t_ctid, t_xmax (resolving a possible MultiXact,
+ * if necessary), and t_cmax (the last only for HeapTupleSelfUpdated,
+ * since we cannot obtain cmax from a combocid generated by another
+ * transaction).
* See comments for struct HeapUpdateFailureData for additional info.
*
* See README.tuplock for a thorough explanation of this mechanism.
@@ -4179,8 +4201,15 @@ l3:
if (result == HeapTupleInvisible)
{
- UnlockReleaseBuffer(*buffer);
- elog(ERROR, "attempted to lock invisible tuple");
+ LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+
+ /*
+ * This is possible, but only when locking a tuple for ON CONFLICT
+ * UPDATE. We return this value here rather than throwing an error in
+ * order to give that case the opportunity to throw a more specific
+ * error.
+ */
+ return HeapTupleInvisible;
}
else if (result == HeapTupleBeingUpdated)
{
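
A hedged sketch (not taken from this patch) of how a caller might consume the
new HeapTupleInvisible result; the surrounding variables are illustrative, and
the error wording simply mirrors the elog() removed above.

	HTSU_Result	test;

	test = heap_lock_tuple(relation, &tuple, cid, LockTupleExclusive,
						   LockWaitBlock, false /* follow_updates */ ,
						   &buffer, &hufd);
	switch (test)
	{
		case HeapTupleInvisible:

			/*
			 * Only expected when locking the conflicting row for ON CONFLICT
			 * UPDATE; getting the result back lets the caller raise a more
			 * specific error than heapam could.
			 */
			elog(ERROR, "attempted to lock invisible tuple");
			break;

		case HeapTupleMayBeUpdated:
			/* lock acquired, proceed */
			break;

		default:
			/* SelfUpdated / Updated / WouldBlock: consult hufd */
			break;
	}
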
@@ -5417,6 +5446,234 @@ heap_lock_updated_tuple(Relation rel, HeapTuple tuple, ItemPointer ctid,
return HeapTupleMayBeUpdated;
}
+/*
+ * heap_finish_speculative - mark speculative insertion as successful
+ *
+ * To successfully finish a speculative insertion we have to clear the
+ * speculative token from the tuple.  To do so, the t_ctid field, which contains
+ * the speculative token value, is modified in place to point to the tuple itself,
+ * which is characteristic of a newly inserted ordinary tuple.
+ *
+ * NB: It is not ok to commit without either finishing or aborting a
+ * speculative insertion. We could treat speculative tuples of committed
+ * transactions implicitly as completed, but then we would have to be prepared
+ * to deal with speculative tokens on committed tuples. That wouldn't be
+ * difficult - no-one looks at the ctid field of a tuple with invalid xmax -
+ * but clearing the token at completion isn't very expensive either.
+ * An explicit confirmation WAL record also makes logical decoding simpler.
+ */
+void
+heap_finish_speculative(Relation relation, HeapTuple tuple)
+{
+ Buffer buffer;
+ Page page;
+ OffsetNumber offnum;
+ ItemId lp = NULL;
+ HeapTupleHeader htup;
+
+ buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&(tuple->t_self)));
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+ page = (Page) BufferGetPage(buffer);
+
+ offnum = ItemPointerGetOffsetNumber(&(tuple->t_self));
+ if (PageGetMaxOffsetNumber(page) >= offnum)
+ lp = PageGetItemId(page, offnum);
+
+ if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+ elog(ERROR, "heap_finish_speculative: invalid lp");
+
+ htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+ /* SpecTokenOffsetNumber should be distinguishable from any real offset */
+ StaticAssertStmt(MaxOffsetNumber < SpecTokenOffsetNumber,
+ "invalid speculative token constant");
+
+ /* NO EREPORT(ERROR) from here till changes are logged */
+ START_CRIT_SECTION();
+
+ Assert(HeapTupleHeaderIsSpeculative(tuple->t_data));
+
+ MarkBufferDirty(buffer);
+
+ /*
+ * Replace the speculative insertion token with a real t_ctid,
+ * pointing to itself like it does on regular tuples.
+ */
+ htup->t_ctid = tuple->t_self;
+
+ /* XLOG stuff */
+ if (RelationNeedsWAL(relation))
+ {
+ xl_heap_confirm xlrec;
+ XLogRecPtr recptr;
+
+ xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self);
+
+ XLogBeginInsert();
+
+ /* We want the same filtering on this as on a plain insert */
+ XLogIncludeOrigin();
+
+ XLogRegisterData((char *) &xlrec, SizeOfHeapConfirm);
+ XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+
+ recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_CONFIRM);
+
+ PageSetLSN(page, recptr);
+ }
+
+ END_CRIT_SECTION();
+
+ UnlockReleaseBuffer(buffer);
+}
+
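
To make the token-in-t_ctid convention concrete: conceptually, the
HeapTupleHeaderIsSpeculative() test asserted above amounts to something like
the following. This is a sketch inferred from the StaticAssert in
heap_finish_speculative(); the helper name is invented and the real macro
definition lives in the header files.

static bool
tuple_is_speculative(HeapTupleHeader htup)
{
	/* A real offset never exceeds MaxOffsetNumber, so this is unambiguous */
	return ItemPointerGetOffsetNumber(&htup->t_ctid) == SpecTokenOffsetNumber;
}
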
+/*
+ * heap_abort_speculative - kill a speculatively inserted tuple
+ *
+ * Marks a tuple that was speculatively inserted in the same command as dead,
+ * by setting its xmin as invalid. That makes it immediately appear as dead
+ * to all transactions, including our own. In particular, it makes
+ * HeapTupleSatisfiesDirty() regard the tuple as dead, so that another backend
+ * inserting a duplicate key value won't unnecessarily wait for our whole
+ * transaction to finish (it'll just wait for our speculative insertion to
+ * finish).
+ *
+ * Killing the tuple prevents "unprincipled deadlocks", which are deadlocks
+ * that arise due to a mutual dependency that is not user visible. By
+ * definition, unprincipled deadlocks cannot be prevented by the user
+ * reordering lock acquisition in client code, because the implementation level
+ * lock acquisitions are not under the user's direct control. If speculative
+ * inserters did not take this precaution, then under high concurrency they
+ * could deadlock with each other, which would not be acceptable.
+ *
+ * This is somewhat redundant with heap_delete, but we prefer to have a
+ * dedicated routine with stripped down requirements.
+ *
+ * This routine does not affect logical decoding, since decoding only looks
+ * at confirmation records.
+ */
+void
+heap_abort_speculative(Relation relation, HeapTuple tuple)
+{
+ TransactionId xid = GetCurrentTransactionId();
+ ItemPointer tid = &(tuple->t_self);
+ ItemId lp;
+ HeapTupleData tp;
+ Page page;
+ BlockNumber block;
+ Buffer buffer;
+
+ Assert(ItemPointerIsValid(tid));
+
+ block = ItemPointerGetBlockNumber(tid);
+ buffer = ReadBuffer(relation, block);
+ page = BufferGetPage(buffer);
+
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+ /*
+ * Page can't be all visible, we just inserted into it, and are still
+ * running.
+ */
+ Assert(!PageIsAllVisible(page));
+
+ lp = PageGetItemId(page, ItemPointerGetOffsetNumber(tid));
+ Assert(ItemIdIsNormal(lp));
+
+ tp.t_tableOid = RelationGetRelid(relation);
+ tp.t_data = (HeapTupleHeader) PageGetItem(page, lp);
+ tp.t_len = ItemIdGetLength(lp);
+ tp.t_self = *tid;
+
+ /*
+ * Sanity check that the tuple really is a speculatively inserted tuple,
+ * inserted by us.
+ */
+ if (tp.t_data->t_choice.t_heap.t_xmin != xid)
+ elog(ERROR, "attempted to kill a tuple inserted by another transaction");
+ if (!HeapTupleHeaderIsSpeculative(tp.t_data))
+ elog(ERROR, "attempted to kill a non-speculative tuple");
+ Assert(!HeapTupleHeaderIsHeapOnly(tp.t_data));
+
+ /*
+ * No need to check for serializable conflicts here. There is never a
+ * need for a combocid, either. No need to extract replica identity, or
+ * do anything special with infomask bits.
+ */
+
+ START_CRIT_SECTION();
+
+ /*
+ * The tuple will become DEAD immediately.  Flag that this page is
+ * immediately a candidate for pruning by setting xmin to
+ * RecentGlobalXmin. That's not pretty, but it doesn't seem worth
+ * inventing a nicer API for this.
+ */
+ Assert(TransactionIdIsValid(RecentGlobalXmin));
+ PageSetPrunable(page, RecentGlobalXmin);
+
+ /* store transaction information of xact deleting the tuple */
+ tp.t_data->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+ tp.t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED;
+
+ /*
+ * Set the tuple header xmin to InvalidTransactionId. This makes the
+ * tuple immediately invisible to everyone. (In particular, to any
+ * transactions waiting on the speculative token, woken up later.)
+ */
+ HeapTupleHeaderSetXmin(tp.t_data, InvalidTransactionId);
+
+ /* Clear the speculative insertion token too */
+ tp.t_data->t_ctid = tp.t_self;
+
+ MarkBufferDirty(buffer);
+
+ /*
+ * XLOG stuff
+ *
+ * The WAL records generated here match heap_delete(). The same recovery
+ * routines are used.
+ */
+ if (RelationNeedsWAL(relation))
+ {
+ xl_heap_delete xlrec;
+ XLogRecPtr recptr;
+
+ xlrec.flags = XLH_DELETE_IS_SUPER;
+ xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask,
+ tp.t_data->t_infomask2);
+ xlrec.offnum = ItemPointerGetOffsetNumber(&tp.t_self);
+ xlrec.xmax = xid;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHeapDelete);
+ XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+
+ /* No replica identity & replication origin logged */
+
+ recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE);
+
+ PageSetLSN(page, recptr);
+ }
+
+ END_CRIT_SECTION();
+
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+
+ if (HeapTupleHasExternal(&tp))
+ toast_delete(relation, &tp);
+
+ /*
+ * Never need to mark tuple for invalidation, since catalogs don't support
+ * speculative insertion
+ */
+
+ /* Now we can release the buffer */
+ ReleaseBuffer(buffer);
+
+ /* count deletion, as we counted the insertion too */
+ pgstat_count_heap_delete(relation);
+}
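
The header comment above notes that another backend waits only for the
speculative insertion rather than for the whole inserting transaction. A hedged
sketch of that waiter side follows; the variables are illustrative, and
speculativeToken and SpeculativeInsertionWait() refer to supporting
infrastructure assumed to be introduced alongside this heapam change.

	/* a dirty-snapshot probe found a possibly conflicting in-progress tuple */
	if (TransactionIdIsValid(DirtySnapshot.xmin))
	{
		if (DirtySnapshot.speculativeToken != 0)
			/* wait only for the speculative insertion to finish or abort */
			SpeculativeInsertionWait(DirtySnapshot.xmin,
									 DirtySnapshot.speculativeToken);
		else
			/* ordinary case: wait for the whole inserting transaction */
			XactLockTableWait(DirtySnapshot.xmin, rel, &tup->t_self,
							  XLTW_InsertIndex);
	}
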
/*
* heap_inplace_update - update a tuple "in place" (ie, overwrite it)
@@ -6732,22 +6989,22 @@ log_heap_update(Relation reln, Buffer oldbuf,
/* Prepare main WAL data chain */
xlrec.flags = 0;
if (all_visible_cleared)
- xlrec.flags |= XLOG_HEAP_ALL_VISIBLE_CLEARED;
+ xlrec.flags |= XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED;
if (new_all_visible_cleared)
- xlrec.flags |= XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED;
+ xlrec.flags |= XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED;
if (prefixlen > 0)
- xlrec.flags |= XLOG_HEAP_PREFIX_FROM_OLD;
+ xlrec.flags |= XLH_UPDATE_PREFIX_FROM_OLD;
if (suffixlen > 0)
- xlrec.flags |= XLOG_HEAP_SUFFIX_FROM_OLD;
+ xlrec.flags |= XLH_UPDATE_SUFFIX_FROM_OLD;
if (need_tuple_data)
{
- xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
+ xlrec.flags |= XLH_UPDATE_CONTAINS_NEW_TUPLE;
if (old_key_tuple)
{
if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
- xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE;
+ xlrec.flags |= XLH_UPDATE_CONTAINS_OLD_TUPLE;
else
- xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY;
+ xlrec.flags |= XLH_UPDATE_CONTAINS_OLD_KEY;
}
}
@@ -7378,7 +7635,7 @@ heap_xlog_delete(XLogReaderState *record)
* The visibility map may need to be fixed even if the heap page is
* already up-to-date.
*/
- if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
+ if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED)
{
Relation reln = CreateFakeRelcacheEntry(target_node);
Buffer vmbuffer = InvalidBuffer;
@@ -7406,13 +7663,16 @@ heap_xlog_delete(XLogReaderState *record)
HeapTupleHeaderClearHotUpdated(htup);
fix_infomask_from_infobits(xlrec->infobits_set,
&htup->t_infomask, &htup->t_infomask2);
- HeapTupleHeaderSetXmax(htup, xlrec->xmax);
+ if (!(xlrec->flags & XLH_DELETE_IS_SUPER))
+ HeapTupleHeaderSetXmax(htup, xlrec->xmax);
+ else
+ HeapTupleHeaderSetXmin(htup, InvalidTransactionId);
HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
/* Mark the page as a candidate for pruning */
PageSetPrunable(page, XLogRecGetXid(record));
- if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
+ if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED)
PageClearAllVisible(page);
/* Make sure there is no forward chain link in t_ctid */
@@ -7453,7 +7713,7 @@ heap_xlog_insert(XLogReaderState *record)
* The visibility map may need to be fixed even if the heap page is
* already up-to-date.
*/
- if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
+ if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
{
Relation reln = CreateFakeRelcacheEntry(target_node);
Buffer vmbuffer = InvalidBuffer;
@@ -7516,7 +7776,7 @@ heap_xlog_insert(XLogReaderState *record)
PageSetLSN(page, lsn);
- if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
+ if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
PageClearAllVisible(page);
MarkBufferDirty(buffer);
@@ -7573,7 +7833,7 @@ heap_xlog_multi_insert(XLogReaderState *record)
* The visibility map may need to be fixed even if the heap page is
* already up-to-date.
*/
- if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
+ if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
{
Relation reln = CreateFakeRelcacheEntry(rnode);
Buffer vmbuffer = InvalidBuffer;
@@ -7655,7 +7915,7 @@ heap_xlog_multi_insert(XLogReaderState *record)
PageSetLSN(page, lsn);
- if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
+ if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
PageClearAllVisible(page);
MarkBufferDirty(buffer);
@@ -7728,7 +7988,7 @@ heap_xlog_update(XLogReaderState *record, bool hot_update)
* The visibility map may need to be fixed even if the heap page is
* already up-to-date.
*/
- if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
+ if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED)
{
Relation reln = CreateFakeRelcacheEntry(rnode);
Buffer vmbuffer = InvalidBuffer;
@@ -7783,7 +8043,7 @@ heap_xlog_update(XLogReaderState *record, bool hot_update)
/* Mark the page as a candidate for pruning */
PageSetPrunable(page, XLogRecGetXid(record));
- if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
+ if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED)
PageClearAllVisible(page);
PageSetLSN(page, lsn);
@@ -7812,7 +8072,7 @@ heap_xlog_update(XLogReaderState *record, bool hot_update)
* The visibility map may need to be fixed even if the heap page is
* already up-to-date.
*/
- if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED)
+ if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED)
{
Relation reln = CreateFakeRelcacheEntry(rnode);
Buffer vmbuffer = InvalidBuffer;
@@ -7840,13 +8100,13 @@ heap_xlog_update(XLogReaderState *record, bool hot_update)
if (PageGetMaxOffsetNumber(page) + 1 < offnum)
elog(PANIC, "heap_update_redo: invalid max offset number");
- if (xlrec->flags & XLOG_HEAP_PREFIX_FROM_OLD)
+ if (xlrec->flags & XLH_UPDATE_PREFIX_FROM_OLD)
{
Assert(newblk == oldblk);
memcpy(&prefixlen, recdata, sizeof(uint16));
recdata += sizeof(uint16);
}
- if (xlrec->flags & XLOG_HEAP_SUFFIX_FROM_OLD)
+ if (xlrec->flags & XLH_UPDATE_SUFFIX_FROM_OLD)
{
Assert(newblk == oldblk);
memcpy(&suffixlen, recdata, sizeof(uint16));
@@ -7918,7 +8178,7 @@ heap_xlog_update(XLogReaderState *record, bool hot_update)
if (offnum == InvalidOffsetNumber)
elog(PANIC, "heap_update_redo: failed to add tuple");
- if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED)
+ if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED)
PageClearAllVisible(page);
freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
@@ -7952,6 +8212,42 @@ heap_xlog_update(XLogReaderState *record, bool hot_update)
}
static void
+heap_xlog_confirm(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_heap_confirm *xlrec = (xl_heap_confirm *) XLogRecGetData(record);
+ Buffer buffer;
+ Page page;
+ OffsetNumber offnum;
+ ItemId lp = NULL;
+ HeapTupleHeader htup;
+
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+ {
+ page = BufferGetPage(buffer);
+
+ offnum = xlrec->offnum;
+ if (PageGetMaxOffsetNumber(page) >= offnum)
+ lp = PageGetItemId(page, offnum);
+
+ if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+ elog(PANIC, "heap_confirm_redo: invalid lp");
+
+ htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+ /*
+ * Confirm tuple as actually inserted
+ */
+ ItemPointerSet(&htup->t_ctid, BufferGetBlockNumber(buffer), offnum);
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
+}
+
+static void
heap_xlog_lock(XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
@@ -8101,6 +8397,9 @@ heap_redo(XLogReaderState *record)
case XLOG_HEAP_HOT_UPDATE:
heap_xlog_update(record, true);
break;
+ case XLOG_HEAP_CONFIRM:
+ heap_xlog_confirm(record);
+ break;
case XLOG_HEAP_LOCK:
heap_xlog_lock(record);
break;