author     Dianna Hohensee <dianna.hohensee@mongodb.com>     2020-06-05 11:09:50 -0400
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-08-28 18:37:55 +0000
commit     17457592a2a1b64ed4ac90c93b32aa47598d5c90 (patch)
tree       be681db9c1f0c6d5280e43dbf5eea3e9e569721e /src
parent     24890bbac9ee27cf3fb9a1b6bb8123ab120a1594 (diff)
SERVER-48149 Move callers of RecoveryUnit::waitUntilDurable onto JournalFlusher::waitForJournalFlush
Operations that run concurrently with stepdown must call JournalFlusher::waitForJournalFlush so that writes to the oplogTruncateAfterPoint are interrupted correctly during stepdown, and so that callers waiting for durability do not receive unexpected InterruptedDueToReplStateChange errors.
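Every call site in this change follows the same migration pattern. A minimal sketch is below; the wrapper function is hypothetical, but both calls appear verbatim in the diff:

    #include "mongo/db/storage/control/journal_flusher.h"

    void waitForDurability(OperationContext* opCtx) {
        // Before: waiting on the RecoveryUnit directly could surface an
        // unexpected InterruptedDueToReplStateChange error if a stepdown
        // interrupted the flusher round mid-wait.
        //     opCtx->recoveryUnit()->waitUntilDurable(opCtx);

        // After: the JournalFlusher retries internally on
        // InterruptedDueToReplStateChange and only lets shutdown errors
        // propagate to the caller.
        JournalFlusher::get(opCtx)->waitForJournalFlush();
    }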
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/repl/SConscript                                       |  4
-rw-r--r--  src/mongo/db/repl/oplog_applier_impl.cpp                           |  6
-rw-r--r--  src/mongo/db/repl/replication_consistency_markers_impl.cpp         |  5
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp  |  8
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp                 |  3
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp       |  3
-rw-r--r--  src/mongo/db/repl/replication_recovery.cpp                         |  3
-rw-r--r--  src/mongo/db/repl/rs_rollback.cpp                                  |  3
-rw-r--r--  src/mongo/db/repl/storage_interface_impl.cpp                       |  3
-rw-r--r--  src/mongo/db/storage/SConscript                                    |  1
-rw-r--r--  src/mongo/db/storage/control/journal_flusher.cpp                   | 40
-rw-r--r--  src/mongo/db/storage/control/journal_flusher.h                     | 12
-rw-r--r--  src/mongo/db/storage/control/storage_control.cpp                   |  1
-rw-r--r--  src/mongo/db/storage/control/storage_control.h                     | 17
-rw-r--r--  src/mongo/db/storage/storage_repair_observer.cpp                   |  3
-rw-r--r--  src/mongo/db/write_concern.cpp                                     |  6
16 files changed, 67 insertions, 51 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index d3412d5569e..24232cfa1a7 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -257,6 +257,7 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/catalog_raii',
+ '$BUILD_DIR/mongo/db/storage/journal_flusher',
],
)
@@ -269,6 +270,7 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/storage/journal_flusher',
'$BUILD_DIR/mongo/db/storage/storage_options',
'oplog',
'oplog_application',
@@ -405,6 +407,7 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/catalog/index_build_oplog_entry',
+ '$BUILD_DIR/mongo/db/storage/journal_flusher',
],
)
@@ -712,6 +715,7 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/collection_catalog',
'$BUILD_DIR/mongo/db/commands/test_commands_enabled',
'$BUILD_DIR/mongo/db/index_builds_coordinator_interface',
+ '$BUILD_DIR/mongo/db/storage/journal_flusher',
'$BUILD_DIR/mongo/idl/server_parameter',
'local_oplog_info',
'repl_server_parameters',
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index 9b3b3d0aa61..24ff5ad96d6 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -47,7 +47,7 @@
#include "mongo/db/repl/transaction_oplog_application.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/stats/timer_stats.h"
-#include "mongo/db/storage/control/storage_control.h"
+#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/basic.h"
#include "mongo/util/fail_point.h"
@@ -227,7 +227,7 @@ void ApplyBatchFinalizerForJournal::_run() {
}
auto opCtx = cc().makeOperationContext();
- opCtx->recoveryUnit()->waitUntilDurable(opCtx.get());
+ JournalFlusher::get(opCtx.get())->waitForJournalFlush();
_recordDurable(latestOpTimeAndWallTime);
}
}
@@ -561,7 +561,7 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx,
// new writes with timestamps associated with those oplog entries will show up in the future. We
// want to flush the journal as soon as possible in order to free ops waiting with 'j' write
// concern.
- StorageControl::triggerJournalFlush(opCtx->getServiceContext());
+ JournalFlusher::get(opCtx)->triggerJournalFlush();
// Use this fail point to hold the PBWM lock and prevent the batch from completing.
if (MONGO_unlikely(pauseBatchApplicationBeforeCompletion.shouldFail())) {
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
index a4d24555bf9..c829e33226b 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/storage_interface.h"
+#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/logv2/log.h"
namespace mongo {
@@ -150,7 +151,7 @@ void ReplicationConsistencyMarkersImpl::setInitialSyncFlag(OperationContext* opC
update.timestamp = Timestamp();
_updateMinValidDocument(opCtx, update);
- opCtx->recoveryUnit()->waitUntilDurable(opCtx);
+ JournalFlusher::get(opCtx)->waitForJournalFlush();
}
void ReplicationConsistencyMarkersImpl::clearInitialSyncFlag(OperationContext* opCtx) {
@@ -186,7 +187,7 @@ void ReplicationConsistencyMarkersImpl::clearInitialSyncFlag(OperationContext* o
setOplogTruncateAfterPoint(opCtx, Timestamp());
if (getGlobalServiceContext()->getStorageEngine()->isDurable()) {
- opCtx->recoveryUnit()->waitUntilDurable(opCtx);
+ JournalFlusher::get(opCtx)->waitForJournalFlush();
replCoord->setMyLastDurableOpTimeAndWallTime(opTimeAndWallTime);
}
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index ccbcc74e0cd..1094c1adc03 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -89,7 +89,7 @@
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/db/session_catalog_mongod.h"
-#include "mongo/db/storage/control/storage_control.h"
+#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/db/storage/flow_control.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/db/system_index.h"
@@ -690,7 +690,7 @@ Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument(
return status;
}
- opCtx->recoveryUnit()->waitUntilDurable(opCtx);
+ JournalFlusher::get(opCtx)->waitForJournalFlush();
return Status::OK();
} catch (const DBException& ex) {
@@ -804,13 +804,13 @@ void ReplicationCoordinatorExternalStateImpl::_stopAsyncUpdatesOfAndClearOplogTr
//
// This makes sure the JournalFlusher is not stuck waiting for a lock that stepdown might hold
// before doing an update write to the truncate point.
- StorageControl::interruptJournalFlusherForReplStateChange(_service);
+ JournalFlusher::get(_service)->interruptJournalFlusherForReplStateChange();
// Wait for another round of journal flushing. This will ensure that we wait for the current
// round to completely finish and have no chance of racing with unsetting the truncate point
// below. It is possible that the JournalFlusher will not check for the interrupt signaled
// above, if writing is imminent, so we must make sure that the code completes fully.
- StorageControl::waitForJournalFlush(opCtx);
+ JournalFlusher::get(_service)->waitForJournalFlush();
// Writes to non-replicated collections do not need concurrency control with the OplogApplier
// that never accesses them. Skip taking the PBWM.
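The ordering in this hunk reduces to the following sketch (the wrapper function is hypothetical; the two JournalFlusher calls are the ones shown above):

    // During stepdown: first break the flusher out of any lock wait, then wait
    // for one complete flush round so that no in-flight round can race with
    // clearing the oplogTruncateAfterPoint afterwards.
    void quiesceJournalFlusherForStepdown(ServiceContext* service) {
        JournalFlusher::get(service)->interruptJournalFlusherForReplStateChange();
        JournalFlusher::get(service)->waitForJournalFlush();
        // ...now safe to unset the truncate point without racing the flusher...
    }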
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index b1be5ec8e78..492740768e6 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -89,6 +89,7 @@
#include "mongo/db/server_options.h"
#include "mongo/db/session_catalog.h"
#include "mongo/db/shutdown_in_progress_quiesce_info.h"
+#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/db/vector_clock.h"
#include "mongo/db/vector_clock_mutable.h"
@@ -3484,7 +3485,7 @@ Status ReplicationCoordinatorImpl::doReplSetReconfig(OperationContext* opCtx,
}
}
// Wait for durability of the new config document.
- opCtx->recoveryUnit()->waitUntilDurable(opCtx);
+ JournalFlusher::get(opCtx)->waitForJournalFlush();
configStateGuard.dismiss();
_finishReplSetReconfig(opCtx, newConfig, force, myIndex);
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 41054ca6aa4..9ac897a22a8 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -57,6 +57,7 @@
#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/db/repl/vote_requester.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/mutex.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -662,7 +663,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
auto status = _externalState->storeLocalConfigDocument(
opCtx.get(), newConfig.toBSON(), false /* writeOplog */);
// Wait for durability of the new config document.
- opCtx->recoveryUnit()->waitUntilDurable(opCtx.get());
+ JournalFlusher::get(opCtx.get())->waitForJournalFlush();
bool isFirstConfig;
{
diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp
index 00d772ac6ef..dd350f73552 100644
--- a/src/mongo/db/repl/replication_recovery.cpp
+++ b/src/mongo/db/repl/replication_recovery.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/repl/transaction_oplog_application.h"
#include "mongo/db/server_recovery.h"
#include "mongo/db/session.h"
+#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/db/storage/storage_parameters_gen.h"
#include "mongo/db/transaction_history_iterator.h"
#include "mongo/db/transaction_participant.h"
@@ -727,7 +728,7 @@ void ReplicationRecoveryImpl::_truncateOplogIfNeededAndThenClearOplogTruncateAft
// Clear the oplogTruncateAfterPoint now that we have removed any holes that might exist in the
// oplog -- and so that we do not truncate future entries erroneously.
_consistencyMarkers->setOplogTruncateAfterPoint(opCtx, Timestamp());
- opCtx->recoveryUnit()->waitUntilDurable(opCtx);
+ JournalFlusher::get(opCtx)->waitForJournalFlush();
}
} // namespace repl
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index fb186e4192c..d2116a74b17 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -73,6 +73,7 @@
#include "mongo/db/repl/rollback_source.h"
#include "mongo/db/s/shard_identity_rollback_notifier.h"
#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/db/storage/durable_catalog.h"
#include "mongo/db/storage/remove_saver.h"
#include "mongo/db/transaction_participant.h"
@@ -2151,7 +2152,7 @@ void rollback(OperationContext* opCtx,
// so that if we wind up shutting down uncleanly in response to something we rolled back
// we know that we won't wind up right back in the same situation when we start back up
// because the rollback wasn't durable.
- opCtx->recoveryUnit()->waitUntilDurable(opCtx);
+ JournalFlusher::get(opCtx)->waitForJournalFlush();
// If we detected that we rolled back the shardIdentity document as part of this rollback
// then we must shut down to clear the in-memory ShardingState associated with the
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 688e50a0bca..9a4abeee1bc 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -74,6 +74,7 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/rollback_gen.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/db/storage/control/storage_control.h"
#include "mongo/db/storage/durable_catalog.h"
#include "mongo/db/storage/oplog_cap_maintainer_thread.h"
@@ -177,7 +178,7 @@ StatusWith<int> StorageInterfaceImpl::incrementRollbackID(OperationContext* opCt
// We wait until durable so that we are sure the Rollback ID is updated before rollback ends.
if (status.isOK()) {
- opCtx->recoveryUnit()->waitUntilDurable(opCtx);
+ JournalFlusher::get(opCtx)->waitForJournalFlush();
return newRBID;
}
return status;
diff --git a/src/mongo/db/storage/SConscript b/src/mongo/db/storage/SConscript
index 88565bb5ca5..53ac37b0e30 100644
--- a/src/mongo/db/storage/SConscript
+++ b/src/mongo/db/storage/SConscript
@@ -381,6 +381,7 @@ env.Library(
'$BUILD_DIR/mongo/db/dbhelpers',
'$BUILD_DIR/mongo/db/repl/replica_set_messages',
'$BUILD_DIR/mongo/db/service_context',
+ '$BUILD_DIR/mongo/db/storage/journal_flusher',
'storage_file_util',
],
)
diff --git a/src/mongo/db/storage/control/journal_flusher.cpp b/src/mongo/db/storage/control/journal_flusher.cpp
index 6e9d7beb16f..8d1dc4f704f 100644
--- a/src/mongo/db/storage/control/journal_flusher.cpp
+++ b/src/mongo/db/storage/control/journal_flusher.cpp
@@ -85,9 +85,6 @@ void JournalFlusher::run() {
// from Flow Control.
_uniqueCtx->get()->setShouldParticipateInFlowControl(false);
while (true) {
-
- pauseJournalFlusherThread.pauseWhileSet(_uniqueCtx->get());
-
try {
ON_BLOCK_EXIT([&] {
// We do not want to miss an interrupt for the next round. Therefore, the opCtx
@@ -126,7 +123,10 @@ void JournalFlusher::run() {
stdx::unique_lock<Latch> lk(_stateMutex);
MONGO_IDLE_THREAD_BLOCK;
- if (_disablePeriodicFlushes) {
+ if (_disablePeriodicFlushes || MONGO_unlikely(pauseJournalFlusherThread.shouldFail())) {
+ // This is not an ideal solution for the failpoint usage because turning the failpoint
+ // off at this point in the code would leave this thread sleeping until explicitly
+ // pinged by an async thread to flush the journal.
_flushJournalNowCV.wait(lk, [&] { return _flushJournalNow || _shuttingDown; });
} else {
_flushJournalNowCV.wait_until(lk, deadline.toSystemTimePoint(), [&] {
@@ -172,16 +172,17 @@ void JournalFlusher::triggerJournalFlush() {
}
void JournalFlusher::waitForJournalFlush() {
- auto myFuture = [&]() {
- stdx::unique_lock<Latch> lk(_stateMutex);
- if (!_flushJournalNow) {
- _flushJournalNow = true;
- _flushJournalNowCV.notify_one();
+ while (true) {
+ try {
+ _waitForJournalFlushNoRetry();
+ break;
+ } catch (const ExceptionFor<ErrorCodes::InterruptedDueToReplStateChange>&) {
+ // Do nothing and let the while-loop retry the operation.
+ LOGV2_DEBUG(4814901,
+ 3,
+ "Retrying waiting for durability interrupted by replication state change");
}
- return _nextSharedPromise->getFuture();
- }();
- // Throws on error if the catalog is closed or the flusher round is interrupted by stepdown.
- myFuture.get();
+ }
}
void JournalFlusher::interruptJournalFlusherForReplStateChange() {
@@ -192,4 +193,17 @@ void JournalFlusher::interruptJournalFlusherForReplStateChange() {
}
}
+void JournalFlusher::_waitForJournalFlushNoRetry() {
+ auto myFuture = [&]() {
+ stdx::unique_lock<Latch> lk(_stateMutex);
+ if (!_flushJournalNow) {
+ _flushJournalNow = true;
+ _flushJournalNowCV.notify_one();
+ }
+ return _nextSharedPromise->getFuture();
+ }();
+ // Throws on error if the flusher round is interrupted or the flusher thread is shutdown.
+ myFuture.get();
+}
+
} // namespace mongo
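In isolation, the retry loop added above behaves like this standalone sketch (the exception type and no-retry helper are stand-ins for the real MongoDB classes; only the control flow mirrors the real code):

    #include <exception>

    // Stand-in for mongo's ExceptionFor<ErrorCodes::InterruptedDueToReplStateChange>.
    struct InterruptedDueToReplStateChange : std::exception {};

    void waitForJournalFlushNoRetry();  // throws on interrupt or shutdown

    void waitForJournalFlush() {
        while (true) {
            try {
                waitForJournalFlushNoRetry();
                break;  // a full flush round completed
            } catch (const InterruptedDueToReplStateChange&) {
                // A stepdown interrupted this round; retry so callers never
                // observe the transient error. Shutdown errors are deliberately
                // not caught and propagate to the caller.
            }
        }
    }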
diff --git a/src/mongo/db/storage/control/journal_flusher.h b/src/mongo/db/storage/control/journal_flusher.h
index 74a0dca2ed0..4d1058b60e9 100644
--- a/src/mongo/db/storage/control/journal_flusher.h
+++ b/src/mongo/db/storage/control/journal_flusher.h
@@ -94,8 +94,8 @@ public:
/**
* Signals an immediate journal flush and waits for it to complete before returning.
*
- * Will throw ShutdownInProgress if the flusher thread is being stopped.
- * Will throw InterruptedDueToReplStateChange if a flusher round is interrupted by stepdown.
+ * Retries internally on InterruptedDueToReplStateChange errors.
+ * Will throw ErrorCodes::isShutdownError errors.
*/
void waitForJournalFlush();
@@ -106,6 +106,14 @@ public:
void interruptJournalFlusherForReplStateChange();
private:
+ /**
+ * Signals an immediate journal flush and waits for it to complete before returning.
+ *
+ * Will throw ErrorCodes::isShutdownError if the flusher thread is being stopped.
+ * Will throw InterruptedDueToReplStateChange if a flusher round is interrupted by stepdown.
+ */
+ void _waitForJournalFlushNoRetry();
+
// Serializes setting/resetting _uniqueCtx and marking _uniqueCtx killed.
mutable Mutex _opCtxMutex = MONGO_MAKE_LATCH("JournalFlusherOpCtxMutex");
diff --git a/src/mongo/db/storage/control/storage_control.cpp b/src/mongo/db/storage/control/storage_control.cpp
index ccd5bd9ed53..f0b7e7d825f 100644
--- a/src/mongo/db/storage/control/storage_control.cpp
+++ b/src/mongo/db/storage/control/storage_control.cpp
@@ -37,7 +37,6 @@
#include "mongo/db/service_context.h"
#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/logv2/log.h"
-#include "mongo/util/background.h"
namespace mongo {
diff --git a/src/mongo/db/storage/control/storage_control.h b/src/mongo/db/storage/control/storage_control.h
index 6e877cf644e..ea868466d81 100644
--- a/src/mongo/db/storage/control/storage_control.h
+++ b/src/mongo/db/storage/control/storage_control.h
@@ -63,23 +63,6 @@ void startStorageControls(ServiceContext* serviceContext, bool forTestOnly = fal
*/
void stopStorageControls(ServiceContext* serviceContext, const Status& reason);
-/**
- * Prompts an immediate journal flush and returns without waiting for it.
- */
-void triggerJournalFlush(ServiceContext* serviceContext);
-
-/**
- * Initiates if needed and waits for a complete round of journal flushing to execute.
- *
- * Can throw ShutdownInProgress if the storage engine is being closed.
- */
-void waitForJournalFlush(OperationContext* opCtx);
-
-/**
- * Ensures interruption of the JournalFlusher if it is or will be acquiring a lock.
- */
-void interruptJournalFlusherForReplStateChange(ServiceContext* serviceContext);
-
} // namespace StorageControl
} // namespace mongo
diff --git a/src/mongo/db/storage/storage_repair_observer.cpp b/src/mongo/db/storage/storage_repair_observer.cpp
index 3252536b933..22b76a6a39c 100644
--- a/src/mongo/db/storage/storage_repair_observer.cpp
+++ b/src/mongo/db/storage/storage_repair_observer.cpp
@@ -47,6 +47,7 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/repl_set_config.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/db/storage/storage_file_util.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/logv2/log.h"
@@ -165,7 +166,7 @@ void StorageRepairObserver::_invalidateReplConfigIfNeeded(OperationContext* opCt
configBuilder.append(repl::ReplSetConfig::kRepairedFieldName, true);
Helpers::putSingleton(opCtx, kConfigNss.ns().c_str(), configBuilder.obj());
- opCtx->recoveryUnit()->waitUntilDurable(opCtx);
+ JournalFlusher::get(opCtx)->waitForJournalFlush();
}
} // namespace mongo
diff --git a/src/mongo/db/write_concern.cpp b/src/mongo/db/write_concern.cpp
index 825fcce84c4..6c49846706c 100644
--- a/src/mongo/db/write_concern.cpp
+++ b/src/mongo/db/write_concern.cpp
@@ -45,7 +45,7 @@
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/db/stats/timer_stats.h"
-#include "mongo/db/storage/control/storage_control.h"
+#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/db/transaction_validation.h"
#include "mongo/db/write_concern_options.h"
@@ -314,13 +314,13 @@ Status waitForWriteConcern(OperationContext* opCtx,
result->fsyncFiles = 1;
} else {
// We only need to commit the journal if we're durable
- StorageControl::waitForJournalFlush(opCtx);
+ JournalFlusher::get(opCtx)->waitForJournalFlush();
}
break;
}
case WriteConcernOptions::SyncMode::JOURNAL:
waitForNoOplogHolesIfNeeded(opCtx);
- StorageControl::waitForJournalFlush(opCtx);
+ JournalFlusher::get(opCtx)->waitForJournalFlush();
break;
}
} catch (const DBException& ex) {