summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2023-03-17 08:29:41 +0530
committerAmit Kapila <akapila@postgresql.org>2023-03-17 08:29:41 +0530
commite709596b25bd184d6566dfff240e3f672a548afe (patch)
tree054a77f8afd6dd62ec3db67c452e9b78a5436b56
parenteb7d043c9bbadb75a87385113c578f1b30e9d195 (diff)
downloadpostgresql-e709596b25bd184d6566dfff240e3f672a548afe.tar.gz
Add macros for ReorderBufferTXN toptxn.
Currently, there are quite a few places in reorderbuffer.c that tries to access top-transaction for a subtransaction. This makes the code to access top-transaction consistent and easier to follow. Author: Peter Smith Reviewed-by: Vignesh C, Sawada Masahiko Discussion: https://postgr.es/m/CAHut+PuCznOyTqBQwjRUu-ibG-=KHyCv-0FTcWQtZUdR88umfg@mail.gmail.com
-rw-r--r--contrib/test_decoding/test_decoding.c4
-rw-r--r--src/backend/replication/logical/reorderbuffer.c46
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c6
-rw-r--r--src/include/replication/reorderbuffer.h18
4 files changed, 41 insertions, 33 deletions
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index b7e6048647..628c6a2595 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -815,11 +815,11 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
* maintain the output_plugin_private only under the toptxn so if this is
* not the toptxn then fetch the toptxn.
*/
- ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
+ ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
TestDecodingTxnData *txndata = toptxn->output_plugin_private;
bool xact_wrote_changes = txndata->xact_wrote_changes;
- if (txn->toptxn == NULL)
+ if (rbtxn_is_toptxn(txn))
{
Assert(txn->output_plugin_private != NULL);
pfree(txndata);
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 2d17c551a8..9f44974473 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -717,10 +717,7 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
return;
/* Get the top transaction. */
- if (txn->toptxn != NULL)
- toptxn = txn->toptxn;
- else
- toptxn = txn;
+ toptxn = rbtxn_get_toptxn(txn);
/*
* Indicate a partial change for toast inserts. The change will be
@@ -809,13 +806,7 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
change->action == REORDER_BUFFER_CHANGE_TRUNCATE ||
change->action == REORDER_BUFFER_CHANGE_MESSAGE)
{
- ReorderBufferTXN *toptxn;
-
- /* get the top transaction */
- if (txn->toptxn != NULL)
- toptxn = txn->toptxn;
- else
- toptxn = txn;
+ ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
}
@@ -1655,9 +1646,9 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
/*
* Mark the transaction as streamed.
*
- * The toplevel transaction, identified by (toptxn==NULL), is marked as
- * streamed always, even if it does not contain any changes (that is, when
- * all the changes are in subtransactions).
+ * The top-level transaction, is marked as streamed always, even if it
+ * does not contain any changes (that is, when all the changes are in
+ * subtransactions).
*
* For subtransactions, we only mark them as streamed when there are
* changes in them.
@@ -1667,7 +1658,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
* about the toplevel xact (we send the XID in all messages), but we never
* stream XIDs of empty subxacts.
*/
- if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0)))
+ if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
txn->txn_flags |= RBTXN_IS_STREAMED;
if (txn_prepared)
@@ -3207,10 +3198,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
* Update the total size in top level as well. This is later used to
* compute the decoding stats.
*/
- if (txn->toptxn != NULL)
- toptxn = txn->toptxn;
- else
- toptxn = txn;
+ toptxn = rbtxn_get_toptxn(txn);
if (addition)
{
@@ -3295,8 +3283,7 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
* so that we can execute them all together. See comments atop this
* function.
*/
- if (txn->toptxn)
- txn = txn->toptxn;
+ txn = rbtxn_get_toptxn(txn);
Assert(nmsgs > 0);
@@ -3354,7 +3341,6 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr lsn)
{
ReorderBufferTXN *txn;
- ReorderBufferTXN *toptxn;
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
@@ -3370,11 +3356,15 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
* conveniently check just top-level transaction and decide whether to
* build the hash table or not.
*/
- toptxn = txn->toptxn;
- if (toptxn != NULL && !rbtxn_has_catalog_changes(toptxn))
+ if (rbtxn_is_subtxn(txn))
{
- toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
- dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
+ ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
+
+ if (!rbtxn_has_catalog_changes(toptxn))
+ {
+ toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+ dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
+ }
}
}
@@ -3619,7 +3609,7 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
(txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
{
/* we know there has to be one, because the size is not zero */
- Assert(txn && !txn->toptxn);
+ Assert(txn && rbtxn_is_toptxn(txn));
Assert(txn->total_size > 0);
Assert(rb->size >= txn->total_size);
@@ -4007,7 +3997,7 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
bool txn_is_streamed;
/* We can never reach here for a subtransaction. */
- Assert(txn->toptxn == NULL);
+ Assert(rbtxn_is_toptxn(txn));
/*
* We can't make any assumptions about base snapshot here, similar to what
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 00a2d73dab..3a2d2e357e 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -694,8 +694,8 @@ maybe_send_schema(LogicalDecodingContext *ctx,
if (in_streaming)
xid = change->txn->xid;
- if (change->txn->toptxn)
- topxid = change->txn->toptxn->xid;
+ if (rbtxn_is_subtxn(change->txn))
+ topxid = rbtxn_get_toptxn(change->txn)->xid;
else
topxid = xid;
@@ -1879,7 +1879,7 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
Assert(!in_streaming);
/* determine the toplevel transaction */
- toptxn = (txn->toptxn) ? txn->toptxn : txn;
+ toptxn = rbtxn_get_toptxn(txn);
Assert(rbtxn_is_streamed(toptxn));
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 215d1494e9..e37f5120eb 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -249,6 +249,24 @@ typedef struct ReorderBufferChange
((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \
)
+/* Is this a top-level transaction? */
+#define rbtxn_is_toptxn(txn) \
+( \
+ (txn)->toptxn == NULL \
+)
+
+/* Is this a subtransaction? */
+#define rbtxn_is_subtxn(txn) \
+( \
+ (txn)->toptxn != NULL \
+)
+
+/* Get the top-level transaction of this (sub)transaction. */
+#define rbtxn_get_toptxn(txn) \
+( \
+ rbtxn_is_subtxn(txn) ? (txn)->toptxn : (txn) \
+)
+
typedef struct ReorderBufferTXN
{
/* See above */