diff options
author | Amit Kapila <akapila@postgresql.org> | 2023-05-09 09:28:06 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2023-05-09 09:28:06 +0530 |
commit | 3d144c6c86025272e1711539f5fafb6fb85c4feb (patch) | |
tree | e8e64126bdc26dbb43be7d0505118cd2a76d91f5 /src/backend | |
parent | 455f948b0d03a556533a7e4a1a8abf45f0eb202e (diff) | |
download | postgresql-3d144c6c86025272e1711539f5fafb6fb85c4feb.tar.gz |
Fix invalid memory access during the shutdown of the parallel apply worker.
The callback function pa_shutdown() accesses MyLogicalRepWorker which may
not be initialized if there is an error during the initialization of the
parallel apply worker. The other problem is that by the time it is invoked
even after the initialization of the worker, the MyLogicalRepWorker will
be reset by another callback logicalrep_worker_onexit. So, it won't have
the required information.
To fix this, register the shutdown callback after we are attached to the
worker slot.
After this fix, we observed another issue which is that sometimes the
leader apply worker tries to receive the message from the error queue that
might already be detached by the parallel apply worker leading to an
error. To prevent such an error, we ensure that the leader apply worker
detaches from the parallel apply worker's error queue before stopping it.
Reported-by: Sawada Masahiko
Author: Hou Zhijie
Reviewed-by: Sawada Masahiko, Amit Kapila
Discussion: https://postgr.es/m/CAD21AoDo+yUwNq6nTrvE2h9bB2vZfcag=jxWc7QxuWCmkDAqcA@mail.gmail.com
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/replication/logical/applyparallelworker.c | 30 | ||||
-rw-r--r-- | src/backend/replication/logical/launcher.c | 24 |
2 files changed, 37 insertions, 17 deletions
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index ee7a18137f..82c1ddcdcb 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -577,16 +577,7 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo) list_length(ParallelApplyWorkerPool) > (max_parallel_apply_workers_per_subscription / 2)) { - int slot_no; - uint16 generation; - - SpinLockAcquire(&winfo->shared->mutex); - generation = winfo->shared->logicalrep_worker_generation; - slot_no = winfo->shared->logicalrep_worker_slot_no; - SpinLockRelease(&winfo->shared->mutex); - - logicalrep_pa_worker_stop(slot_no, generation); - + logicalrep_pa_worker_stop(winfo); pa_free_worker_info(winfo); return; @@ -636,8 +627,11 @@ pa_detach_all_error_mq(void) { ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc); - shm_mq_detach(winfo->error_mq_handle); - winfo->error_mq_handle = NULL; + if (winfo->error_mq_handle) + { + shm_mq_detach(winfo->error_mq_handle); + winfo->error_mq_handle = NULL; + } } } @@ -845,6 +839,9 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh) * Make sure the leader apply worker tries to read from our error queue one more * time. This guards against the case where we exit uncleanly without sending * an ErrorResponse, for example because some code calls proc_exit directly. + * + * Also explicitly detach from dsm segment to invoke on_dsm_detach callbacks, + * if any. See ParallelWorkerShutdown for details. */ static void pa_shutdown(int code, Datum arg) @@ -901,8 +898,6 @@ ParallelApplyWorkerMain(Datum main_arg) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("bad magic number in dynamic shared memory segment"))); - before_shmem_exit(pa_shutdown, PointerGetDatum(seg)); - /* Look up the shared information. */ shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false); MyParallelShared = shared; @@ -921,6 +916,13 @@ ParallelApplyWorkerMain(Datum main_arg) */ logicalrep_worker_attach(worker_slot); + /* + * Register the shutdown callback after we are attached to the worker + * slot. This is to ensure that MyLogicalRepWorker remains valid when this + * callback is invoked. + */ + before_shmem_exit(pa_shutdown, PointerGetDatum(seg)); + SpinLockAcquire(&MyParallelShared->mutex); MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation; MyParallelShared->logicalrep_worker_slot_no = worker_slot; diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index ceea126231..87b5593d2d 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -609,19 +609,37 @@ logicalrep_worker_stop(Oid subid, Oid relid) } /* - * Stop the logical replication parallel apply worker corresponding to the - * input slot number. + * Stop the given logical replication parallel apply worker. * * Node that the function sends SIGINT instead of SIGTERM to the parallel apply * worker so that the worker exits cleanly. */ void -logicalrep_pa_worker_stop(int slot_no, uint16 generation) +logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo) { + int slot_no; + uint16 generation; LogicalRepWorker *worker; + SpinLockAcquire(&winfo->shared->mutex); + generation = winfo->shared->logicalrep_worker_generation; + slot_no = winfo->shared->logicalrep_worker_slot_no; + SpinLockRelease(&winfo->shared->mutex); + Assert(slot_no >= 0 && slot_no < max_logical_replication_workers); + /* + * Detach from the error_mq_handle for the parallel apply worker before + * stopping it. This prevents the leader apply worker from trying to + * receive the message from the error queue that might already be detached + * by the parallel apply worker. + */ + if (winfo->error_mq_handle) + { + shm_mq_detach(winfo->error_mq_handle); + winfo->error_mq_handle = NULL; + } + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = &LogicalRepCtx->workers[slot_no]; |