diff options
author | Dianna Hohensee <dianna.hohensee@mongodb.com> | 2020-06-05 11:09:50 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-08-28 18:37:55 +0000 |
commit | 17457592a2a1b64ed4ac90c93b32aa47598d5c90 (patch) | |
tree | be681db9c1f0c6d5280e43dbf5eea3e9e569721e /src/mongo/db/repl | |
parent | 24890bbac9ee27cf3fb9a1b6bb8123ab120a1594 (diff) | |
download | mongo-17457592a2a1b64ed4ac90c93b32aa47598d5c90.tar.gz |
SERVER-48149 Move callers of RecoveryUnit::waitUntilDurable onto JournalFlusher::waitForJournalFlush
Operations running concurrently with stepdown must call JournalFlusher::waitForJournalFlush so that
writes to the oplogTruncateAfterPoint are interrupted correctly during stepdown and callers waiting
for durability don't receive unexpected InterruptedDueToReplStateChange errors.
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/SConscript | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_consistency_markers_impl.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_recovery.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_rollback.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl.cpp | 3 |
9 files changed, 24 insertions, 14 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index d3412d5569e..24232cfa1a7 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -257,6 +257,7 @@ env.Library( ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/catalog_raii', + '$BUILD_DIR/mongo/db/storage/journal_flusher', ], ) @@ -269,6 +270,7 @@ env.Library( ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/storage/journal_flusher', '$BUILD_DIR/mongo/db/storage/storage_options', 'oplog', 'oplog_application', @@ -405,6 +407,7 @@ env.Library( ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/catalog/index_build_oplog_entry', + '$BUILD_DIR/mongo/db/storage/journal_flusher', ], ) @@ -712,6 +715,7 @@ env.Library( '$BUILD_DIR/mongo/db/catalog/collection_catalog', '$BUILD_DIR/mongo/db/commands/test_commands_enabled', '$BUILD_DIR/mongo/db/index_builds_coordinator_interface', + '$BUILD_DIR/mongo/db/storage/journal_flusher', '$BUILD_DIR/mongo/idl/server_parameter', 'local_oplog_info', 'repl_server_parameters', diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index 9b3b3d0aa61..24ff5ad96d6 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -47,7 +47,7 @@ #include "mongo/db/repl/transaction_oplog_application.h" #include "mongo/db/stats/counters.h" #include "mongo/db/stats/timer_stats.h" -#include "mongo/db/storage/control/storage_control.h" +#include "mongo/db/storage/control/journal_flusher.h" #include "mongo/logv2/log.h" #include "mongo/platform/basic.h" #include "mongo/util/fail_point.h" @@ -227,7 +227,7 @@ void ApplyBatchFinalizerForJournal::_run() { } auto opCtx = cc().makeOperationContext(); - opCtx->recoveryUnit()->waitUntilDurable(opCtx.get()); + JournalFlusher::get(opCtx.get())->waitForJournalFlush(); _recordDurable(latestOpTimeAndWallTime); } } @@ -561,7 +561,7 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx, // new writes with timestamps associated with those oplog entries will show up in the future. We // want to flush the journal as soon as possible in order to free ops waiting with 'j' write // concern. - StorageControl::triggerJournalFlush(opCtx->getServiceContext()); + JournalFlusher::get(opCtx)->triggerJournalFlush(); // Use this fail point to hold the PBWM lock and prevent the batch from completing. if (MONGO_unlikely(pauseBatchApplicationBeforeCompletion.shouldFail())) { diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp index a4d24555bf9..c829e33226b 100644 --- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp +++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp @@ -40,6 +40,7 @@ #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/storage_interface.h" +#include "mongo/db/storage/control/journal_flusher.h" #include "mongo/logv2/log.h" namespace mongo { @@ -150,7 +151,7 @@ void ReplicationConsistencyMarkersImpl::setInitialSyncFlag(OperationContext* opC update.timestamp = Timestamp(); _updateMinValidDocument(opCtx, update); - opCtx->recoveryUnit()->waitUntilDurable(opCtx); + JournalFlusher::get(opCtx)->waitForJournalFlush(); } void ReplicationConsistencyMarkersImpl::clearInitialSyncFlag(OperationContext* opCtx) { @@ -186,7 +187,7 @@ void ReplicationConsistencyMarkersImpl::clearInitialSyncFlag(OperationContext* o setOplogTruncateAfterPoint(opCtx, Timestamp()); if (getGlobalServiceContext()->getStorageEngine()->isDurable()) { - opCtx->recoveryUnit()->waitUntilDurable(opCtx); + JournalFlusher::get(opCtx)->waitForJournalFlush(); replCoord->setMyLastDurableOpTimeAndWallTime(opTimeAndWallTime); } } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index ccbcc74e0cd..1094c1adc03 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -89,7 +89,7 @@ #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" #include "mongo/db/session_catalog_mongod.h" -#include "mongo/db/storage/control/storage_control.h" +#include "mongo/db/storage/control/journal_flusher.h" #include "mongo/db/storage/flow_control.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/db/system_index.h" @@ -690,7 +690,7 @@ Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument( return status; } - opCtx->recoveryUnit()->waitUntilDurable(opCtx); + JournalFlusher::get(opCtx)->waitForJournalFlush(); return Status::OK(); } catch (const DBException& ex) { @@ -804,13 +804,13 @@ void ReplicationCoordinatorExternalStateImpl::_stopAsyncUpdatesOfAndClearOplogTr // // This makes sure the JournalFlusher is not stuck waiting for a lock that stepdown might hold // before doing an update write to the truncate point. - StorageControl::interruptJournalFlusherForReplStateChange(_service); + JournalFlusher::get(_service)->interruptJournalFlusherForReplStateChange(); // Wait for another round of journal flushing. This will ensure that we wait for the current // round to completely finish and have no chance of racing with unsetting the truncate point // below. It is possible that the JournalFlusher will not check for the interrupt signaled // above, if writing is imminent, so we must make sure that the code completes fully. - StorageControl::waitForJournalFlush(opCtx); + JournalFlusher::get(_service)->waitForJournalFlush(); // Writes to non-replicated collections do not need concurrency control with the OplogApplier // that never accesses them. Skip taking the PBWM. diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index b1be5ec8e78..492740768e6 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -89,6 +89,7 @@ #include "mongo/db/server_options.h" #include "mongo/db/session_catalog.h" #include "mongo/db/shutdown_in_progress_quiesce_info.h" +#include "mongo/db/storage/control/journal_flusher.h" #include "mongo/db/storage/storage_options.h" #include "mongo/db/vector_clock.h" #include "mongo/db/vector_clock_mutable.h" @@ -3484,7 +3485,7 @@ Status ReplicationCoordinatorImpl::doReplSetReconfig(OperationContext* opCtx, } } // Wait for durability of the new config document. - opCtx->recoveryUnit()->waitUntilDurable(opCtx); + JournalFlusher::get(opCtx)->waitForJournalFlush(); configStateGuard.dismiss(); _finishReplSetReconfig(opCtx, newConfig, force, myIndex); diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 41054ca6aa4..9ac897a22a8 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -57,6 +57,7 @@ #include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/repl/vote_requester.h" #include "mongo/db/service_context.h" +#include "mongo/db/storage/control/journal_flusher.h" #include "mongo/logv2/log.h" #include "mongo/platform/mutex.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -662,7 +663,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( auto status = _externalState->storeLocalConfigDocument( opCtx.get(), newConfig.toBSON(), false /* writeOplog */); // Wait for durability of the new config document. - opCtx->recoveryUnit()->waitUntilDurable(opCtx.get()); + JournalFlusher::get(opCtx.get())->waitForJournalFlush(); bool isFirstConfig; { diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp index 00d772ac6ef..dd350f73552 100644 --- a/src/mongo/db/repl/replication_recovery.cpp +++ b/src/mongo/db/repl/replication_recovery.cpp @@ -49,6 +49,7 @@ #include "mongo/db/repl/transaction_oplog_application.h" #include "mongo/db/server_recovery.h" #include "mongo/db/session.h" +#include "mongo/db/storage/control/journal_flusher.h" #include "mongo/db/storage/storage_parameters_gen.h" #include "mongo/db/transaction_history_iterator.h" #include "mongo/db/transaction_participant.h" @@ -727,7 +728,7 @@ void ReplicationRecoveryImpl::_truncateOplogIfNeededAndThenClearOplogTruncateAft // Clear the oplogTruncateAfterPoint now that we have removed any holes that might exist in the // oplog -- and so that we do not truncate future entries erroneously. _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, Timestamp()); - opCtx->recoveryUnit()->waitUntilDurable(opCtx); + JournalFlusher::get(opCtx)->waitForJournalFlush(); } } // namespace repl diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index fb186e4192c..d2116a74b17 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -73,6 +73,7 @@ #include "mongo/db/repl/rollback_source.h" #include "mongo/db/s/shard_identity_rollback_notifier.h" #include "mongo/db/session_catalog_mongod.h" +#include "mongo/db/storage/control/journal_flusher.h" #include "mongo/db/storage/durable_catalog.h" #include "mongo/db/storage/remove_saver.h" #include "mongo/db/transaction_participant.h" @@ -2151,7 +2152,7 @@ void rollback(OperationContext* opCtx, // so that if we wind up shutting down uncleanly in response to something we rolled back // we know that we won't wind up right back in the same situation when we start back up // because the rollback wasn't durable. - opCtx->recoveryUnit()->waitUntilDurable(opCtx); + JournalFlusher::get(opCtx)->waitForJournalFlush(); // If we detected that we rolled back the shardIdentity document as part of this rollback // then we must shut down to clear the in-memory ShardingState associated with the diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index 688e50a0bca..9a4abeee1bc 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -74,6 +74,7 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/rollback_gen.h" #include "mongo/db/service_context.h" +#include "mongo/db/storage/control/journal_flusher.h" #include "mongo/db/storage/control/storage_control.h" #include "mongo/db/storage/durable_catalog.h" #include "mongo/db/storage/oplog_cap_maintainer_thread.h" @@ -177,7 +178,7 @@ StatusWith<int> StorageInterfaceImpl::incrementRollbackID(OperationContext* opCt // We wait until durable so that we are sure the Rollback ID is updated before rollback ends. if (status.isOK()) { - opCtx->recoveryUnit()->waitUntilDurable(opCtx); + JournalFlusher::get(opCtx)->waitForJournalFlush(); return newRBID; } return status; |