diff options
Diffstat (limited to 'contrib/test_decoding/test_decoding.c')
-rw-r--r-- | contrib/test_decoding/test_decoding.c | 45 |
1 files changed, 42 insertions, 3 deletions
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 6edc5fcd47..fdbd313cda 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -41,6 +41,8 @@ typedef struct MemoryContext context; bool include_xids; bool include_timestamp; + bool skip_empty_xacts; + bool xact_wrote_changes; } TestDecodingData; static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, @@ -48,6 +50,10 @@ static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions * static void pg_decode_shutdown(LogicalDecodingContext *ctx); static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn); +static void pg_output_begin(LogicalDecodingContext *ctx, + TestDecodingData *data, + ReorderBufferTXN *txn, + bool last_write); static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); static void pg_decode_change(LogicalDecodingContext *ctx, @@ -82,7 +88,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, ListCell *option; TestDecodingData *data; - data = palloc(sizeof(TestDecodingData)); + data = palloc0(sizeof(TestDecodingData)); data->context = AllocSetContextCreate(ctx->context, "text conversion context", ALLOCSET_DEFAULT_MINSIZE, @@ -90,6 +96,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, ALLOCSET_DEFAULT_MAXSIZE); data->include_xids = true; data->include_timestamp = false; + data->skip_empty_xacts = false; ctx->output_plugin_private = data; @@ -137,6 +144,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, if (force_binary) opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; } + else if (strcmp(elem->defname, "skip-empty-xacts") == 0) + { + + if (elem->arg == NULL) + data->skip_empty_xacts = true; + else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else { ereport(ERROR, @@ -164,12 +182,22 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { TestDecodingData *data = ctx->output_plugin_private; - OutputPluginPrepareWrite(ctx, true); + data->xact_wrote_changes = false; + if (data->skip_empty_xacts) + return; + + pg_output_begin(ctx, data, txn, true); +} + +static void +pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write) +{ + OutputPluginPrepareWrite(ctx, last_write); if (data->include_xids) appendStringInfo(ctx->out, "BEGIN %u", txn->xid); else appendStringInfoString(ctx->out, "BEGIN"); - OutputPluginWrite(ctx, true); + OutputPluginWrite(ctx, last_write); } /* COMMIT callback */ @@ -179,6 +207,9 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { TestDecodingData *data = ctx->output_plugin_private; + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + OutputPluginPrepareWrite(ctx, true); if (data->include_xids) appendStringInfo(ctx->out, "COMMIT %u", txn->xid); @@ -338,6 +369,14 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, MemoryContext old; data = ctx->output_plugin_private; + + /* output BEGIN if we haven't yet */ + if (data->skip_empty_xacts && !data->xact_wrote_changes) + { + pg_output_begin(ctx, data, txn, false); + } + data->xact_wrote_changes = true; + class_form = RelationGetForm(relation); tupdesc = RelationGetDescr(relation); |