diff options
Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r-- | src/backend/access/heap/heapam.c | 377 |
1 files changed, 338 insertions, 39 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index e84c1743f4..7ea9a77e7e 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2061,8 +2061,17 @@ FreeBulkInsertState(BulkInsertState bistate) * This causes rows to be frozen, which is an MVCC violation and * requires explicit options chosen by user. * + * HEAP_INSERT_IS_SPECULATIVE is used on so-called "speculative insertions", + * which can be backed out afterwards without aborting the whole transaction. + * Other sessions can wait for the speculative insertion to be confirmed, + * turning it into a regular tuple, or aborted, as if it never existed. + * Speculatively inserted tuples behave as "value locks" of short duration, + * used to implement INSERT .. ON CONFLICT. + * * Note that these options will be applied when inserting into the heap's * TOAST table, too, if the tuple requires any out-of-line data. + * FIXME: Do we mark TOAST tuples as speculative too? What about confirming + * or aborting them? * * The BulkInsertState object (if any; bistate can be NULL for default * behavior) is also just passed through to RelationGetBufferForTuple. @@ -2115,7 +2124,8 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, /* NO EREPORT(ERROR) from here till changes are logged */ START_CRIT_SECTION(); - RelationPutHeapTuple(relation, buffer, heaptup); + RelationPutHeapTuple(relation, buffer, heaptup, + (options & HEAP_INSERT_SPECULATIVE) != 0); if (PageIsAllVisible(BufferGetPage(buffer))) { @@ -2169,7 +2179,11 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, } xlrec.offnum = ItemPointerGetOffsetNumber(&heaptup->t_self); - xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0; + xlrec.flags = 0; + if (all_visible_cleared) + xlrec.flags |= XLH_INSERT_ALL_VISIBLE_CLEARED; + if (options & HEAP_INSERT_SPECULATIVE) + xlrec.flags |= XLH_INSERT_IS_SPECULATIVE; Assert(ItemPointerGetBlockNumber(&heaptup->t_self) == BufferGetBlockNumber(buffer)); /* @@ -2179,7 +2193,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, */ if (RelationIsLogicallyLogged(relation)) { - xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE; + xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE; bufflags |= REGBUF_KEEP_DATA; } @@ -2224,6 +2238,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, */ CacheInvalidateHeapTuple(relation, heaptup, NULL); + /* Note: speculative insertions are counted too, even if aborted later */ pgstat_count_heap_insert(relation, 1); /* @@ -2395,7 +2410,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, * RelationGetBufferForTuple has ensured that the first tuple fits. * Put that on the page, and then as many other tuples as fit. */ - RelationPutHeapTuple(relation, buffer, heaptuples[ndone]); + RelationPutHeapTuple(relation, buffer, heaptuples[ndone], false); for (nthispage = 1; ndone + nthispage < ntuples; nthispage++) { HeapTuple heaptup = heaptuples[ndone + nthispage]; @@ -2403,7 +2418,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, if (PageGetHeapFreeSpace(page) < MAXALIGN(heaptup->t_len) + saveFreeSpace) break; - RelationPutHeapTuple(relation, buffer, heaptup); + RelationPutHeapTuple(relation, buffer, heaptup, false); /* * We don't use heap_multi_insert for catalog tuples yet, but @@ -2463,7 +2478,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, /* the rest of the scratch space is used for tuple data */ tupledata = scratchptr; - xlrec->flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0; + xlrec->flags = all_visible_cleared ? XLH_INSERT_ALL_VISIBLE_CLEARED : 0; xlrec->ntuples = nthispage; /* @@ -2498,7 +2513,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, Assert((scratchptr - scratch) < BLCKSZ); if (need_tuple_data) - xlrec->flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE; + xlrec->flags |= XLH_INSERT_CONTAINS_NEW_TUPLE; /* * Signal that this is the last xl_heap_multi_insert record @@ -2506,7 +2521,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, * decoding so it knows when to cleanup temporary data. */ if (ndone + nthispage == ntuples) - xlrec->flags |= XLOG_HEAP_LAST_MULTI_INSERT; + xlrec->flags |= XLH_INSERT_LAST_IN_MULTI; if (init) { @@ -2914,7 +2929,12 @@ l1: MarkBufferDirty(buffer); - /* XLOG stuff */ + /* + * XLOG stuff + * + * NB: heap_abort_speculative() uses the same xlog record and replay + * routines. + */ if (RelationNeedsWAL(relation)) { xl_heap_delete xlrec; @@ -2924,7 +2944,7 @@ l1: if (RelationIsAccessibleInLogicalDecoding(relation)) log_heap_new_cid(relation, &tp); - xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0; + xlrec.flags = all_visible_cleared ? XLH_DELETE_ALL_VISIBLE_CLEARED : 0; xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask, tp.t_data->t_infomask2); xlrec.offnum = ItemPointerGetOffsetNumber(&tp.t_self); @@ -2933,9 +2953,9 @@ l1: if (old_key_tuple != NULL) { if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL) - xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE; + xlrec.flags |= XLH_DELETE_CONTAINS_OLD_TUPLE; else - xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY; + xlrec.flags |= XLH_DELETE_CONTAINS_OLD_KEY; } XLogBeginInsert(); @@ -3742,7 +3762,7 @@ l2: HeapTupleClearHeapOnly(newtup); } - RelationPutHeapTuple(relation, newbuf, heaptup); /* insert new tuple */ + RelationPutHeapTuple(relation, newbuf, heaptup, false); /* insert new tuple */ if (!already_marked) { @@ -4133,14 +4153,16 @@ get_mxact_status_for_lock(LockTupleMode mode, bool is_update) * * Function result may be: * HeapTupleMayBeUpdated: lock was successfully acquired + * HeapTupleInvisible: lock failed because tuple was never visible to us * HeapTupleSelfUpdated: lock failed because tuple updated by self * HeapTupleUpdated: lock failed because tuple updated by other xact * HeapTupleWouldBlock: lock couldn't be acquired and wait_policy is skip * - * In the failure cases, the routine fills *hufd with the tuple's t_ctid, - * t_xmax (resolving a possible MultiXact, if necessary), and t_cmax - * (the last only for HeapTupleSelfUpdated, since we - * cannot obtain cmax from a combocid generated by another transaction). + * In the failure cases other than HeapTupleInvisible, the routine fills + * *hufd with the tuple's t_ctid, t_xmax (resolving a possible MultiXact, + * if necessary), and t_cmax (the last only for HeapTupleSelfUpdated, + * since we cannot obtain cmax from a combocid generated by another + * transaction). * See comments for struct HeapUpdateFailureData for additional info. * * See README.tuplock for a thorough explanation of this mechanism. @@ -4179,8 +4201,15 @@ l3: if (result == HeapTupleInvisible) { - UnlockReleaseBuffer(*buffer); - elog(ERROR, "attempted to lock invisible tuple"); + LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); + + /* + * This is possible, but only when locking a tuple for ON CONFLICT + * UPDATE. We return this value here rather than throwing an error in + * order to give that case the opportunity to throw a more specific + * error. + */ + return HeapTupleInvisible; } else if (result == HeapTupleBeingUpdated) { @@ -5417,6 +5446,234 @@ heap_lock_updated_tuple(Relation rel, HeapTuple tuple, ItemPointer ctid, return HeapTupleMayBeUpdated; } +/* + * heap_finish_speculative - mark speculative insertion as successful + * + * To successfully finish a speculative insertion we have to clear speculative + * token from tuple. To do so the t_ctid field, which will contain a + * speculative token value, is modified in place to point to the tuple itself, + * which is characteristic of a newly inserted ordinary tuple. + * + * NB: It is not ok to commit without either finishing or aborting a + * speculative insertion. We could treat speculative tuples of committed + * transactions implicitly as completed, but then we would have to be prepared + * to deal with speculative tokens on committed tuples. That wouldn't be + * difficult - no-one looks at the ctid field of a tuple with invalid xmax - + * but clearing the token at completion isn't very expensive either. + * An explicit confirmation WAL record also makes logical decoding simpler. + */ +void +heap_finish_speculative(Relation relation, HeapTuple tuple) +{ + Buffer buffer; + Page page; + OffsetNumber offnum; + ItemId lp = NULL; + HeapTupleHeader htup; + + buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&(tuple->t_self))); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + page = (Page) BufferGetPage(buffer); + + offnum = ItemPointerGetOffsetNumber(&(tuple->t_self)); + if (PageGetMaxOffsetNumber(page) >= offnum) + lp = PageGetItemId(page, offnum); + + if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) + elog(ERROR, "heap_confirm_insert: invalid lp"); + + htup = (HeapTupleHeader) PageGetItem(page, lp); + + /* SpecTokenOffsetNumber should be distinguishable from any real offset */ + StaticAssertStmt(MaxOffsetNumber < SpecTokenOffsetNumber, + "invalid speculative token constant"); + + /* NO EREPORT(ERROR) from here till changes are logged */ + START_CRIT_SECTION(); + + Assert(HeapTupleHeaderIsSpeculative(tuple->t_data)); + + MarkBufferDirty(buffer); + + /* + * Replace the speculative insertion token with a real t_ctid, + * pointing to itself like it does on regular tuples. + */ + htup->t_ctid = tuple->t_self; + + /* XLOG stuff */ + if (RelationNeedsWAL(relation)) + { + xl_heap_confirm xlrec; + XLogRecPtr recptr; + + xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self); + + XLogBeginInsert(); + + /* We want the same filtering on this as on a plain insert */ + XLogIncludeOrigin(); + + XLogRegisterData((char *) &xlrec, SizeOfHeapConfirm); + XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); + + recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_CONFIRM); + + PageSetLSN(page, recptr); + } + + END_CRIT_SECTION(); + + UnlockReleaseBuffer(buffer); +} + +/* + * heap_abort_speculative - kill a speculatively inserted tuple + * + * Marks a tuple that was speculatively inserted in the same command as dead, + * by setting its xmin as invalid. That makes it immediately appear as dead + * to all transactions, including our own. In particular, it makes + * HeapTupleSatisfiesDirty() regard the tuple as dead, so that another backend + * inserting a duplicate key value won't unnecessarily wait for our whole + * transaction to finish (it'll just wait for our speculative insertion to + * finish). + * + * Killing the tuple prevents "unprincipled deadlocks", which are deadlocks + * that arise due to a mutual dependency that is not user visible. By + * definition, unprincipled deadlocks cannot be prevented by the user + * reordering lock acquisition in client code, because the implementation level + * lock acquisitions are not under the user's direct control. If speculative + * inserters did not take this precaution, then under high concurrency they + * could deadlock with each other, which would not be acceptable. + * + * This is somewhat redundant with heap_delete, but we prefer to have a + * dedicated routine with stripped down requirements. + * + * This routine does not affect logical decoding as it only looks at + * confirmation records. + */ +void +heap_abort_speculative(Relation relation, HeapTuple tuple) +{ + TransactionId xid = GetCurrentTransactionId(); + ItemPointer tid = &(tuple->t_self); + ItemId lp; + HeapTupleData tp; + Page page; + BlockNumber block; + Buffer buffer; + + Assert(ItemPointerIsValid(tid)); + + block = ItemPointerGetBlockNumber(tid); + buffer = ReadBuffer(relation, block); + page = BufferGetPage(buffer); + + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + + /* + * Page can't be all visible, we just inserted into it, and are still + * running. + */ + Assert(!PageIsAllVisible(page)); + + lp = PageGetItemId(page, ItemPointerGetOffsetNumber(tid)); + Assert(ItemIdIsNormal(lp)); + + tp.t_tableOid = RelationGetRelid(relation); + tp.t_data = (HeapTupleHeader) PageGetItem(page, lp); + tp.t_len = ItemIdGetLength(lp); + tp.t_self = *tid; + + /* + * Sanity check that the tuple really is a speculatively inserted tuple, + * inserted by us. + */ + if (tp.t_data->t_choice.t_heap.t_xmin != xid) + elog(ERROR, "attempted to kill a tuple inserted by another transaction"); + if (!HeapTupleHeaderIsSpeculative(tp.t_data)) + elog(ERROR, "attempted to kill a non-speculative tuple"); + Assert(!HeapTupleHeaderIsHeapOnly(tp.t_data)); + + /* + * No need to check for serializable conflicts here. There is never a + * need for a combocid, either. No need to extract replica identity, or + * do anything special with infomask bits. + */ + + START_CRIT_SECTION(); + + /* + * The tuple will become DEAD immediately. Flag that this page + * immediately is a candidate for pruning by setting xmin to + * RecentGlobalXmin. That's not pretty, but it doesn't seem worth + * inventing a nicer API for this. + */ + Assert(TransactionIdIsValid(RecentGlobalXmin)); + PageSetPrunable(page, RecentGlobalXmin); + + /* store transaction information of xact deleting the tuple */ + tp.t_data->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED); + tp.t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED; + + /* + * Set the tuple header xmin to InvalidTransactionId. This makes the + * tuple immediately invisible everyone. (In particular, to any + * transactions waiting on the speculative token, woken up later.) + */ + HeapTupleHeaderSetXmin(tp.t_data, InvalidTransactionId); + + /* Clear the speculative insertion token too */ + tp.t_data->t_ctid = tp.t_self; + + MarkBufferDirty(buffer); + + /* + * XLOG stuff + * + * The WAL records generated here match heap_delete(). The same recovery + * routines are used. + */ + if (RelationNeedsWAL(relation)) + { + xl_heap_delete xlrec; + XLogRecPtr recptr; + + xlrec.flags = XLH_DELETE_IS_SUPER; + xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask, + tp.t_data->t_infomask2); + xlrec.offnum = ItemPointerGetOffsetNumber(&tp.t_self); + xlrec.xmax = xid; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfHeapDelete); + XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); + + /* No replica identity & replication origin logged */ + + recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE); + + PageSetLSN(page, recptr); + } + + END_CRIT_SECTION(); + + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + + if (HeapTupleHasExternal(&tp)) + toast_delete(relation, &tp); + + /* + * Never need to mark tuple for invalidation, since catalogs don't support + * speculative insertion + */ + + /* Now we can release the buffer */ + ReleaseBuffer(buffer); + + /* count deletion, as we counted the insertion too */ + pgstat_count_heap_delete(relation); +} /* * heap_inplace_update - update a tuple "in place" (ie, overwrite it) @@ -6732,22 +6989,22 @@ log_heap_update(Relation reln, Buffer oldbuf, /* Prepare main WAL data chain */ xlrec.flags = 0; if (all_visible_cleared) - xlrec.flags |= XLOG_HEAP_ALL_VISIBLE_CLEARED; + xlrec.flags |= XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED; if (new_all_visible_cleared) - xlrec.flags |= XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED; + xlrec.flags |= XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED; if (prefixlen > 0) - xlrec.flags |= XLOG_HEAP_PREFIX_FROM_OLD; + xlrec.flags |= XLH_UPDATE_PREFIX_FROM_OLD; if (suffixlen > 0) - xlrec.flags |= XLOG_HEAP_SUFFIX_FROM_OLD; + xlrec.flags |= XLH_UPDATE_SUFFIX_FROM_OLD; if (need_tuple_data) { - xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE; + xlrec.flags |= XLH_UPDATE_CONTAINS_NEW_TUPLE; if (old_key_tuple) { if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL) - xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE; + xlrec.flags |= XLH_UPDATE_CONTAINS_OLD_TUPLE; else - xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY; + xlrec.flags |= XLH_UPDATE_CONTAINS_OLD_KEY; } } @@ -7378,7 +7635,7 @@ heap_xlog_delete(XLogReaderState *record) * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(target_node); Buffer vmbuffer = InvalidBuffer; @@ -7406,13 +7663,16 @@ heap_xlog_delete(XLogReaderState *record) HeapTupleHeaderClearHotUpdated(htup); fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask, &htup->t_infomask2); - HeapTupleHeaderSetXmax(htup, xlrec->xmax); + if (!(xlrec->flags & XLH_DELETE_IS_SUPER)) + HeapTupleHeaderSetXmax(htup, xlrec->xmax); + else + HeapTupleHeaderSetXmin(htup, InvalidTransactionId); HeapTupleHeaderSetCmax(htup, FirstCommandId, false); /* Mark the page as a candidate for pruning */ PageSetPrunable(page, XLogRecGetXid(record)); - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); /* Make sure there is no forward chain link in t_ctid */ @@ -7453,7 +7713,7 @@ heap_xlog_insert(XLogReaderState *record) * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(target_node); Buffer vmbuffer = InvalidBuffer; @@ -7516,7 +7776,7 @@ heap_xlog_insert(XLogReaderState *record) PageSetLSN(page, lsn); - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); MarkBufferDirty(buffer); @@ -7573,7 +7833,7 @@ heap_xlog_multi_insert(XLogReaderState *record) * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(rnode); Buffer vmbuffer = InvalidBuffer; @@ -7655,7 +7915,7 @@ heap_xlog_multi_insert(XLogReaderState *record) PageSetLSN(page, lsn); - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); MarkBufferDirty(buffer); @@ -7728,7 +7988,7 @@ heap_xlog_update(XLogReaderState *record, bool hot_update) * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(rnode); Buffer vmbuffer = InvalidBuffer; @@ -7783,7 +8043,7 @@ heap_xlog_update(XLogReaderState *record, bool hot_update) /* Mark the page as a candidate for pruning */ PageSetPrunable(page, XLogRecGetXid(record)); - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); PageSetLSN(page, lsn); @@ -7812,7 +8072,7 @@ heap_xlog_update(XLogReaderState *record, bool hot_update) * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ - if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(rnode); Buffer vmbuffer = InvalidBuffer; @@ -7840,13 +8100,13 @@ heap_xlog_update(XLogReaderState *record, bool hot_update) if (PageGetMaxOffsetNumber(page) + 1 < offnum) elog(PANIC, "heap_update_redo: invalid max offset number"); - if (xlrec->flags & XLOG_HEAP_PREFIX_FROM_OLD) + if (xlrec->flags & XLH_UPDATE_PREFIX_FROM_OLD) { Assert(newblk == oldblk); memcpy(&prefixlen, recdata, sizeof(uint16)); recdata += sizeof(uint16); } - if (xlrec->flags & XLOG_HEAP_SUFFIX_FROM_OLD) + if (xlrec->flags & XLH_UPDATE_SUFFIX_FROM_OLD) { Assert(newblk == oldblk); memcpy(&suffixlen, recdata, sizeof(uint16)); @@ -7918,7 +8178,7 @@ heap_xlog_update(XLogReaderState *record, bool hot_update) if (offnum == InvalidOffsetNumber) elog(PANIC, "heap_update_redo: failed to add tuple"); - if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED) + if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ @@ -7952,6 +8212,42 @@ heap_xlog_update(XLogReaderState *record, bool hot_update) } static void +heap_xlog_confirm(XLogReaderState *record) +{ + XLogRecPtr lsn = record->EndRecPtr; + xl_heap_confirm *xlrec = (xl_heap_confirm *) XLogRecGetData(record); + Buffer buffer; + Page page; + OffsetNumber offnum; + ItemId lp = NULL; + HeapTupleHeader htup; + + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) + { + page = BufferGetPage(buffer); + + offnum = xlrec->offnum; + if (PageGetMaxOffsetNumber(page) >= offnum) + lp = PageGetItemId(page, offnum); + + if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) + elog(PANIC, "heap_confirm_redo: invalid lp"); + + htup = (HeapTupleHeader) PageGetItem(page, lp); + + /* + * Confirm tuple as actually inserted + */ + ItemPointerSet(&htup->t_ctid, BufferGetBlockNumber(buffer), offnum); + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); +} + +static void heap_xlog_lock(XLogReaderState *record) { XLogRecPtr lsn = record->EndRecPtr; @@ -8101,6 +8397,9 @@ heap_redo(XLogReaderState *record) case XLOG_HEAP_HOT_UPDATE: heap_xlog_update(record, true); break; + case XLOG_HEAP_CONFIRM: + heap_xlog_confirm(record); + break; case XLOG_HEAP_LOCK: heap_xlog_lock(record); break; |