diff options
author | Dianna Hohensee <dianna.hohensee@mongodb.com> | 2020-06-05 11:09:50 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-08-28 18:37:55 +0000 |
commit | 17457592a2a1b64ed4ac90c93b32aa47598d5c90 (patch) | |
tree | be681db9c1f0c6d5280e43dbf5eea3e9e569721e /src | |
parent | 24890bbac9ee27cf3fb9a1b6bb8123ab120a1594 (diff) | |
download | mongo-17457592a2a1b64ed4ac90c93b32aa47598d5c90.tar.gz |
SERVER-48149 Move callers of RecoveryUnit::waitUntilDurable onto JournalFlusher::waitForJournalFlush
Operations running concurrently with stepdown must call JournalFlusher::waitForJournalFlush so that
writes to the oplogTruncateAfterPoint are interrupted correctly during stepdown, and so that callers
waiting for durability don't receive unexpected InterruptedDueToReplStateChange errors.
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/SConscript | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_consistency_markers_impl.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_recovery.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_rollback.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/storage/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/storage/control/journal_flusher.cpp | 40 | ||||
-rw-r--r-- | src/mongo/db/storage/control/journal_flusher.h | 12 | ||||
-rw-r--r-- | src/mongo/db/storage/control/storage_control.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/storage/control/storage_control.h | 17 | ||||
-rw-r--r-- | src/mongo/db/storage/storage_repair_observer.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/write_concern.cpp | 6 |
16 files changed, 67 insertions, 51 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index d3412d5569e..24232cfa1a7 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -257,6 +257,7 @@ env.Library( ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/catalog_raii', + '$BUILD_DIR/mongo/db/storage/journal_flusher', ], ) @@ -269,6 +270,7 @@ env.Library( ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/storage/journal_flusher', '$BUILD_DIR/mongo/db/storage/storage_options', 'oplog', 'oplog_application', @@ -405,6 +407,7 @@ env.Library( ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/catalog/index_build_oplog_entry', + '$BUILD_DIR/mongo/db/storage/journal_flusher', ], ) @@ -712,6 +715,7 @@ env.Library( '$BUILD_DIR/mongo/db/catalog/collection_catalog', '$BUILD_DIR/mongo/db/commands/test_commands_enabled', '$BUILD_DIR/mongo/db/index_builds_coordinator_interface', + '$BUILD_DIR/mongo/db/storage/journal_flusher', '$BUILD_DIR/mongo/idl/server_parameter', 'local_oplog_info', 'repl_server_parameters', diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index 9b3b3d0aa61..24ff5ad96d6 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -47,7 +47,7 @@ #include "mongo/db/repl/transaction_oplog_application.h" #include "mongo/db/stats/counters.h" #include "mongo/db/stats/timer_stats.h" -#include "mongo/db/storage/control/storage_control.h" +#include "mongo/db/storage/control/journal_flusher.h" #include "mongo/logv2/log.h" #include "mongo/platform/basic.h" #include "mongo/util/fail_point.h" @@ -227,7 +227,7 @@ void ApplyBatchFinalizerForJournal::_run() { } auto opCtx = cc().makeOperationContext(); - opCtx->recoveryUnit()->waitUntilDurable(opCtx.get()); + JournalFlusher::get(opCtx.get())->waitForJournalFlush(); _recordDurable(latestOpTimeAndWallTime); } } @@ -561,7 +561,7 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx, // new writes with 
timestamps associated with those oplog entries will show up in the future. We // want to flush the journal as soon as possible in order to free ops waiting with 'j' write // concern. - StorageControl::triggerJournalFlush(opCtx->getServiceContext()); + JournalFlusher::get(opCtx)->triggerJournalFlush(); // Use this fail point to hold the PBWM lock and prevent the batch from completing. if (MONGO_unlikely(pauseBatchApplicationBeforeCompletion.shouldFail())) { diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp index a4d24555bf9..c829e33226b 100644 --- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp +++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp @@ -40,6 +40,7 @@ #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/storage_interface.h" +#include "mongo/db/storage/control/journal_flusher.h" #include "mongo/logv2/log.h" namespace mongo { @@ -150,7 +151,7 @@ void ReplicationConsistencyMarkersImpl::setInitialSyncFlag(OperationContext* opC update.timestamp = Timestamp(); _updateMinValidDocument(opCtx, update); - opCtx->recoveryUnit()->waitUntilDurable(opCtx); + JournalFlusher::get(opCtx)->waitForJournalFlush(); } void ReplicationConsistencyMarkersImpl::clearInitialSyncFlag(OperationContext* opCtx) { @@ -186,7 +187,7 @@ void ReplicationConsistencyMarkersImpl::clearInitialSyncFlag(OperationContext* o setOplogTruncateAfterPoint(opCtx, Timestamp()); if (getGlobalServiceContext()->getStorageEngine()->isDurable()) { - opCtx->recoveryUnit()->waitUntilDurable(opCtx); + JournalFlusher::get(opCtx)->waitForJournalFlush(); replCoord->setMyLastDurableOpTimeAndWallTime(opTimeAndWallTime); } } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index ccbcc74e0cd..1094c1adc03 100644 --- 
a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -89,7 +89,7 @@ #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" #include "mongo/db/session_catalog_mongod.h" -#include "mongo/db/storage/control/storage_control.h" +#include "mongo/db/storage/control/journal_flusher.h" #include "mongo/db/storage/flow_control.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/db/system_index.h" @@ -690,7 +690,7 @@ Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument( return status; } - opCtx->recoveryUnit()->waitUntilDurable(opCtx); + JournalFlusher::get(opCtx)->waitForJournalFlush(); return Status::OK(); } catch (const DBException& ex) { @@ -804,13 +804,13 @@ void ReplicationCoordinatorExternalStateImpl::_stopAsyncUpdatesOfAndClearOplogTr // // This makes sure the JournalFlusher is not stuck waiting for a lock that stepdown might hold // before doing an update write to the truncate point. - StorageControl::interruptJournalFlusherForReplStateChange(_service); + JournalFlusher::get(_service)->interruptJournalFlusherForReplStateChange(); // Wait for another round of journal flushing. This will ensure that we wait for the current // round to completely finish and have no chance of racing with unsetting the truncate point // below. It is possible that the JournalFlusher will not check for the interrupt signaled // above, if writing is imminent, so we must make sure that the code completes fully. - StorageControl::waitForJournalFlush(opCtx); + JournalFlusher::get(_service)->waitForJournalFlush(); // Writes to non-replicated collections do not need concurrency control with the OplogApplier // that never accesses them. Skip taking the PBWM. 
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index b1be5ec8e78..492740768e6 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -89,6 +89,7 @@ #include "mongo/db/server_options.h" #include "mongo/db/session_catalog.h" #include "mongo/db/shutdown_in_progress_quiesce_info.h" +#include "mongo/db/storage/control/journal_flusher.h" #include "mongo/db/storage/storage_options.h" #include "mongo/db/vector_clock.h" #include "mongo/db/vector_clock_mutable.h" @@ -3484,7 +3485,7 @@ Status ReplicationCoordinatorImpl::doReplSetReconfig(OperationContext* opCtx, } } // Wait for durability of the new config document. - opCtx->recoveryUnit()->waitUntilDurable(opCtx); + JournalFlusher::get(opCtx)->waitForJournalFlush(); configStateGuard.dismiss(); _finishReplSetReconfig(opCtx, newConfig, force, myIndex); diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 41054ca6aa4..9ac897a22a8 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -57,6 +57,7 @@ #include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/repl/vote_requester.h" #include "mongo/db/service_context.h" +#include "mongo/db/storage/control/journal_flusher.h" #include "mongo/logv2/log.h" #include "mongo/platform/mutex.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -662,7 +663,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( auto status = _externalState->storeLocalConfigDocument( opCtx.get(), newConfig.toBSON(), false /* writeOplog */); // Wait for durability of the new config document. 
- opCtx->recoveryUnit()->waitUntilDurable(opCtx.get()); + JournalFlusher::get(opCtx.get())->waitForJournalFlush(); bool isFirstConfig; { diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp index 00d772ac6ef..dd350f73552 100644 --- a/src/mongo/db/repl/replication_recovery.cpp +++ b/src/mongo/db/repl/replication_recovery.cpp @@ -49,6 +49,7 @@ #include "mongo/db/repl/transaction_oplog_application.h" #include "mongo/db/server_recovery.h" #include "mongo/db/session.h" +#include "mongo/db/storage/control/journal_flusher.h" #include "mongo/db/storage/storage_parameters_gen.h" #include "mongo/db/transaction_history_iterator.h" #include "mongo/db/transaction_participant.h" @@ -727,7 +728,7 @@ void ReplicationRecoveryImpl::_truncateOplogIfNeededAndThenClearOplogTruncateAft // Clear the oplogTruncateAfterPoint now that we have removed any holes that might exist in the // oplog -- and so that we do not truncate future entries erroneously. _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, Timestamp()); - opCtx->recoveryUnit()->waitUntilDurable(opCtx); + JournalFlusher::get(opCtx)->waitForJournalFlush(); } } // namespace repl diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index fb186e4192c..d2116a74b17 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -73,6 +73,7 @@ #include "mongo/db/repl/rollback_source.h" #include "mongo/db/s/shard_identity_rollback_notifier.h" #include "mongo/db/session_catalog_mongod.h" +#include "mongo/db/storage/control/journal_flusher.h" #include "mongo/db/storage/durable_catalog.h" #include "mongo/db/storage/remove_saver.h" #include "mongo/db/transaction_participant.h" @@ -2151,7 +2152,7 @@ void rollback(OperationContext* opCtx, // so that if we wind up shutting down uncleanly in response to something we rolled back // we know that we won't wind up right back in the same situation when we start back up // because the rollback 
wasn't durable. - opCtx->recoveryUnit()->waitUntilDurable(opCtx); + JournalFlusher::get(opCtx)->waitForJournalFlush(); // If we detected that we rolled back the shardIdentity document as part of this rollback // then we must shut down to clear the in-memory ShardingState associated with the diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index 688e50a0bca..9a4abeee1bc 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -74,6 +74,7 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/rollback_gen.h" #include "mongo/db/service_context.h" +#include "mongo/db/storage/control/journal_flusher.h" #include "mongo/db/storage/control/storage_control.h" #include "mongo/db/storage/durable_catalog.h" #include "mongo/db/storage/oplog_cap_maintainer_thread.h" @@ -177,7 +178,7 @@ StatusWith<int> StorageInterfaceImpl::incrementRollbackID(OperationContext* opCt // We wait until durable so that we are sure the Rollback ID is updated before rollback ends. 
if (status.isOK()) { - opCtx->recoveryUnit()->waitUntilDurable(opCtx); + JournalFlusher::get(opCtx)->waitForJournalFlush(); return newRBID; } return status; diff --git a/src/mongo/db/storage/SConscript b/src/mongo/db/storage/SConscript index 88565bb5ca5..53ac37b0e30 100644 --- a/src/mongo/db/storage/SConscript +++ b/src/mongo/db/storage/SConscript @@ -381,6 +381,7 @@ env.Library( '$BUILD_DIR/mongo/db/dbhelpers', '$BUILD_DIR/mongo/db/repl/replica_set_messages', '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/db/storage/journal_flusher', 'storage_file_util', ], ) diff --git a/src/mongo/db/storage/control/journal_flusher.cpp b/src/mongo/db/storage/control/journal_flusher.cpp index 6e9d7beb16f..8d1dc4f704f 100644 --- a/src/mongo/db/storage/control/journal_flusher.cpp +++ b/src/mongo/db/storage/control/journal_flusher.cpp @@ -85,9 +85,6 @@ void JournalFlusher::run() { // from Flow Control. _uniqueCtx->get()->setShouldParticipateInFlowControl(false); while (true) { - - pauseJournalFlusherThread.pauseWhileSet(_uniqueCtx->get()); - try { ON_BLOCK_EXIT([&] { // We do not want to miss an interrupt for the next round. Therefore, the opCtx @@ -126,7 +123,10 @@ void JournalFlusher::run() { stdx::unique_lock<Latch> lk(_stateMutex); MONGO_IDLE_THREAD_BLOCK; - if (_disablePeriodicFlushes) { + if (_disablePeriodicFlushes || MONGO_unlikely(pauseJournalFlusherThread.shouldFail())) { + // This is not an ideal solution for the failpoint usage because turning the failpoint + // off at this point in the code would leave this thread sleeping until explicitly + // pinged by an async thread to flush the journal. 
_flushJournalNowCV.wait(lk, [&] { return _flushJournalNow || _shuttingDown; }); } else { _flushJournalNowCV.wait_until(lk, deadline.toSystemTimePoint(), [&] { @@ -172,16 +172,17 @@ void JournalFlusher::triggerJournalFlush() { } void JournalFlusher::waitForJournalFlush() { - auto myFuture = [&]() { - stdx::unique_lock<Latch> lk(_stateMutex); - if (!_flushJournalNow) { - _flushJournalNow = true; - _flushJournalNowCV.notify_one(); + while (true) { + try { + _waitForJournalFlushNoRetry(); + break; + } catch (const ExceptionFor<ErrorCodes::InterruptedDueToReplStateChange>&) { + // Do nothing and let the while-loop retry the operation. + LOGV2_DEBUG(4814901, + 3, + "Retrying waiting for durability interrupted by replication state change"); } - return _nextSharedPromise->getFuture(); - }(); - // Throws on error if the catalog is closed or the flusher round is interrupted by stepdown. - myFuture.get(); + } } void JournalFlusher::interruptJournalFlusherForReplStateChange() { @@ -192,4 +193,17 @@ void JournalFlusher::interruptJournalFlusherForReplStateChange() { } } +void JournalFlusher::_waitForJournalFlushNoRetry() { + auto myFuture = [&]() { + stdx::unique_lock<Latch> lk(_stateMutex); + if (!_flushJournalNow) { + _flushJournalNow = true; + _flushJournalNowCV.notify_one(); + } + return _nextSharedPromise->getFuture(); + }(); + // Throws on error if the flusher round is interrupted or the flusher thread is shutdown. + myFuture.get(); +} + } // namespace mongo diff --git a/src/mongo/db/storage/control/journal_flusher.h b/src/mongo/db/storage/control/journal_flusher.h index 74a0dca2ed0..4d1058b60e9 100644 --- a/src/mongo/db/storage/control/journal_flusher.h +++ b/src/mongo/db/storage/control/journal_flusher.h @@ -94,8 +94,8 @@ public: /** * Signals an immediate journal flush and waits for it to complete before returning. * - * Will throw ShutdownInProgress if the flusher thread is being stopped. 
- * Will throw InterruptedDueToReplStateChange if a flusher round is interrupted by stepdown. + * Retries internally on InterruptedDueToReplStateChange errors. + * Will throw ErrorCodes::isShutdownError errors. */ void waitForJournalFlush(); @@ -106,6 +106,14 @@ public: void interruptJournalFlusherForReplStateChange(); private: + /** + * Signals an immediate journal flush and waits for it to complete before returning. + * + * Will throw ErrorCodes::isShutdownError if the flusher thread is being stopped. + * Will throw InterruptedDueToReplStateChange if a flusher round is interrupted by stepdown. + */ + void _waitForJournalFlushNoRetry(); + // Serializes setting/resetting _uniqueCtx and marking _uniqueCtx killed. mutable Mutex _opCtxMutex = MONGO_MAKE_LATCH("JournalFlusherOpCtxMutex"); diff --git a/src/mongo/db/storage/control/storage_control.cpp b/src/mongo/db/storage/control/storage_control.cpp index ccd5bd9ed53..f0b7e7d825f 100644 --- a/src/mongo/db/storage/control/storage_control.cpp +++ b/src/mongo/db/storage/control/storage_control.cpp @@ -37,7 +37,6 @@ #include "mongo/db/service_context.h" #include "mongo/db/storage/control/journal_flusher.h" #include "mongo/logv2/log.h" -#include "mongo/util/background.h" namespace mongo { diff --git a/src/mongo/db/storage/control/storage_control.h b/src/mongo/db/storage/control/storage_control.h index 6e877cf644e..ea868466d81 100644 --- a/src/mongo/db/storage/control/storage_control.h +++ b/src/mongo/db/storage/control/storage_control.h @@ -63,23 +63,6 @@ void startStorageControls(ServiceContext* serviceContext, bool forTestOnly = fal */ void stopStorageControls(ServiceContext* serviceContext, const Status& reason); -/** - * Prompts an immediate journal flush and returns without waiting for it. - */ -void triggerJournalFlush(ServiceContext* serviceContext); - -/** - * Initiates if needed and waits for a complete round of journal flushing to execute. - * - * Can throw ShutdownInProgress if the storage engine is being closed. 
- */ -void waitForJournalFlush(OperationContext* opCtx); - -/** - * Ensures interruption of the JournalFlusher if it is or will be acquiring a lock. - */ -void interruptJournalFlusherForReplStateChange(ServiceContext* serviceContext); - } // namespace StorageControl } // namespace mongo diff --git a/src/mongo/db/storage/storage_repair_observer.cpp b/src/mongo/db/storage/storage_repair_observer.cpp index 3252536b933..22b76a6a39c 100644 --- a/src/mongo/db/storage/storage_repair_observer.cpp +++ b/src/mongo/db/storage/storage_repair_observer.cpp @@ -47,6 +47,7 @@ #include "mongo/db/operation_context.h" #include "mongo/db/repl/repl_set_config.h" #include "mongo/db/service_context.h" +#include "mongo/db/storage/control/journal_flusher.h" #include "mongo/db/storage/storage_file_util.h" #include "mongo/db/storage/storage_options.h" #include "mongo/logv2/log.h" @@ -165,7 +166,7 @@ void StorageRepairObserver::_invalidateReplConfigIfNeeded(OperationContext* opCt configBuilder.append(repl::ReplSetConfig::kRepairedFieldName, true); Helpers::putSingleton(opCtx, kConfigNss.ns().c_str(), configBuilder.obj()); - opCtx->recoveryUnit()->waitUntilDurable(opCtx); + JournalFlusher::get(opCtx)->waitForJournalFlush(); } } // namespace mongo diff --git a/src/mongo/db/write_concern.cpp b/src/mongo/db/write_concern.cpp index 825fcce84c4..6c49846706c 100644 --- a/src/mongo/db/write_concern.cpp +++ b/src/mongo/db/write_concern.cpp @@ -45,7 +45,7 @@ #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" #include "mongo/db/stats/timer_stats.h" -#include "mongo/db/storage/control/storage_control.h" +#include "mongo/db/storage/control/journal_flusher.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/db/transaction_validation.h" #include "mongo/db/write_concern_options.h" @@ -314,13 +314,13 @@ Status waitForWriteConcern(OperationContext* opCtx, result->fsyncFiles = 1; } else { // We only need to commit the journal if we're durable - 
StorageControl::waitForJournalFlush(opCtx); + JournalFlusher::get(opCtx)->waitForJournalFlush(); } break; } case WriteConcernOptions::SyncMode::JOURNAL: waitForNoOplogHolesIfNeeded(opCtx); - StorageControl::waitForJournalFlush(opCtx); + JournalFlusher::get(opCtx)->waitForJournalFlush(); break; } } catch (const DBException& ex) { |