summaryrefslogtreecommitdiff
path: root/contrib/test_decoding/test_decoding.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/test_decoding/test_decoding.c')
-rw-r--r--contrib/test_decoding/test_decoding.c45
1 files changed, 42 insertions, 3 deletions
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 6edc5fcd47..fdbd313cda 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -41,6 +41,8 @@ typedef struct
MemoryContext context;
bool include_xids;
bool include_timestamp;
+ bool skip_empty_xacts;
+ bool xact_wrote_changes;
} TestDecodingData;
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -48,6 +50,10 @@ static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *
static void pg_decode_shutdown(LogicalDecodingContext *ctx);
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
+static void pg_output_begin(LogicalDecodingContext *ctx,
+ TestDecodingData *data,
+ ReorderBufferTXN *txn,
+ bool last_write);
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
static void pg_decode_change(LogicalDecodingContext *ctx,
@@ -82,7 +88,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
ListCell *option;
TestDecodingData *data;
- data = palloc(sizeof(TestDecodingData));
+ data = palloc0(sizeof(TestDecodingData));
data->context = AllocSetContextCreate(ctx->context,
"text conversion context",
ALLOCSET_DEFAULT_MINSIZE,
@@ -90,6 +96,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
ALLOCSET_DEFAULT_MAXSIZE);
data->include_xids = true;
data->include_timestamp = false;
+ data->skip_empty_xacts = false;
ctx->output_plugin_private = data;
@@ -137,6 +144,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
if (force_binary)
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
}
+ else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
+ {
+
+ if (elem->arg == NULL)
+ data->skip_empty_xacts = true;
+ else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not parse value \"%s\" for parameter \"%s\"",
+ strVal(elem->arg), elem->defname)));
+ }
else
{
ereport(ERROR,
@@ -164,12 +182,22 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
- OutputPluginPrepareWrite(ctx, true);
+ data->xact_wrote_changes = false;
+ if (data->skip_empty_xacts)
+ return;
+
+ pg_output_begin(ctx, data, txn, true);
+}
+
+static void
+pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
+{
+ OutputPluginPrepareWrite(ctx, last_write);
if (data->include_xids)
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
else
appendStringInfoString(ctx->out, "BEGIN");
- OutputPluginWrite(ctx, true);
+ OutputPluginWrite(ctx, last_write);
}
/* COMMIT callback */
@@ -179,6 +207,9 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
@@ -338,6 +369,14 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
MemoryContext old;
data = ctx->output_plugin_private;
+
+ /* output BEGIN if we haven't yet */
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ {
+ pg_output_begin(ctx, data, txn, false);
+ }
+ data->xact_wrote_changes = true;
+
class_form = RelationGetForm(relation);
tupdesc = RelationGetDescr(relation);