author    Dianna Hohensee <dianna.hohensee@mongodb.com>  2020-06-05 11:09:50 -0400
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-08-28 18:37:55 +0000
commit    17457592a2a1b64ed4ac90c93b32aa47598d5c90 (patch)
tree      be681db9c1f0c6d5280e43dbf5eea3e9e569721e /src/mongo/db/repl
parent    24890bbac9ee27cf3fb9a1b6bb8123ab120a1594 (diff)
download  mongo-17457592a2a1b64ed4ac90c93b32aa47598d5c90.tar.gz
SERVER-48149 Move callers of RecoveryUnit::waitUntilDurable onto JournalFlusher::waitForJournalFlush
Operations running concurrently with stepdown must call JournalFlusher::waitForJournalFlush so that writes to the oplogTruncateAfterPoint are interrupted correctly during stepdown, and so that callers waiting for durability do not receive unexpected InterruptedDueToReplStateChange errors.
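The mechanical change is the same at every call site: durability waits that previously went through each operation's RecoveryUnit (or through StorageControl) now route through the JournalFlusher service decoration, which owns the flushing thread and absorbs stepdown interruptions internally. A minimal before/after sketch, using only calls that appear in the hunks below; the wrapper function itself is hypothetical:

    // Hypothetical helper illustrating the call-site change; only the
    // flush calls themselves appear in this patch.
    #include "mongo/db/storage/control/journal_flusher.h"

    void ensureDurable(OperationContext* opCtx) {
        // Before: each caller waited on its own RecoveryUnit. A stepdown
        // racing with a write to the oplogTruncateAfterPoint could surface
        // an InterruptedDueToReplStateChange error to this caller.
        //   opCtx->recoveryUnit()->waitUntilDurable(opCtx);

        // After: wait for a round of the centralized JournalFlusher instead.
        JournalFlusher::get(opCtx)->waitForJournalFlush();
    }

For fire-and-forget durability (freeing ops waiting with 'j' write concern without blocking the caller), the non-waiting variant is used instead: JournalFlusher::get(opCtx)->triggerJournalFlush().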
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--  src/mongo/db/repl/SConscript                                       | 4
-rw-r--r--  src/mongo/db/repl/oplog_applier_impl.cpp                           | 6
-rw-r--r--  src/mongo/db/repl/replication_consistency_markers_impl.cpp         | 5
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp  | 8
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp                 | 3
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp       | 3
-rw-r--r--  src/mongo/db/repl/replication_recovery.cpp                         | 3
-rw-r--r--  src/mongo/db/repl/rs_rollback.cpp                                  | 3
-rw-r--r--  src/mongo/db/repl/storage_interface_impl.cpp                       | 3
9 files changed, 24 insertions(+), 14 deletions(-)
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index d3412d5569e..24232cfa1a7 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -257,6 +257,7 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/catalog_raii',
+ '$BUILD_DIR/mongo/db/storage/journal_flusher',
],
)
@@ -269,6 +270,7 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/storage/journal_flusher',
'$BUILD_DIR/mongo/db/storage/storage_options',
'oplog',
'oplog_application',
@@ -405,6 +407,7 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/catalog/index_build_oplog_entry',
+ '$BUILD_DIR/mongo/db/storage/journal_flusher',
],
)
@@ -712,6 +715,7 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/collection_catalog',
'$BUILD_DIR/mongo/db/commands/test_commands_enabled',
'$BUILD_DIR/mongo/db/index_builds_coordinator_interface',
+ '$BUILD_DIR/mongo/db/storage/journal_flusher',
'$BUILD_DIR/mongo/idl/server_parameter',
'local_oplog_info',
'repl_server_parameters',
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index 9b3b3d0aa61..24ff5ad96d6 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -47,7 +47,7 @@
#include "mongo/db/repl/transaction_oplog_application.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/stats/timer_stats.h"
-#include "mongo/db/storage/control/storage_control.h"
+#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/basic.h"
#include "mongo/util/fail_point.h"
@@ -227,7 +227,7 @@ void ApplyBatchFinalizerForJournal::_run() {
}
auto opCtx = cc().makeOperationContext();
- opCtx->recoveryUnit()->waitUntilDurable(opCtx.get());
+ JournalFlusher::get(opCtx.get())->waitForJournalFlush();
_recordDurable(latestOpTimeAndWallTime);
}
}
@@ -561,7 +561,7 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx,
// new writes with timestamps associated with those oplog entries will show up in the future. We
// want to flush the journal as soon as possible in order to free ops waiting with 'j' write
// concern.
- StorageControl::triggerJournalFlush(opCtx->getServiceContext());
+ JournalFlusher::get(opCtx)->triggerJournalFlush();
// Use this fail point to hold the PBWM lock and prevent the batch from completing.
if (MONGO_unlikely(pauseBatchApplicationBeforeCompletion.shouldFail())) {
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
index a4d24555bf9..c829e33226b 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/storage_interface.h"
+#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/logv2/log.h"
namespace mongo {
@@ -150,7 +151,7 @@ void ReplicationConsistencyMarkersImpl::setInitialSyncFlag(OperationContext* opC
update.timestamp = Timestamp();
_updateMinValidDocument(opCtx, update);
- opCtx->recoveryUnit()->waitUntilDurable(opCtx);
+ JournalFlusher::get(opCtx)->waitForJournalFlush();
}
void ReplicationConsistencyMarkersImpl::clearInitialSyncFlag(OperationContext* opCtx) {
@@ -186,7 +187,7 @@ void ReplicationConsistencyMarkersImpl::clearInitialSyncFlag(OperationContext* o
setOplogTruncateAfterPoint(opCtx, Timestamp());
if (getGlobalServiceContext()->getStorageEngine()->isDurable()) {
- opCtx->recoveryUnit()->waitUntilDurable(opCtx);
+ JournalFlusher::get(opCtx)->waitForJournalFlush();
replCoord->setMyLastDurableOpTimeAndWallTime(opTimeAndWallTime);
}
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index ccbcc74e0cd..1094c1adc03 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -89,7 +89,7 @@
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/db/session_catalog_mongod.h"
-#include "mongo/db/storage/control/storage_control.h"
+#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/db/storage/flow_control.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/db/system_index.h"
@@ -690,7 +690,7 @@ Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument(
return status;
}
- opCtx->recoveryUnit()->waitUntilDurable(opCtx);
+ JournalFlusher::get(opCtx)->waitForJournalFlush();
return Status::OK();
} catch (const DBException& ex) {
@@ -804,13 +804,13 @@ void ReplicationCoordinatorExternalStateImpl::_stopAsyncUpdatesOfAndClearOplogTr
//
// This makes sure the JournalFlusher is not stuck waiting for a lock that stepdown might hold
// before doing an update write to the truncate point.
- StorageControl::interruptJournalFlusherForReplStateChange(_service);
+ JournalFlusher::get(_service)->interruptJournalFlusherForReplStateChange();
// Wait for another round of journal flushing. This will ensure that we wait for the current
// round to completely finish and have no chance of racing with unsetting the truncate point
// below. It is possible that the JournalFlusher will not check for the interrupt signaled
// above, if writing is imminent, so we must make sure that the code completes fully.
- StorageControl::waitForJournalFlush(opCtx);
+ JournalFlusher::get(_service)->waitForJournalFlush();
// Writes to non-replicated collections do not need concurrency control with the OplogApplier
// that never accesses them. Skip taking the PBWM.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index b1be5ec8e78..492740768e6 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -89,6 +89,7 @@
#include "mongo/db/server_options.h"
#include "mongo/db/session_catalog.h"
#include "mongo/db/shutdown_in_progress_quiesce_info.h"
+#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/db/vector_clock.h"
#include "mongo/db/vector_clock_mutable.h"
@@ -3484,7 +3485,7 @@ Status ReplicationCoordinatorImpl::doReplSetReconfig(OperationContext* opCtx,
}
}
// Wait for durability of the new config document.
- opCtx->recoveryUnit()->waitUntilDurable(opCtx);
+ JournalFlusher::get(opCtx)->waitForJournalFlush();
configStateGuard.dismiss();
_finishReplSetReconfig(opCtx, newConfig, force, myIndex);
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 41054ca6aa4..9ac897a22a8 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -57,6 +57,7 @@
#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/db/repl/vote_requester.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/mutex.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -662,7 +663,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
auto status = _externalState->storeLocalConfigDocument(
opCtx.get(), newConfig.toBSON(), false /* writeOplog */);
// Wait for durability of the new config document.
- opCtx->recoveryUnit()->waitUntilDurable(opCtx.get());
+ JournalFlusher::get(opCtx.get())->waitForJournalFlush();
bool isFirstConfig;
{
diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp
index 00d772ac6ef..dd350f73552 100644
--- a/src/mongo/db/repl/replication_recovery.cpp
+++ b/src/mongo/db/repl/replication_recovery.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/repl/transaction_oplog_application.h"
#include "mongo/db/server_recovery.h"
#include "mongo/db/session.h"
+#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/db/storage/storage_parameters_gen.h"
#include "mongo/db/transaction_history_iterator.h"
#include "mongo/db/transaction_participant.h"
@@ -727,7 +728,7 @@ void ReplicationRecoveryImpl::_truncateOplogIfNeededAndThenClearOplogTruncateAft
// Clear the oplogTruncateAfterPoint now that we have removed any holes that might exist in the
// oplog -- and so that we do not truncate future entries erroneously.
_consistencyMarkers->setOplogTruncateAfterPoint(opCtx, Timestamp());
- opCtx->recoveryUnit()->waitUntilDurable(opCtx);
+ JournalFlusher::get(opCtx)->waitForJournalFlush();
}
} // namespace repl
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index fb186e4192c..d2116a74b17 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -73,6 +73,7 @@
#include "mongo/db/repl/rollback_source.h"
#include "mongo/db/s/shard_identity_rollback_notifier.h"
#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/db/storage/durable_catalog.h"
#include "mongo/db/storage/remove_saver.h"
#include "mongo/db/transaction_participant.h"
@@ -2151,7 +2152,7 @@ void rollback(OperationContext* opCtx,
// so that if we wind up shutting down uncleanly in response to something we rolled back
// we know that we won't wind up right back in the same situation when we start back up
// because the rollback wasn't durable.
- opCtx->recoveryUnit()->waitUntilDurable(opCtx);
+ JournalFlusher::get(opCtx)->waitForJournalFlush();
// If we detected that we rolled back the shardIdentity document as part of this rollback
// then we must shut down to clear the in-memory ShardingState associated with the
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 688e50a0bca..9a4abeee1bc 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -74,6 +74,7 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/rollback_gen.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/db/storage/control/storage_control.h"
#include "mongo/db/storage/durable_catalog.h"
#include "mongo/db/storage/oplog_cap_maintainer_thread.h"
@@ -177,7 +178,7 @@ StatusWith<int> StorageInterfaceImpl::incrementRollbackID(OperationContext* opCt
// We wait until durable so that we are sure the Rollback ID is updated before rollback ends.
if (status.isOK()) {
- opCtx->recoveryUnit()->waitUntilDurable(opCtx);
+ JournalFlusher::get(opCtx)->waitForJournalFlush();
return newRBID;
}
return status;
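For reference, the stepdown-side counterpart that the new waiting behavior cooperates with is the interrupt-then-wait pair in the replication_coordinator_external_state_impl.cpp hunk above. A condensed sketch of that sequence (the enclosing function name and signature are abbreviated, not the real ones):

    // Condensed from _stopAsyncUpdatesOfAndClearOplogTruncateAfterPoint above.
    void stopTruncatePointUpdates(ServiceContext* service) {
        // Kick the JournalFlusher out of any lock wait so it is not stuck
        // waiting on a lock that stepdown might hold.
        JournalFlusher::get(service)->interruptJournalFlusherForReplStateChange();

        // Then wait out one full flush round: the flusher may skip the
        // interrupt check if a write is imminent, so only a completed round
        // guarantees no race with clearing the oplogTruncateAfterPoint.
        JournalFlusher::get(service)->waitForJournalFlush();
    }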