summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJudah Schvimer <judah@mongodb.com>2017-05-30 14:58:59 -0400
committerJudah Schvimer <judah@mongodb.com>2017-05-30 14:58:59 -0400
commit244ed4e90574c41c793a8f8e7fd3a1142cb1de9f (patch)
tree9b02a225c30ec0f129ac47a3d9d0984c8438be81 /src
parent7b112f71fe551e92ae0a365f62fff402d4158035 (diff)
downloadmongo-244ed4e90574c41c793a8f8e7fd3a1142cb1de9f.tar.gz
SERVER-29254 Moved MinValid into ReplicationProcess
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/db.cpp10
-rw-r--r--src/mongo/db/repl/SConscript32
-rw-r--r--src/mongo/db/repl/bgsync.cpp25
-rw-r--r--src/mongo/db/repl/bgsync.h8
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp9
-rw-r--r--src/mongo/db/repl/initial_syncer.h4
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp44
-rw-r--r--src/mongo/db/repl/repl_set_commands.cpp3
-rw-r--r--src/mongo/db/repl/replication_consistency_markers.cpp40
-rw-r--r--src/mongo/db/repl/replication_consistency_markers.h164
-rw-r--r--src/mongo/db/repl/replication_consistency_markers_impl.cpp207
-rw-r--r--src/mongo/db/repl/replication_consistency_markers_impl.h93
-rw-r--r--src/mongo/db/repl/replication_consistency_markers_impl_test.cpp227
-rw-r--r--src/mongo/db/repl/replication_consistency_markers_mock.cpp92
-rw-r--r--src/mongo/db/repl/replication_consistency_markers_mock.h78
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp49
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h7
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp1
-rw-r--r--src/mongo/db/repl/replication_coordinator_test_fixture.cpp6
-rw-r--r--src/mongo/db/repl/replication_process.cpp19
-rw-r--r--src/mongo/db/repl/replication_process.h15
-rw-r--r--src/mongo/db/repl/replication_process_test.cpp37
-rw-r--r--src/mongo/db/repl/rollback_test_fixture.cpp15
-rw-r--r--src/mongo/db/repl/rollback_test_fixture.h5
-rw-r--r--src/mongo/db/repl/rs_rollback.cpp33
-rw-r--r--src/mongo/db/repl/rs_rollback.h8
-rw-r--r--src/mongo/db/repl/rs_rollback_test.cpp81
-rw-r--r--src/mongo/db/repl/storage_interface.h79
-rw-r--r--src/mongo/db/repl/storage_interface_impl.cpp150
-rw-r--r--src/mongo/db/repl/storage_interface_impl.h28
-rw-r--r--src/mongo/db/repl/storage_interface_impl_test.cpp143
-rw-r--r--src/mongo/db/repl/storage_interface_mock.cpp58
-rw-r--r--src/mongo/db/repl/storage_interface_mock.h19
-rw-r--r--src/mongo/db/repl/sync_tail.cpp18
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp10
-rw-r--r--src/mongo/dbtests/SConscript1
-rw-r--r--src/mongo/dbtests/replica_set_tests.cpp10
-rw-r--r--src/mongo/s/sharding_mongod_test_fixture.cpp7
38 files changed, 1208 insertions, 627 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 52d09a884a6..3803808efab 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -84,6 +84,7 @@
#include "mongo/db/repl/drop_pending_collection_reaper.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/repl_settings.h"
+#include "mongo/db/repl/replication_consistency_markers_impl.h"
#include "mongo/db/repl/replication_coordinator_external_state_impl.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
@@ -895,8 +896,11 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager,
repl::StorageInterface::set(serviceContext, stdx::make_unique<repl::StorageInterfaceImpl>());
auto storageInterface = repl::StorageInterface::get(serviceContext);
- repl::ReplicationProcess::set(serviceContext,
- stdx::make_unique<repl::ReplicationProcess>(storageInterface));
+ repl::ReplicationProcess::set(
+ serviceContext,
+ stdx::make_unique<repl::ReplicationProcess>(
+ storageInterface,
+ stdx::make_unique<repl::ReplicationConsistencyMarkersImpl>(storageInterface)));
auto replicationProcess = repl::ReplicationProcess::get(serviceContext);
repl::DropPendingCollectionReaper::set(
@@ -914,7 +918,7 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager,
serviceContext,
getGlobalReplSettings(),
stdx::make_unique<repl::ReplicationCoordinatorExternalStateImpl>(
- serviceContext, dropPendingCollectionReaper, storageInterface),
+ serviceContext, dropPendingCollectionReaper, storageInterface, replicationProcess),
makeReplicationExecutor(serviceContext),
stdx::make_unique<repl::TopologyCoordinatorImpl>(topoCoordOptions),
replicationProcess,
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 3809c5e62d7..c63f661058e 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -185,8 +185,33 @@ env.CppUnitTest(
)
env.Library(
+ target='replication_consistency_markers_impl',
+ source=[
+ 'replication_consistency_markers_impl.cpp',
+ ],
+ LIBDEPS=[
+ 'optime',
+ 'repl_coordinator_impl',
+ ],
+)
+
+env.CppUnitTest(
+ target='replication_consistency_markers_impl_test',
+ source=[
+ 'replication_consistency_markers_impl_test.cpp',
+ ],
+ LIBDEPS=[
+ 'replication_consistency_markers_impl',
+ 'replmocks',
+ 'storage_interface_impl',
+ '$BUILD_DIR/mongo/db/service_context_d_test_fixture',
+ ],
+)
+
+env.Library(
target='replication_process',
source=[
+ 'replication_consistency_markers.cpp',
'replication_process.cpp',
],
LIBDEPS=[
@@ -204,6 +229,7 @@ env.CppUnitTest(
'replication_process_test.cpp',
],
LIBDEPS=[
+ 'replication_consistency_markers_impl',
'replication_process',
'replmocks',
'storage_interface_impl',
@@ -392,6 +418,7 @@ env.Library(
],
LIBDEPS=[
'optime',
+ 'replication_process',
'replmocks',
#'serveronly', # CYCLE
'$BUILD_DIR/mongo/db/service_context',
@@ -502,6 +529,7 @@ env.CppUnitTest(
],
LIBDEPS=[
'oplog_interface_local',
+ 'replication_process',
'replmocks',
'sync_tail',
'$BUILD_DIR/mongo/db/service_context_d_test_fixture',
@@ -739,6 +767,7 @@ env.Library(
],
LIBDEPS=[
'optime',
+ 'replication_process',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/util/net/hostandport',
'$BUILD_DIR/mongo/util/decorable',
@@ -752,6 +781,7 @@ env.Library('repl_coordinator_global',
env.Library(
target='replmocks',
source=[
+ 'replication_consistency_markers_mock.cpp',
'replication_coordinator_external_state_mock.cpp',
'replication_coordinator_mock.cpp',
'service_context_repl_mock.cpp',
@@ -1271,6 +1301,7 @@ env.CppUnitTest(
'base_cloner_test_fixture',
'initial_syncer',
'data_replicator_external_state_mock',
+ 'replication_process',
'replmocks',
'service_context_repl_mock_init',
'sync_source_selector_mock',
@@ -1383,6 +1414,7 @@ env.Library(
'repl_coordinator_interface',
'repl_coordinator_impl',
'repl_settings',
+ 'replication_consistency_markers_impl',
'replication_process',
'sync_tail',
'$BUILD_DIR/mongo/base',
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 8bd3c0e8ab0..be735cb0b6d 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/repl/oplog_interface_local.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
+#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/rollback_source_impl.h"
#include "mongo/db/repl/rs_rollback.h"
#include "mongo/db/repl/rs_sync.h"
@@ -135,10 +136,12 @@ static ServerStatusMetricField<Counter64> displayBufferMaxSize("repl.buffer.maxS
BackgroundSync::BackgroundSync(
ReplicationCoordinatorExternalState* replicationCoordinatorExternalState,
+ ReplicationProcess* replicationProcess,
std::unique_ptr<OplogBuffer> oplogBuffer)
: _oplogBuffer(std::move(oplogBuffer)),
_replCoord(getGlobalReplicationCoordinator()),
- _replicationCoordinatorExternalState(replicationCoordinatorExternalState) {
+ _replicationCoordinatorExternalState(replicationCoordinatorExternalState),
+ _replicationProcess(replicationProcess) {
// Update "repl.buffer.maxSizeBytes" server status metric to reflect the current oplog buffer's
// max size.
bufferMaxSizeGauge.increment(_oplogBuffer->getMaxSize() - bufferMaxSizeGauge.get());
@@ -274,14 +277,14 @@ void BackgroundSync::_produce(OperationContext* opCtx) {
}
}
- auto storageInterface = StorageInterface::get(opCtx);
// find a target to sync from the last optime fetched
OpTime lastOpTimeFetched;
HostAndPort source;
HostAndPort oldSource = _syncSourceHost;
SyncSourceResolverResponse syncSourceResp;
{
- const OpTime minValidSaved = storageInterface->getMinValid(opCtx);
+ const OpTime minValidSaved =
+ _replicationProcess->getConsistencyMarkers()->getMinValid(opCtx);
stdx::lock_guard<stdx::mutex> lock(_mutex);
if (_state != ProducerState::Running) {
@@ -405,8 +408,9 @@ void BackgroundSync::_produce(OperationContext* opCtx) {
// Set the applied point if unset. This is most likely the first time we've established a sync
// source since stepping down or otherwise clearing the applied point. We need to set this here,
// before the OplogWriter gets a chance to append to the oplog.
- if (storageInterface->getAppliedThrough(opCtx).isNull()) {
- storageInterface->setAppliedThrough(opCtx, _replCoord->getMyLastAppliedOpTime());
+ if (_replicationProcess->getConsistencyMarkers()->getAppliedThrough(opCtx).isNull()) {
+ _replicationProcess->getConsistencyMarkers()->setAppliedThrough(
+ opCtx, _replCoord->getMyLastAppliedOpTime());
}
// "lastFetched" not used. Already set in _enqueueDocuments.
@@ -477,6 +481,7 @@ void BackgroundSync::_produce(OperationContext* opCtx) {
// if it can't return a matching oplog start from the last fetch oplog ts field.
return;
} else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing) {
+ auto storageInterface = StorageInterface::get(opCtx);
_runRollback(opCtx, fetcherReturnStatus, source, syncSourceResp.rbid, storageInterface);
} else if (fetcherReturnStatus == ErrorCodes::InvalidBSON) {
Seconds blacklistDuration(60);
@@ -618,7 +623,7 @@ void BackgroundSync::_runRollback(OperationContext* opCtx,
OplogInterfaceLocal localOplog(opCtx, rsOplogName);
if (use3dot4Rollback) {
log() << "Rollback falling back on 3.4 algorithm due to startup server parameter";
- _fallBackOn3dot4Rollback(opCtx, source, requiredRBID, &localOplog, storageInterface);
+ _fallBackOn3dot4Rollback(opCtx, source, requiredRBID, &localOplog);
} else {
AbstractAsyncComponent* rollback;
StatusWith<OpTime> onRollbackShutdownResult =
@@ -661,8 +666,7 @@ void BackgroundSync::_runRollback(OperationContext* opCtx,
<< onRollbackShutdownResult.getValue();
} else if (ErrorCodes::IncompatibleRollbackAlgorithm == status) {
log() << "Rollback falling back on 3.4 algorithm due to " << status;
- _fallBackOn3dot4Rollback(
- opCtx, source, requiredRBID, &localOplog, storageInterface);
+ _fallBackOn3dot4Rollback(opCtx, source, requiredRBID, &localOplog);
} else {
warning() << "Rollback failed with error: " << status;
}
@@ -677,8 +681,7 @@ void BackgroundSync::_runRollback(OperationContext* opCtx,
void BackgroundSync::_fallBackOn3dot4Rollback(OperationContext* opCtx,
const HostAndPort& source,
int requiredRBID,
- OplogInterface* localOplog,
- StorageInterface* storageInterface) {
+ OplogInterface* localOplog) {
const int messagingPortTags = 0;
ConnectionPool connectionPool(messagingPortTags);
std::unique_ptr<ConnectionPool::ConnectionPtr> connection;
@@ -691,7 +694,7 @@ void BackgroundSync::_fallBackOn3dot4Rollback(OperationContext* opCtx,
};
RollbackSourceImpl rollbackSource(getConnection, source, rsOplogName);
- rollback(opCtx, *localOplog, rollbackSource, requiredRBID, _replCoord, storageInterface);
+ rollback(opCtx, *localOplog, rollbackSource, requiredRBID, _replCoord, _replicationProcess);
}
HostAndPort BackgroundSync::getSyncTarget() const {
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index e76bcc0dd83..d208261309e 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -55,6 +55,7 @@ namespace repl {
class OplogInterface;
class ReplicationCoordinator;
class ReplicationCoordinatorExternalState;
+class ReplicationProcess;
class StorageInterface;
class BackgroundSync {
@@ -76,6 +77,7 @@ public:
enum class ProducerState { Starting, Running, Stopped };
BackgroundSync(ReplicationCoordinatorExternalState* replicationCoordinatorExternalState,
+ ReplicationProcess* replicationProcess,
std::unique_ptr<OplogBuffer> oplogBuffer);
// stop syncing (when this node becomes a primary, e.g.)
@@ -187,8 +189,7 @@ private:
void _fallBackOn3dot4Rollback(OperationContext* opCtx,
const HostAndPort& source,
int requiredRBID,
- OplogInterface* localOplog,
- StorageInterface* storageInterface);
+ OplogInterface* localOplog);
// restart syncing
void start(OperationContext* opCtx);
@@ -204,6 +205,9 @@ private:
// A pointer to the replication coordinator external state.
ReplicationCoordinatorExternalState* _replicationCoordinatorExternalState;
+ // A pointer to the replication process.
+ ReplicationProcess* _replicationProcess;
+
/**
* All member variables are labeled with one of the following codes indicating the
* synchronization rules for accessing them:
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index b036ed08f7e..3923d3402b7 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -51,6 +51,7 @@
#include "mongo/db/repl/oplog_buffer.h"
#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/sync_source_selector.h"
#include "mongo/db/server_parameters.h"
@@ -203,15 +204,18 @@ InitialSyncer::InitialSyncer(
InitialSyncerOptions opts,
std::unique_ptr<DataReplicatorExternalState> dataReplicatorExternalState,
StorageInterface* storage,
+ ReplicationProcess* replicationProcess,
const OnCompletionFn& onCompletion)
: _fetchCount(0),
_opts(opts),
_dataReplicatorExternalState(std::move(dataReplicatorExternalState)),
_exec(_dataReplicatorExternalState->getTaskExecutor()),
_storage(storage),
+ _replicationProcess(replicationProcess),
_onCompletion(onCompletion) {
uassert(ErrorCodes::BadValue, "task executor cannot be null", _exec);
uassert(ErrorCodes::BadValue, "invalid storage interface", _storage);
+ uassert(ErrorCodes::BadValue, "invalid replication process", _replicationProcess);
uassert(ErrorCodes::BadValue, "invalid getMyLastOptime function", _opts.getMyLastOptime);
uassert(ErrorCodes::BadValue, "invalid setMyLastOptime function", _opts.setMyLastOptime);
uassert(ErrorCodes::BadValue, "invalid getSlaveDelay function", _opts.getSlaveDelay);
@@ -381,9 +385,8 @@ void InitialSyncer::setScheduleDbWorkFn_forTest(const CollectionCloner::Schedule
}
void InitialSyncer::_setUp_inlock(OperationContext* opCtx, std::uint32_t initialSyncMaxAttempts) {
- // This will call through to the storageInterfaceImpl to ReplicationCoordinatorImpl.
// 'opCtx' is passed through from startup().
- _storage->setInitialSyncFlag(opCtx);
+ _replicationProcess->getConsistencyMarkers()->setInitialSyncFlag(opCtx);
LOG(1) << "Creating oplogBuffer.";
_oplogBuffer = _dataReplicatorExternalState->makeInitialSyncOplogBuffer(opCtx);
@@ -405,7 +408,7 @@ void InitialSyncer::_tearDown_inlock(OperationContext* opCtx,
if (!lastApplied.isOK()) {
return;
}
- _storage->clearInitialSyncFlag(opCtx);
+ _replicationProcess->getConsistencyMarkers()->clearInitialSyncFlag(opCtx);
_opts.setMyLastOptime(lastApplied.getValue().opTime);
log() << "initial sync done; took "
<< duration_cast<Seconds>(_stats.initialSyncEnd - _stats.initialSyncStart) << ".";
diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h
index 81e42c7956e..b438a0c97e1 100644
--- a/src/mongo/db/repl/initial_syncer.h
+++ b/src/mongo/db/repl/initial_syncer.h
@@ -71,9 +71,9 @@ MONGO_FP_FORWARD_DECLARE(rsSyncApplyStop);
struct InitialSyncState;
struct MemberState;
+class ReplicationProcess;
class StorageInterface;
-
struct InitialSyncerOptions {
/** Function to return optime of last operation applied on this node */
using GetMyLastOptimeFn = stdx::function<OpTime()>;
@@ -174,6 +174,7 @@ public:
InitialSyncer(InitialSyncerOptions opts,
std::unique_ptr<DataReplicatorExternalState> dataReplicatorExternalState,
StorageInterface* storage,
+ ReplicationProcess* replicationProcess,
const OnCompletionFn& onCompletion);
virtual ~InitialSyncer();
@@ -558,6 +559,7 @@ private:
std::unique_ptr<DataReplicatorExternalState> _dataReplicatorExternalState; // (R)
executor::TaskExecutor* _exec; // (R)
StorageInterface* _storage; // (R)
+ ReplicationProcess* _replicationProcess; // (S)
// This is invoked with the final status of the initial sync. If startup() fails, this callback
// is never invoked. The caller gets the last applied optime with hash when the initial sync
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 1821e3f491d..6dd7bfd5f66 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -43,6 +43,8 @@
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/replication_consistency_markers_mock.h"
+#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/reporter.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/storage_interface_mock.h"
@@ -280,6 +282,9 @@ protected:
launchExecutorThread();
+ _replicationProcess = stdx::make_unique<ReplicationProcess>(
+ _storageInterface.get(), stdx::make_unique<ReplicationConsistencyMarkersMock>());
+
_executorProxy = stdx::make_unique<TaskExecutorMock>(&getExecutor());
_myLastOpTime = OpTime({3, 0}, 1);
@@ -336,6 +341,7 @@ protected:
options,
std::move(dataReplicatorExternalState),
_storageInterface.get(),
+ _replicationProcess.get(),
[this](const StatusWith<OpTimeWithHash>& lastApplied) {
_onCompletion(lastApplied);
});
@@ -363,6 +369,7 @@ protected:
_initialSyncer.reset();
_dbWorkThreadPool->join();
_dbWorkThreadPool.reset();
+ _replicationProcess.reset();
_storageInterface.reset();
// tearDown() destroys the task executor which was referenced by the initial syncer.
@@ -393,6 +400,7 @@ protected:
OpTime _myLastOpTime;
std::unique_ptr<SyncSourceSelectorMock> _syncSourceSelector;
std::unique_ptr<StorageInterfaceMock> _storageInterface;
+ std::unique_ptr<ReplicationProcess> _replicationProcess;
std::unique_ptr<OldThreadPool> _dbWorkThreadPool;
std::map<NamespaceString, CollectionMockStats> _collectionStats;
std::map<NamespaceString, CollectionCloneInfo> _collections;
@@ -525,12 +533,14 @@ TEST_F(InitialSyncerTest, InvalidConstruction) {
// Null task executor in external state.
{
auto dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>();
- ASSERT_THROWS_CODE_AND_WHAT(
- InitialSyncer(
- options, std::move(dataReplicatorExternalState), _storageInterface.get(), callback),
- UserException,
- ErrorCodes::BadValue,
- "task executor cannot be null");
+ ASSERT_THROWS_CODE_AND_WHAT(InitialSyncer(options,
+ std::move(dataReplicatorExternalState),
+ _storageInterface.get(),
+ _replicationProcess.get(),
+ callback),
+ UserException,
+ ErrorCodes::BadValue,
+ "task executor cannot be null");
}
// Null callback function.
@@ -540,6 +550,7 @@ TEST_F(InitialSyncerTest, InvalidConstruction) {
ASSERT_THROWS_CODE_AND_WHAT(InitialSyncer(options,
std::move(dataReplicatorExternalState),
_storageInterface.get(),
+ _replicationProcess.get(),
InitialSyncer::OnCompletionFn()),
UserException,
ErrorCodes::BadValue,
@@ -598,13 +609,13 @@ TEST_F(InitialSyncerTest, StartupSetsInitialSyncFlagOnSuccess) {
auto opCtx = makeOpCtx();
// Initial sync flag should not be set before starting.
- ASSERT_FALSE(getStorage().getInitialSyncFlag(opCtx.get()));
+ ASSERT_FALSE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
ASSERT_TRUE(initialSyncer->isActive());
// Initial sync flag should be set.
- ASSERT_TRUE(getStorage().getInitialSyncFlag(opCtx.get()));
+ ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));
}
TEST_F(InitialSyncerTest, InitialSyncerReturnsCallbackCanceledIfShutdownImmediatelyAfterStartup) {
@@ -832,6 +843,7 @@ TEST_F(InitialSyncerTest, InitialSyncerResetsOnCompletionCallbackFunctionPointer
_options,
std::move(dataReplicatorExternalState),
_storageInterface.get(),
+ _replicationProcess.get(),
[&lastApplied, sharedCallbackData](const StatusWith<OpTimeWithHash>& result) {
lastApplied = result;
});
@@ -2385,7 +2397,7 @@ TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfter
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
- ASSERT_TRUE(_storageInterface->getInitialSyncFlag(opCtx.get()));
+ ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));
auto oplogEntry = makeOplogEntry(1);
auto net = getNet();
@@ -2464,7 +2476,7 @@ TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfter
initialSyncer->join();
ASSERT_EQUALS(oplogEntry.getOpTime(), unittest::assertGet(_lastApplied).opTime);
ASSERT_EQUALS(oplogEntry.getHash(), unittest::assertGet(_lastApplied).value);
- ASSERT_FALSE(_storageInterface->getInitialSyncFlag(opCtx.get()));
+ ASSERT_FALSE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));
}
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchScheduleError) {
@@ -2474,7 +2486,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchScheduleE
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
- ASSERT_TRUE(_storageInterface->getInitialSyncFlag(opCtx.get()));
+ ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));
auto net = getNet();
int baseRollbackId = 1;
@@ -2528,7 +2540,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughSecondGetNextApplierBatchSch
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
- ASSERT_TRUE(_storageInterface->getInitialSyncFlag(opCtx.get()));
+ ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));
auto net = getNet();
int baseRollbackId = 1;
@@ -2582,7 +2594,7 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsGetNextApplierBatchOnShutdown) {
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
- ASSERT_TRUE(_storageInterface->getInitialSyncFlag(opCtx.get()));
+ ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));
auto net = getNet();
int baseRollbackId = 1;
@@ -2632,7 +2644,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchInLockErr
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
- ASSERT_TRUE(_storageInterface->getInitialSyncFlag(opCtx.get()));
+ ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));
// _getNextApplierBatch_inlock() returns BadValue when it gets an oplog entry with an unexpected
// version (not OplogEntry::kOplogVersion).
@@ -2690,7 +2702,7 @@ TEST_F(
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
- ASSERT_TRUE(_storageInterface->getInitialSyncFlag(opCtx.get()));
+ ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));
// _getNextApplierBatch_inlock() returns BadValue when it gets an oplog entry with an unexpected
// version (not OplogEntry::kOplogVersion).
@@ -2757,7 +2769,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughMultiApplierScheduleError) {
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
- ASSERT_TRUE(_storageInterface->getInitialSyncFlag(opCtx.get()));
+ ASSERT_TRUE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get()));
auto net = getNet();
int baseRollbackId = 1;
diff --git a/src/mongo/db/repl/repl_set_commands.cpp b/src/mongo/db/repl/repl_set_commands.cpp
index 44d75d15c51..f34b91f4af2 100644
--- a/src/mongo/db/repl/repl_set_commands.cpp
+++ b/src/mongo/db/repl/repl_set_commands.cpp
@@ -357,7 +357,8 @@ public:
ReplicationCoordinatorExternalStateImpl externalState(
opCtx->getServiceContext(),
DropPendingCollectionReaper::get(opCtx),
- StorageInterface::get(opCtx));
+ StorageInterface::get(opCtx),
+ ReplicationProcess::get(opCtx));
std::string name;
std::vector<HostAndPort> seeds;
parseReplSetSeedList(&externalState, replSetString, &name, &seeds); // may throw...
diff --git a/src/mongo/db/repl/replication_consistency_markers.cpp b/src/mongo/db/repl/replication_consistency_markers.cpp
new file mode 100644
index 00000000000..4e7fadb9e84
--- /dev/null
+++ b/src/mongo/db/repl/replication_consistency_markers.cpp
@@ -0,0 +1,40 @@
+/**
+ * Copyright 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/replication_consistency_markers.h"
+
+namespace mongo {
+namespace repl {
+
+ReplicationConsistencyMarkers::ReplicationConsistencyMarkers() {}
+ReplicationConsistencyMarkers::~ReplicationConsistencyMarkers() {}
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/replication_consistency_markers.h b/src/mongo/db/repl/replication_consistency_markers.h
new file mode 100644
index 00000000000..d57f54960c6
--- /dev/null
+++ b/src/mongo/db/repl/replication_consistency_markers.h
@@ -0,0 +1,164 @@
+/**
+* Copyright (C) 2017 MongoDB Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*
+* As a special exception, the copyright holders give permission to link the
+* code of portions of this program with the OpenSSL library under certain
+* conditions as described in each individual source file and distribute
+* linked combinations including the program with the OpenSSL library. You
+* must comply with the GNU Affero General Public License in all respects for
+* all of the code used other than as permitted herein. If you modify file(s)
+* with this exception, you may extend this exception to your version of the
+* file(s), but you are not obligated to do so. If you do not wish to do so,
+* delete this exception statement from your version. If you delete this
+* exception statement from all source files in the program, then also delete
+* it in the license file.
+*/
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/db/namespace_string.h"
+
+namespace mongo {
+
+class BSONObj;
+class OperationContext;
+class Timestamp;
+
+namespace repl {
+
+class OpTime;
+class StorageInterface;
+
+/**
+ * This interface provides helper functions for maintaining the documents used for
+ * maintaining data consistency.
+ *
+ * The minValid document, in 'local.replset.minvalid', is used for indicating whether or not the
+ * data on disk is consistent and for getting to a consistent point after unclean shutdown.
+ *
+ * Example of all fields:
+ * {
+ * _id: <ObjectId>, // not used, but auto-generated
+ * ts: <Timestamp>,
+ * t: <long long>, // timestamp and term of minValid OpTime
+ * doingInitialSync: <bool>,
+ * begin: {
+ * ts: <Timestamp>,
+ * t: <long long>
+ * }, // field for 'appliedThrough'
+ * oplogDeleteFromPoint: <Timestamp>
+ * }
+ *
+ * See below for explanations of each field.
+ */
+class ReplicationConsistencyMarkers {
+ MONGO_DISALLOW_COPYING(ReplicationConsistencyMarkers);
+
+public:
+ // Constructor and Destructor.
+ ReplicationConsistencyMarkers();
+ virtual ~ReplicationConsistencyMarkers();
+
+ // -------- Initial Sync Flag ----------
+
+ /**
+ * Returns true if initial sync was started but has not completed. If we start up and this is
+ * set to true, we know that we must do a resync.
+ */
+ virtual bool getInitialSyncFlag(OperationContext* opCtx) const = 0;
+
+ /**
+ * Sets the initial sync flag to record that initial sync has not completed.
+ *
+ * This operation is durable and waits for durable writes (which will block on
+ * journaling/checkpointing).
+ */
+ virtual void setInitialSyncFlag(OperationContext* opCtx) = 0;
+
+ /**
+ * Clears the initial sync flag to record that initial sync has completed.
+ *
+ * This operation is durable and waits for durable writes (which will block on
+ * journaling/checkpointing).
+ */
+ virtual void clearInitialSyncFlag(OperationContext* opCtx) = 0;
+
+ // -------- MinValid ----------
+
+ /**
+ * The minValid value is the earliest (minimum) OpTime that must be applied in order to
+ * consider the dataset consistent.
+ * - This is set to the end of a batch before we begin applying a batch of oplog entries
+ * since the oplog entries can be applied out of order.
+ * - This is also set during rollback so we do not exit RECOVERING until we are consistent.
+ * If we crash while applying a batch, we apply from appliedThrough to minValid in order
+ * to be consistent. We may re-apply operations, but this is safe.
+ *
+ * Returns the minValid OpTime.
+ */
+ virtual OpTime getMinValid(OperationContext* opCtx) const = 0;
+
+ /**
+ * Sets the minValid OpTime to 'minValid'. This can set minValid backwards, which is necessary
+ * in rollback when the OpTimes in the oplog may move backwards.
+ */
+ virtual void setMinValid(OperationContext* opCtx, const OpTime& minValid) = 0;
+
+ /**
+ * Sets minValid only if it is not already higher than endOpTime.
+ *
+ * Warning, this compares the term and timestamp independently. Do not use if the current
+ * minValid could be from the other fork of a rollback.
+ */
+ virtual void setMinValidToAtLeast(OperationContext* opCtx, const OpTime& minValid) = 0;
+
+ // -------- Oplog Delete From Point ----------
+
+ /**
+ * The oplog delete from point is set to the beginning of a batch of oplog entries before
+ * the oplog entries are written into the oplog, and reset before we begin applying the batch.
+ * On startup all oplog entries with a value >= the oplog delete from point should be deleted.
+ * We write operations to the oplog in parallel so if we crash mid-batch there could be holes
+ * in the oplog. Deleting them at startup keeps us consistent.
+ *
+ * If null, no documents should be deleted.
+ */
+ virtual void setOplogDeleteFromPoint(OperationContext* opCtx, const Timestamp& timestamp) = 0;
+ virtual Timestamp getOplogDeleteFromPoint(OperationContext* opCtx) const = 0;
+
+ // -------- Applied Through ----------
+
+ /**
+ * The applied through point is a persistent record of which oplog entries we've applied.
+ * If we crash while applying a batch of oplog entries, this OpTime tells us where to start
+ * applying operations on startup.
+ *
+ * If null, the applied through point is the top of the oplog.
+ */
+ virtual void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) = 0;
+
+ /**
+ * You should probably be calling ReplicationCoordinator::getLastAppliedOpTime() instead.
+ *
+ * This reads the value from storage which isn't always updated when the ReplicationCoordinator
+ * is. This is safe because it will only ever be stale and reapplying oplog operations is
+ * always safe.
+ */
+ virtual OpTime getAppliedThrough(OperationContext* opCtx) const = 0;
+};
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
new file mode 100644
index 00000000000..ec157379a74
--- /dev/null
+++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
@@ -0,0 +1,207 @@
+
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/replication_consistency_markers_impl.h"
+
+#include "mongo/db/catalog/collection_options.h"
+#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/repl/storage_interface.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace repl {
+
+constexpr StringData ReplicationConsistencyMarkersImpl::kDefaultMinValidNamespace;
+constexpr StringData ReplicationConsistencyMarkersImpl::kInitialSyncFlagFieldName;
+constexpr StringData ReplicationConsistencyMarkersImpl::kBeginFieldName;
+constexpr StringData ReplicationConsistencyMarkersImpl::kOplogDeleteFromPointFieldName;
+
+namespace {
+const BSONObj kInitialSyncFlag(BSON(ReplicationConsistencyMarkersImpl::kInitialSyncFlagFieldName
+ << true));
+} // namespace
+
+ReplicationConsistencyMarkersImpl::ReplicationConsistencyMarkersImpl(
+ StorageInterface* storageInterface)
+ : ReplicationConsistencyMarkersImpl(
+ storageInterface,
+ NamespaceString(ReplicationConsistencyMarkersImpl::kDefaultMinValidNamespace)) {}
+
+ReplicationConsistencyMarkersImpl::ReplicationConsistencyMarkersImpl(
+ StorageInterface* storageInterface, NamespaceString minValidNss)
+ : _storageInterface(storageInterface), _minValidNss(minValidNss) {}
+
+BSONObj ReplicationConsistencyMarkersImpl::_getMinValidDocument(OperationContext* opCtx) const {
+ auto result = _storageInterface->findSingleton(opCtx, _minValidNss);
+ if (!result.isOK()) {
+ if (result.getStatus() == ErrorCodes::NamespaceNotFound ||
+ result.getStatus() == ErrorCodes::CollectionIsEmpty) {
+ return BSONObj();
+ }
+ // Fail if there is an error other than the collection being missing or being empty.
+ fassertFailedWithStatus(40466, result.getStatus());
+ }
+ return result.getValue();
+}
+
+void ReplicationConsistencyMarkersImpl::_updateMinValidDocument(OperationContext* opCtx,
+ const BSONObj& updateSpec) {
+ Status status = _storageInterface->putSingleton(opCtx, _minValidNss, updateSpec);
+
+ // If the collection doesn't exist, create it and try again.
+ if (status == ErrorCodes::NamespaceNotFound) {
+ status = _storageInterface->createCollection(opCtx, _minValidNss, CollectionOptions());
+ if (status.isOK()) {
+ status = _storageInterface->putSingleton(opCtx, _minValidNss, updateSpec);
+ }
+ }
+
+ fassertStatusOK(40467, status);
+}
+
+bool ReplicationConsistencyMarkersImpl::getInitialSyncFlag(OperationContext* opCtx) const {
+ const BSONObj doc = _getMinValidDocument(opCtx);
+ const auto flag = doc[kInitialSyncFlagFieldName].trueValue();
+ LOG(3) << "returning initial sync flag value of " << flag;
+ return flag;
+}
+
+void ReplicationConsistencyMarkersImpl::setInitialSyncFlag(OperationContext* opCtx) {
+ LOG(3) << "setting initial sync flag";
+ _updateMinValidDocument(opCtx, BSON("$set" << kInitialSyncFlag));
+ opCtx->recoveryUnit()->waitUntilDurable();
+}
+
+void ReplicationConsistencyMarkersImpl::clearInitialSyncFlag(OperationContext* opCtx) {
+ LOG(3) << "clearing initial sync flag";
+
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ OpTime time = replCoord->getMyLastAppliedOpTime();
+ _updateMinValidDocument(
+ opCtx,
+ BSON("$unset" << kInitialSyncFlag << "$set"
+ << BSON("ts" << time.getTimestamp() << "t" << time.getTerm()
+ << kBeginFieldName
+ << time)));
+
+ if (getGlobalServiceContext()->getGlobalStorageEngine()->isDurable()) {
+ opCtx->recoveryUnit()->waitUntilDurable();
+ replCoord->setMyLastDurableOpTime(time);
+ }
+}
+
+OpTime ReplicationConsistencyMarkersImpl::getMinValid(OperationContext* opCtx) const {
+ const BSONObj doc = _getMinValidDocument(opCtx);
+ const auto opTimeStatus = OpTime::parseFromOplogEntry(doc);
+ // If any of the keys (fields) are missing from the minvalid document, we return
+ // a null OpTime.
+ if (opTimeStatus == ErrorCodes::NoSuchKey) {
+ return {};
+ }
+
+ if (!opTimeStatus.isOK()) {
+ severe() << "Error parsing minvalid entry: " << redact(doc)
+ << ", with status:" << opTimeStatus.getStatus();
+ fassertFailedNoTrace(40052);
+ }
+
+ OpTime minValid = opTimeStatus.getValue();
+ LOG(3) << "returning minvalid: " << minValid.toString() << "(" << minValid.toBSON() << ")";
+
+ return minValid;
+}
+
+void ReplicationConsistencyMarkersImpl::setMinValid(OperationContext* opCtx,
+ const OpTime& minValid) {
+ LOG(3) << "setting minvalid to exactly: " << minValid.toString() << "(" << minValid.toBSON()
+ << ")";
+ _updateMinValidDocument(
+ opCtx, BSON("$set" << BSON("ts" << minValid.getTimestamp() << "t" << minValid.getTerm())));
+}
+
+void ReplicationConsistencyMarkersImpl::setMinValidToAtLeast(OperationContext* opCtx,
+ const OpTime& minValid) {
+ LOG(3) << "setting minvalid to at least: " << minValid.toString() << "(" << minValid.toBSON()
+ << ")";
+ _updateMinValidDocument(
+ opCtx, BSON("$max" << BSON("ts" << minValid.getTimestamp() << "t" << minValid.getTerm())));
+}
+
+void ReplicationConsistencyMarkersImpl::setOplogDeleteFromPoint(OperationContext* opCtx,
+ const Timestamp& timestamp) {
+ LOG(3) << "setting oplog delete from point to: " << timestamp.toStringPretty();
+ _updateMinValidDocument(opCtx,
+ BSON("$set" << BSON(kOplogDeleteFromPointFieldName << timestamp)));
+}
+
+Timestamp ReplicationConsistencyMarkersImpl::getOplogDeleteFromPoint(
+ OperationContext* opCtx) const {
+ const BSONObj doc = _getMinValidDocument(opCtx);
+ Timestamp out = {};
+ if (auto field = doc[kOplogDeleteFromPointFieldName]) {
+ out = field.timestamp();
+ }
+
+ LOG(3) << "returning oplog delete from point: " << out;
+ return out;
+}
+
+void ReplicationConsistencyMarkersImpl::setAppliedThrough(OperationContext* opCtx,
+ const OpTime& optime) {
+ LOG(3) << "setting appliedThrough to: " << optime.toString() << "(" << optime.toBSON() << ")";
+ if (optime.isNull()) {
+ _updateMinValidDocument(opCtx, BSON("$unset" << BSON(kBeginFieldName << 1)));
+ } else {
+ _updateMinValidDocument(opCtx, BSON("$set" << BSON(kBeginFieldName << optime)));
+ }
+}
+
+OpTime ReplicationConsistencyMarkersImpl::getAppliedThrough(OperationContext* opCtx) const {
+ const BSONObj doc = _getMinValidDocument(opCtx);
+ const auto opTimeStatus = OpTime::parseFromOplogEntry(doc.getObjectField(kBeginFieldName));
+ if (!opTimeStatus.isOK()) {
+ // Return null OpTime on any parse failure, including if "begin" is missing.
+ return {};
+ }
+
+ OpTime appliedThrough = opTimeStatus.getValue();
+ LOG(3) << "returning appliedThrough: " << appliedThrough.toString() << "("
+ << appliedThrough.toBSON() << ")";
+
+ return appliedThrough;
+}
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.h b/src/mongo/db/repl/replication_consistency_markers_impl.h
new file mode 100644
index 00000000000..fd8a6dc9d26
--- /dev/null
+++ b/src/mongo/db/repl/replication_consistency_markers_impl.h
@@ -0,0 +1,93 @@
+/**
+* Copyright (C) 2017 MongoDB Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*
+* As a special exception, the copyright holders give permission to link the
+* code of portions of this program with the OpenSSL library under certain
+* conditions as described in each individual source file and distribute
+* linked combinations including the program with the OpenSSL library. You
+* must comply with the GNU Affero General Public License in all respects for
+* all of the code used other than as permitted herein. If you modify file(s)
+* with this exception, you may extend this exception to your version of the
+* file(s), but you are not obligated to do so. If you do not wish to do so,
+* delete this exception statement from your version. If you delete this
+* exception statement from all source files in the program, then also delete
+* it in the license file.
+*/
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/replication_consistency_markers.h"
+
+namespace mongo {
+
+class BSONObj;
+class OperationContext;
+class Timestamp;
+
+namespace repl {
+
+class OpTime;
+class StorageInterface;
+
+class ReplicationConsistencyMarkersImpl : public ReplicationConsistencyMarkers {
+ MONGO_DISALLOW_COPYING(ReplicationConsistencyMarkersImpl);
+
+public:
+ static constexpr StringData kDefaultMinValidNamespace = "local.replset.minvalid"_sd;
+ static constexpr StringData kInitialSyncFlagFieldName = "doingInitialSync"_sd;
+ static constexpr StringData kBeginFieldName = "begin"_sd;
+ static constexpr StringData kOplogDeleteFromPointFieldName = "oplogDeleteFromPoint"_sd;
+
+ explicit ReplicationConsistencyMarkersImpl(StorageInterface* storageInterface);
+ ReplicationConsistencyMarkersImpl(StorageInterface* storageInterface,
+ NamespaceString minValidNss);
+
+ bool getInitialSyncFlag(OperationContext* opCtx) const override;
+ void setInitialSyncFlag(OperationContext* opCtx) override;
+ void clearInitialSyncFlag(OperationContext* opCtx) override;
+
+ OpTime getMinValid(OperationContext* opCtx) const override;
+ void setMinValid(OperationContext* opCtx, const OpTime& minValid) override;
+ void setMinValidToAtLeast(OperationContext* opCtx, const OpTime& minValid) override;
+
+ void setOplogDeleteFromPoint(OperationContext* opCtx, const Timestamp& timestamp) override;
+ Timestamp getOplogDeleteFromPoint(OperationContext* opCtx) const override;
+
+ void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) override;
+ OpTime getAppliedThrough(OperationContext* opCtx) const override;
+
+private:
+ /**
+ * Reads the MinValid document from disk.
+ * Returns an empty document if not present.
+ */
+ BSONObj _getMinValidDocument(OperationContext* opCtx) const;
+
+ /**
+ * Updates the MinValid document according to the provided update spec. If the collection does
+ * not exist, it is created. If the document does not exist, it is upserted.
+ *
+ * This fasserts on failure.
+ */
+ void _updateMinValidDocument(OperationContext* opCtx, const BSONObj& updateSpec);
+
+ StorageInterface* _storageInterface;
+ const NamespaceString _minValidNss;
+};
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp
new file mode 100644
index 00000000000..4762913ed41
--- /dev/null
+++ b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp
@@ -0,0 +1,227 @@
+/**
+ * Copyright 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/repl/replication_consistency_markers_impl.h"
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/client.h"
+#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbhelpers.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/storage_interface_impl.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/db/storage/recovery_unit_noop.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace {
+
+using namespace mongo;
+using namespace mongo::repl;
+
+/**
+ * Generates a unique namespace from the test registration agent.
+ */
+template <typename T>
+NamespaceString makeNamespace(const T& t, const std::string& suffix = "") {
+ return NamespaceString(std::string("local." + t.getSuiteName() + "_" + t.getTestName())
+ .substr(0, NamespaceString::MaxNsCollectionLen - suffix.length()) +
+ suffix);
+}
+
+/**
+ * Returns min valid document.
+ */
+BSONObj getMinValidDocument(OperationContext* opCtx, const NamespaceString& minValidNss) {
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ Lock::DBLock dblk(opCtx, minValidNss.db(), MODE_IS);
+ Lock::CollectionLock lk(opCtx->lockState(), minValidNss.ns(), MODE_IS);
+ BSONObj mv;
+ if (Helpers::getSingleton(opCtx, minValidNss.ns().c_str(), mv)) {
+ return mv;
+ }
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "getMinValidDocument", minValidNss.ns());
+ return BSONObj();
+}
+
+class ReplicationConsistencyMarkersTest : public ServiceContextMongoDTest {
+protected:
+ OperationContext* getOperationContext() {
+ return _opCtx.get();
+ }
+
+ StorageInterface* getStorageInterface() {
+ return _storageInterface.get();
+ }
+
+private:
+ void setUp() override {
+ ServiceContextMongoDTest::setUp();
+ _createOpCtx();
+ auto replCoord = stdx::make_unique<ReplicationCoordinatorMock>(getServiceContext());
+ ReplicationCoordinator::set(getServiceContext(), std::move(replCoord));
+ _storageInterface = stdx::make_unique<StorageInterfaceImpl>();
+ }
+
+ void tearDown() override {
+ _opCtx.reset(nullptr);
+ _storageInterface.reset();
+ ServiceContextMongoDTest::tearDown();
+ }
+
+ void _createOpCtx() {
+ _opCtx = cc().makeOperationContext();
+ }
+
+ ServiceContext::UniqueOperationContext _opCtx;
+ std::unique_ptr<StorageInterfaceImpl> _storageInterface;
+};
+
+/**
+ * Recovery unit that tracks if waitUntilDurable() is called.
+ */
+class RecoveryUnitWithDurabilityTracking : public RecoveryUnitNoop {
+public:
+ bool waitUntilDurable() override;
+ bool waitUntilDurableCalled = false;
+};
+
+bool RecoveryUnitWithDurabilityTracking::waitUntilDurable() {
+ waitUntilDurableCalled = true;
+ return RecoveryUnitNoop::waitUntilDurable();
+}
+
+TEST_F(ReplicationConsistencyMarkersTest, InitialSyncFlag) {
+ auto nss = makeNamespace(_agent);
+
+ ReplicationConsistencyMarkersImpl minValid(getStorageInterface(), nss);
+ auto opCtx = getOperationContext();
+
+ // Initial sync flag should be unset after initializing a new storage engine.
+ ASSERT_FALSE(minValid.getInitialSyncFlag(opCtx));
+
+ // Setting initial sync flag should affect getInitialSyncFlag() result.
+ minValid.setInitialSyncFlag(opCtx);
+ ASSERT_TRUE(minValid.getInitialSyncFlag(opCtx));
+
+ // Check min valid document using storage engine interface.
+ auto minValidDocument = getMinValidDocument(opCtx, nss);
+ ASSERT_TRUE(
+ minValidDocument.hasField(ReplicationConsistencyMarkersImpl::kInitialSyncFlagFieldName));
+ ASSERT_TRUE(minValidDocument.getBoolField(
+ ReplicationConsistencyMarkersImpl::kInitialSyncFlagFieldName));
+
+ // Clearing initial sync flag should affect getInitialSyncFlag() result.
+ minValid.clearInitialSyncFlag(opCtx);
+ ASSERT_FALSE(minValid.getInitialSyncFlag(opCtx));
+}
+
+TEST_F(ReplicationConsistencyMarkersTest, GetMinValidAfterSettingInitialSyncFlagWorks) {
+ auto nss = makeNamespace(_agent);
+
+ ReplicationConsistencyMarkersImpl minValid(getStorageInterface(), nss);
+ auto opCtx = getOperationContext();
+
+ // Initial sync flag should be unset after initializing a new storage engine.
+ ASSERT_FALSE(minValid.getInitialSyncFlag(opCtx));
+
+ // Setting initial sync flag should affect getInitialSyncFlag() result.
+ minValid.setInitialSyncFlag(opCtx);
+ ASSERT_TRUE(minValid.getInitialSyncFlag(opCtx));
+
+ ASSERT(minValid.getMinValid(opCtx).isNull());
+ ASSERT(minValid.getAppliedThrough(opCtx).isNull());
+ ASSERT(minValid.getOplogDeleteFromPoint(opCtx).isNull());
+}
+
+TEST_F(ReplicationConsistencyMarkersTest, ReplicationConsistencyMarkers) {
+ auto nss = makeNamespace(_agent);
+
+ ReplicationConsistencyMarkersImpl minValid(getStorageInterface(), nss);
+ auto opCtx = getOperationContext();
+
+ // MinValid boundaries should all be null after initializing a new storage engine.
+ ASSERT(minValid.getMinValid(opCtx).isNull());
+ ASSERT(minValid.getAppliedThrough(opCtx).isNull());
+ ASSERT(minValid.getOplogDeleteFromPoint(opCtx).isNull());
+
+ // Setting min valid boundaries should affect getMinValid() result.
+ OpTime startOpTime({Seconds(123), 0}, 1LL);
+ OpTime endOpTime({Seconds(456), 0}, 1LL);
+ minValid.setAppliedThrough(opCtx, startOpTime);
+ minValid.setMinValid(opCtx, endOpTime);
+ minValid.setOplogDeleteFromPoint(opCtx, endOpTime.getTimestamp());
+
+ ASSERT_EQ(minValid.getAppliedThrough(opCtx), startOpTime);
+ ASSERT_EQ(minValid.getMinValid(opCtx), endOpTime);
+ ASSERT_EQ(minValid.getOplogDeleteFromPoint(opCtx), endOpTime.getTimestamp());
+
+ // setMinValid always changes minValid, but setMinValidToAtLeast only does if higher.
+ minValid.setMinValid(opCtx, startOpTime); // Forcibly lower it.
+ ASSERT_EQ(minValid.getMinValid(opCtx), startOpTime);
+ minValid.setMinValidToAtLeast(opCtx, endOpTime); // Higher than current (sets it).
+ ASSERT_EQ(minValid.getMinValid(opCtx), endOpTime);
+ minValid.setMinValidToAtLeast(opCtx, startOpTime); // Lower than current (no-op).
+ ASSERT_EQ(minValid.getMinValid(opCtx), endOpTime);
+
+ // Check min valid document using storage engine interface.
+ auto minValidDocument = getMinValidDocument(opCtx, nss);
+ ASSERT_TRUE(minValidDocument.hasField(ReplicationConsistencyMarkersImpl::kBeginFieldName));
+ ASSERT_TRUE(minValidDocument[ReplicationConsistencyMarkersImpl::kBeginFieldName].isABSONObj());
+ ASSERT_EQUALS(startOpTime,
+ unittest::assertGet(OpTime::parseFromOplogEntry(
+ minValidDocument[ReplicationConsistencyMarkersImpl::kBeginFieldName].Obj())));
+ ASSERT_EQUALS(endOpTime, unittest::assertGet(OpTime::parseFromOplogEntry(minValidDocument)));
+ ASSERT_EQUALS(
+ endOpTime.getTimestamp(),
+ minValidDocument[ReplicationConsistencyMarkersImpl::kOplogDeleteFromPointFieldName]
+ .timestamp());
+
+ // Recovery unit will be owned by "opCtx".
+ RecoveryUnitWithDurabilityTracking* recoveryUnit = new RecoveryUnitWithDurabilityTracking();
+ opCtx->setRecoveryUnit(recoveryUnit, OperationContext::kNotInUnitOfWork);
+
+ // Set min valid without waiting for the changes to be durable.
+ OpTime endOpTime2({Seconds(789), 0}, 1LL);
+ minValid.setMinValid(opCtx, endOpTime2);
+ minValid.setAppliedThrough(opCtx, {});
+ ASSERT_EQUALS(minValid.getAppliedThrough(opCtx), OpTime());
+ ASSERT_EQUALS(minValid.getMinValid(opCtx), endOpTime2);
+ ASSERT_FALSE(recoveryUnit->waitUntilDurableCalled);
+}
+
+} // namespace
diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.cpp b/src/mongo/db/repl/replication_consistency_markers_mock.cpp
new file mode 100644
index 00000000000..5fdd402d33b
--- /dev/null
+++ b/src/mongo/db/repl/replication_consistency_markers_mock.cpp
@@ -0,0 +1,92 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/replication_consistency_markers_mock.h"
+
+namespace mongo {
+namespace repl {
+
+bool ReplicationConsistencyMarkersMock::getInitialSyncFlag(OperationContext* opCtx) const {
+ stdx::lock_guard<stdx::mutex> lock(_initialSyncFlagMutex);
+ return _initialSyncFlag;
+}
+
+void ReplicationConsistencyMarkersMock::setInitialSyncFlag(OperationContext* opCtx) {
+ stdx::lock_guard<stdx::mutex> lock(_initialSyncFlagMutex);
+ _initialSyncFlag = true;
+}
+
+void ReplicationConsistencyMarkersMock::clearInitialSyncFlag(OperationContext* opCtx) {
+ stdx::lock_guard<stdx::mutex> lock(_initialSyncFlagMutex);
+ _initialSyncFlag = false;
+}
+
+OpTime ReplicationConsistencyMarkersMock::getMinValid(OperationContext* opCtx) const {
+ stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
+ return _minValid;
+}
+
+void ReplicationConsistencyMarkersMock::setMinValid(OperationContext* opCtx,
+ const OpTime& minValid) {
+ stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
+ _minValid = minValid;
+}
+
+void ReplicationConsistencyMarkersMock::setMinValidToAtLeast(OperationContext* opCtx,
+ const OpTime& minValid) {
+ stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
+ _minValid = std::max(_minValid, minValid);
+}
+
+void ReplicationConsistencyMarkersMock::setOplogDeleteFromPoint(OperationContext* opCtx,
+ const Timestamp& timestamp) {
+ stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
+ _oplogDeleteFromPoint = timestamp;
+}
+
+Timestamp ReplicationConsistencyMarkersMock::getOplogDeleteFromPoint(
+ OperationContext* opCtx) const {
+ stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
+ return _oplogDeleteFromPoint;
+}
+
+void ReplicationConsistencyMarkersMock::setAppliedThrough(OperationContext* opCtx,
+ const OpTime& optime) {
+ stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
+ _appliedThrough = optime;
+}
+
+OpTime ReplicationConsistencyMarkersMock::getAppliedThrough(OperationContext* opCtx) const {
+ stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
+ return _appliedThrough;
+}
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.h b/src/mongo/db/repl/replication_consistency_markers_mock.h
new file mode 100644
index 00000000000..82269f6a60b
--- /dev/null
+++ b/src/mongo/db/repl/replication_consistency_markers_mock.h
@@ -0,0 +1,78 @@
+/**
+* Copyright (C) 2017 MongoDB Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*
+* As a special exception, the copyright holders give permission to link the
+* code of portions of this program with the OpenSSL library under certain
+* conditions as described in each individual source file and distribute
+* linked combinations including the program with the OpenSSL library. You
+* must comply with the GNU Affero General Public License in all respects for
+* all of the code used other than as permitted herein. If you modify file(s)
+* with this exception, you may extend this exception to your version of the
+* file(s), but you are not obligated to do so. If you do not wish to do so,
+* delete this exception statement from your version. If you delete this
+* exception statement from all source files in the program, then also delete
+* it in the license file.
+*/
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/replication_consistency_markers.h"
+#include "mongo/stdx/mutex.h"
+
+namespace mongo {
+
+class BSONObj;
+class OperationContext;
+class Timestamp;
+
+namespace repl {
+
+/**
+ * A mock ReplicationConsistencyMarkers implementation that stores everything in memory.
+ */
+class ReplicationConsistencyMarkersMock : public ReplicationConsistencyMarkers {
+ MONGO_DISALLOW_COPYING(ReplicationConsistencyMarkersMock);
+
+public:
+ ReplicationConsistencyMarkersMock() = default;
+
+ bool getInitialSyncFlag(OperationContext* opCtx) const override;
+ void setInitialSyncFlag(OperationContext* opCtx) override;
+ void clearInitialSyncFlag(OperationContext* opCtx) override;
+
+ OpTime getMinValid(OperationContext* opCtx) const override;
+ void setMinValid(OperationContext* opCtx, const OpTime& minValid) override;
+ void setMinValidToAtLeast(OperationContext* opCtx, const OpTime& minValid) override;
+
+ void setOplogDeleteFromPoint(OperationContext* opCtx, const Timestamp& timestamp) override;
+ Timestamp getOplogDeleteFromPoint(OperationContext* opCtx) const override;
+
+ void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) override;
+ OpTime getAppliedThrough(OperationContext* opCtx) const override;
+
+private:
+ mutable stdx::mutex _initialSyncFlagMutex;
+ bool _initialSyncFlag = false;
+
+ mutable stdx::mutex _minValidBoundariesMutex;
+ OpTime _appliedThrough;
+ OpTime _minValid;
+ Timestamp _oplogDeleteFromPoint;
+};
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index a70df25f4e0..961926a1f35 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -64,6 +64,7 @@
#include "mongo/db/repl/oplog_buffer_proxy.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/rs_sync.h"
#include "mongo/db/repl/snapshot_thread.h"
#include "mongo/db/repl/storage_interface.h"
@@ -203,16 +204,18 @@ void scheduleWork(executor::TaskExecutor* executor,
ReplicationCoordinatorExternalStateImpl::ReplicationCoordinatorExternalStateImpl(
ServiceContext* service,
DropPendingCollectionReaper* dropPendingCollectionReaper,
- StorageInterface* storageInterface)
+ StorageInterface* storageInterface,
+ ReplicationProcess* replicationProcess)
: _service(service),
_dropPendingCollectionReaper(dropPendingCollectionReaper),
- _storageInterface(storageInterface) {
+ _storageInterface(storageInterface),
+ _replicationProcess(replicationProcess) {
uassert(ErrorCodes::BadValue, "A StorageInterface is required.", _storageInterface);
}
ReplicationCoordinatorExternalStateImpl::~ReplicationCoordinatorExternalStateImpl() {}
bool ReplicationCoordinatorExternalStateImpl::isInitialSyncFlagSet(OperationContext* opCtx) {
- return _storageInterface->getInitialSyncFlag(opCtx);
+ return _replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx);
}
void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication(
@@ -222,7 +225,8 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication(
invariant(replCoord);
invariant(!_bgSync);
log() << "Starting replication fetcher thread";
- _bgSync = stdx::make_unique<BackgroundSync>(this, makeSteadyStateOplogBuffer(opCtx));
+ _bgSync = stdx::make_unique<BackgroundSync>(
+ this, _replicationProcess, makeSteadyStateOplogBuffer(opCtx));
_bgSync->startup(opCtx);
log() << "Starting replication applier thread";
@@ -332,11 +336,12 @@ void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* opCtx)
// Perform additional shutdown steps below that must be done outside _threadMutex.
- if (_storageInterface->getOplogDeleteFromPoint(opCtx).isNull() &&
- loadLastOpTime(opCtx) == _storageInterface->getAppliedThrough(opCtx)) {
+ if (_replicationProcess->getConsistencyMarkers()->getOplogDeleteFromPoint(opCtx).isNull() &&
+ loadLastOpTime(opCtx) ==
+ _replicationProcess->getConsistencyMarkers()->getAppliedThrough(opCtx)) {
// Clear the appliedThrough marker to indicate we are consistent with the top of the
// oplog.
- _storageInterface->setAppliedThrough(opCtx, {});
+ _replicationProcess->getConsistencyMarkers()->setAppliedThrough(opCtx, {});
}
}
@@ -385,11 +390,6 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "initiate oplog entry", "local.oplog.rs");
- // This initializes the minvalid document with a null "ts" because older versions (<=3.2)
- // get angry if the minValid document is present but doesn't have a "ts" field.
- // Consider removing this once we no longer need to support downgrading to 3.2.
- _storageInterface->setMinValidToAtLeast(opCtx, {});
-
FeatureCompatibilityVersion::setIfCleanStartup(opCtx, _storageInterface);
} catch (const DBException& ex) {
return ex.toStatus();
@@ -414,8 +414,9 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC
// Clear the appliedThrough marker so on startup we'll use the top of the oplog. This must be
// done before we add anything to our oplog.
- invariant(_storageInterface->getOplogDeleteFromPoint(opCtx).isNull());
- _storageInterface->setAppliedThrough(opCtx, {});
+ invariant(
+ _replicationProcess->getConsistencyMarkers()->getOplogDeleteFromPoint(opCtx).isNull());
+ _replicationProcess->getConsistencyMarkers()->setAppliedThrough(opCtx, {});
if (isV1ElectionProtocol) {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
@@ -564,17 +565,14 @@ void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(ServiceContext*
}
void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationContext* opCtx) {
- if (_storageInterface->getInitialSyncFlag(opCtx)) {
+ if (_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx)) {
return; // Initial Sync will take over so no cleanup is needed.
}
- // This initializes the minvalid document with a null "ts" because older versions (<=3.2)
- // get angry if the minValid document is present but doesn't have a "ts" field.
- // Consider removing this once we no longer need to support downgrading to 3.2.
- _storageInterface->setMinValidToAtLeast(opCtx, {});
-
- const auto deleteFromPoint = _storageInterface->getOplogDeleteFromPoint(opCtx);
- const auto appliedThrough = _storageInterface->getAppliedThrough(opCtx);
+ const auto deleteFromPoint =
+ _replicationProcess->getConsistencyMarkers()->getOplogDeleteFromPoint(opCtx);
+ const auto appliedThrough =
+ _replicationProcess->getConsistencyMarkers()->getAppliedThrough(opCtx);
const bool needToDeleteEndOfOplog = !deleteFromPoint.isNull() &&
// This version should never have a non-null deleteFromPoint with a null appliedThrough.
@@ -591,7 +589,8 @@ void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationCon
log() << "Removing unapplied entries starting at: " << deleteFromPoint;
truncateOplogTo(opCtx, deleteFromPoint);
}
- _storageInterface->setOplogDeleteFromPoint(opCtx, {}); // clear the deleteFromPoint
+ _replicationProcess->getConsistencyMarkers()->setOplogDeleteFromPoint(
+ opCtx, {}); // clear the deleteFromPoint
if (appliedThrough.isNull()) {
// No follow-up work to do.
@@ -643,7 +642,7 @@ void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationCon
while (cursor->more()) {
auto entry = cursor->nextSafe();
fassertStatusOK(40294, SyncTail::syncApply(opCtx, entry, true));
- _storageInterface->setAppliedThrough(
+ _replicationProcess->getConsistencyMarkers()->setAppliedThrough(
opCtx, fassertStatusOK(40295, OpTime::parseFromOplogEntry(entry)));
}
}
@@ -653,7 +652,7 @@ StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(
// TODO: handle WriteConflictExceptions below
try {
// If we are doing an initial sync do not read from the oplog.
- if (_storageInterface->getInitialSyncFlag(opCtx)) {
+ if (_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx)) {
return {ErrorCodes::InitialSyncFailure, "In the middle of an initial sync."};
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index b6b4b535a7d..68318f01455 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -52,6 +52,7 @@ using UniqueLock = stdx::unique_lock<stdx::mutex>;
} // namespace
class DropPendingCollectionReaper;
+class ReplicationProcess;
class SnapshotThread;
class StorageInterface;
class NoopWriter;
@@ -64,7 +65,8 @@ public:
ReplicationCoordinatorExternalStateImpl(
ServiceContext* service,
DropPendingCollectionReaper* dropPendingCollectionReaper,
- StorageInterface* storageInterface);
+ StorageInterface* storageInterface,
+ ReplicationProcess* replicationProcess);
virtual ~ReplicationCoordinatorExternalStateImpl();
virtual void startThreads(const ReplSettings& settings) override;
virtual void startSteadyStateReplication(OperationContext* opCtx,
@@ -163,6 +165,9 @@ private:
DropPendingCollectionReaper* _dropPendingCollectionReaper;
StorageInterface* _storageInterface;
+
+ ReplicationProcess* _replicationProcess;
+
// True when the threads have been started
bool _startedThreads = false;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 2b738bc824c..222d8b1c507 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -630,6 +630,7 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* opCtx,
stdx::make_unique<DataReplicatorExternalStateInitialSync>(this,
_externalState.get()),
_storage,
+ _replicationProcess,
onCompletion);
_initialSyncer = initialSyncerCopy;
}
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index 291d6a7652f..b1a93ec2b24 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/repl/repl_set_heartbeat_args.h"
#include "mongo/db/repl/repl_set_heartbeat_args_v1.h"
#include "mongo/db/repl/repl_settings.h"
+#include "mongo/db/repl/replication_consistency_markers_mock.h"
#include "mongo/db/repl/replication_coordinator_external_state_mock.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replication_process.h"
@@ -124,7 +125,10 @@ void ReplCoordTest::init() {
StorageInterface::set(service, std::unique_ptr<StorageInterface>(storageInterface));
ASSERT_TRUE(storageInterface == StorageInterface::get(service));
- ReplicationProcess::set(service, stdx::make_unique<ReplicationProcess>(storageInterface));
+ ReplicationProcess::set(
+ service,
+ stdx::make_unique<ReplicationProcess>(
+ storageInterface, stdx::make_unique<ReplicationConsistencyMarkersMock>()));
auto replicationProcess = ReplicationProcess::get(service);
// PRNG seed for tests.
diff --git a/src/mongo/db/repl/replication_process.cpp b/src/mongo/db/repl/replication_process.cpp
index 62eba5e679a..7e7dc077b11 100644
--- a/src/mongo/db/repl/replication_process.cpp
+++ b/src/mongo/db/repl/replication_process.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/client.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/replication_consistency_markers.h"
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/rollback_gen.h"
#include "mongo/db/repl/storage_interface.h"
@@ -74,13 +75,17 @@ ReplicationProcess* ReplicationProcess::get(OperationContext* opCtx) {
}
-void ReplicationProcess::set(ServiceContext* service, std::unique_ptr<ReplicationProcess> storage) {
- auto& storageInterface = getReplicationProcess(service);
- storageInterface = std::move(storage);
+void ReplicationProcess::set(ServiceContext* service, std::unique_ptr<ReplicationProcess> process) {
+ auto& replicationProcess = getReplicationProcess(service);
+ replicationProcess = std::move(process);
}
-ReplicationProcess::ReplicationProcess(StorageInterface* storageInterface)
- : _storageInterface(storageInterface), _rbid(kUninitializedRollbackId) {}
+ReplicationProcess::ReplicationProcess(
+ StorageInterface* storageInterface,
+ std::unique_ptr<ReplicationConsistencyMarkers> consistencyMarkers)
+ : _storageInterface(storageInterface),
+ _consistencyMarkers(std::move(consistencyMarkers)),
+ _rbid(kUninitializedRollbackId) {}
StatusWith<int> ReplicationProcess::getRollbackID(OperationContext* opCtx) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
@@ -173,5 +178,9 @@ Status ReplicationProcess::clearRollbackProgress(OperationContext* opCtx) {
return Status::OK();
}
+ReplicationConsistencyMarkers* ReplicationProcess::getConsistencyMarkers() {
+ return _consistencyMarkers.get();
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_process.h b/src/mongo/db/repl/replication_process.h
index 7e6a81f362c..da4a2886c6a 100644
--- a/src/mongo/db/repl/replication_process.h
+++ b/src/mongo/db/repl/replication_process.h
@@ -36,6 +36,7 @@
#include "mongo/base/status_with.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/replication_consistency_markers.h"
#include "mongo/stdx/mutex.h"
namespace mongo {
@@ -77,10 +78,10 @@ public:
static ReplicationProcess* get(ServiceContext* service);
static ReplicationProcess* get(ServiceContext& service);
static ReplicationProcess* get(OperationContext* opCtx);
- static void set(ServiceContext* service, std::unique_ptr<ReplicationProcess> storageInterface);
+ static void set(ServiceContext* service, std::unique_ptr<ReplicationProcess> process);
- // Constructor and Destructor.
- explicit ReplicationProcess(StorageInterface* storageInterface);
+ ReplicationProcess(StorageInterface* storageInterface,
+ std::unique_ptr<ReplicationConsistencyMarkers> consistencyMarkers);
virtual ~ReplicationProcess() = default;
/**
@@ -128,6 +129,11 @@ public:
*/
Status clearRollbackProgress(OperationContext* opCtx);
+ /**
+ * Returns an object used for operating on the documents that maintain replication consistency.
+ */
+ ReplicationConsistencyMarkers* getConsistencyMarkers();
+
private:
// All member variables are labeled with one of the following codes indicating the
// synchronization rules for accessing them.
@@ -142,6 +148,9 @@ private:
// Used to access the storage layer.
StorageInterface* const _storageInterface; // (R)
+ // Used for operations on documents that maintain replication consistency.
+ std::unique_ptr<ReplicationConsistencyMarkers> _consistencyMarkers; // (S)
+
// Rollback ID. This is a cached copy of the persisted value in the local.system.rollback.id
// collection.
int _rbid; // (M)
diff --git a/src/mongo/db/repl/replication_process_test.cpp b/src/mongo/db/repl/replication_process_test.cpp
index 4e42ad6e085..7228913507c 100644
--- a/src/mongo/db/repl/replication_process_test.cpp
+++ b/src/mongo/db/repl/replication_process_test.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/client.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/replication_consistency_markers_impl.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/replication_process.h"
@@ -75,7 +76,9 @@ ServiceContext::UniqueOperationContext makeOpCtx() {
TEST_F(ReplicationProcessTest, ServiceContextDecorator) {
auto serviceContext = getServiceContext();
ASSERT_FALSE(ReplicationProcess::get(serviceContext));
- ReplicationProcess* replicationProcess = new ReplicationProcess(_storageInterface.get());
+ ReplicationProcess* replicationProcess = new ReplicationProcess(
+ _storageInterface.get(),
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
ReplicationProcess::set(serviceContext,
std::unique_ptr<ReplicationProcess>(replicationProcess));
ASSERT_TRUE(replicationProcess == ReplicationProcess::get(serviceContext));
@@ -85,7 +88,9 @@ TEST_F(ReplicationProcessTest, ServiceContextDecorator) {
TEST_F(ReplicationProcessTest,
GetRollbackProgressReturnsNoSuchKeyIfDocumentWithIdProgressIsNotFound) {
- ReplicationProcess replicationProcess(_storageInterface.get());
+ ReplicationProcess replicationProcess(
+ _storageInterface.get(),
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
// Collection is not found.
auto opCtx = makeOpCtx();
@@ -119,7 +124,9 @@ TEST_F(ReplicationProcessTest, GetRollbackProgressReturnsBadStatusIfApplyUntilFi
ASSERT_OK(_storageInterface->insertDocument(
opCtx.get(), ReplicationProcess::kRollbackProgressNamespace, doc));
- ReplicationProcess replicationProcess(_storageInterface.get());
+ ReplicationProcess replicationProcess(
+ _storageInterface.get(),
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
ASSERT_EQUALS(mongo::AssertionException::convertExceptionCode(40410),
replicationProcess.getRollbackProgress(opCtx.get()));
}
@@ -139,7 +146,9 @@ TEST_F(ReplicationProcessTest,
ASSERT_OK(_storageInterface->insertDocument(
opCtx.get(), ReplicationProcess::kRollbackProgressNamespace, doc));
- ReplicationProcess replicationProcess(_storageInterface.get());
+ ReplicationProcess replicationProcess(
+ _storageInterface.get(),
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
ASSERT_EQUALS(ErrorCodes::TypeMismatch, replicationProcess.getRollbackProgress(opCtx.get()));
}
@@ -156,7 +165,9 @@ TEST_F(ReplicationProcessTest,
ASSERT_OK(_storageInterface->insertDocument(
opCtx.get(), ReplicationProcess::kRollbackProgressNamespace, doc));
- ReplicationProcess replicationProcess(_storageInterface.get());
+ ReplicationProcess replicationProcess(
+ _storageInterface.get(),
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
ASSERT_EQUALS(applyUntil,
unittest::assertGet(replicationProcess.getRollbackProgress(opCtx.get())));
@@ -169,7 +180,9 @@ TEST_F(ReplicationProcessTest,
SetRollbackProgressCreatesCollectionBeforeInsertingDocumentIfCollectionDoesNotExist) {
OpTime applyUntil({Seconds(123), 0}, 1LL);
auto opCtx = makeOpCtx();
- ReplicationProcess replicationProcess(_storageInterface.get());
+ ReplicationProcess replicationProcess(
+ _storageInterface.get(),
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
ASSERT_OK(replicationProcess.setRollbackProgress(opCtx.get(), applyUntil));
ASSERT_EQUALS(1U,
unittest::assertGet(_storageInterface->getCollectionCount(
@@ -185,14 +198,18 @@ TEST_F(ReplicationProcessTest,
OpTime applyUntil({Seconds(123), 0}, 1LL);
auto opCtx = makeOpCtx();
- ReplicationProcess replicationProcess(_storageInterface.get());
+ ReplicationProcess replicationProcess(
+ _storageInterface.get(),
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
ASSERT_EQUALS(ErrorCodes::IllegalOperation,
replicationProcess.setRollbackProgress(opCtx.get(), applyUntil));
}
TEST_F(ReplicationProcessTest, ClearRollbackProgressReturnsSuccessIfCollectionDoesNotExist) {
auto opCtx = makeOpCtx();
- ReplicationProcess replicationProcess(_storageInterface.get());
+ ReplicationProcess replicationProcess(
+ _storageInterface.get(),
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
ASSERT_OK(replicationProcess.clearRollbackProgress(opCtx.get()));
}
@@ -202,7 +219,9 @@ TEST_F(ReplicationProcessTest,
_storageInterface = stdx::make_unique<StorageInterfaceMock>();
auto opCtx = makeOpCtx();
- ReplicationProcess replicationProcess(_storageInterface.get());
+ ReplicationProcess replicationProcess(
+ _storageInterface.get(),
+ stdx::make_unique<ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
ASSERT_EQUALS(ErrorCodes::IllegalOperation,
replicationProcess.clearRollbackProgress(opCtx.get()));
}
diff --git a/src/mongo/db/repl/rollback_test_fixture.cpp b/src/mongo/db/repl/rollback_test_fixture.cpp
index 23866621c80..791cbf0e2e7 100644
--- a/src/mongo/db/repl/rollback_test_fixture.cpp
+++ b/src/mongo/db/repl/rollback_test_fixture.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/client.h"
#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/replication_consistency_markers_mock.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/replication_process.h"
@@ -70,17 +71,18 @@ RollbackTest::RollbackTest() : _threadPoolExecutorTest(createThreadPoolOptions()
void RollbackTest::setUp() {
_serviceContextMongoDTest.setUp();
_threadPoolExecutorTest.setUp();
- _opCtx = cc().makeOperationContext();
auto serviceContext = _serviceContextMongoDTest.getServiceContext();
- ReplicationProcess::set(serviceContext,
- stdx::make_unique<ReplicationProcess>(&_storageInterface));
+ _replicationProcess = stdx::make_unique<ReplicationProcess>(
+ &_storageInterface, stdx::make_unique<ReplicationConsistencyMarkersMock>());
_coordinator = new ReplicationCoordinatorRollbackMock(serviceContext);
ReplicationCoordinator::set(serviceContext,
std::unique_ptr<ReplicationCoordinator>(_coordinator));
setOplogCollectionName();
- _storageInterface.setAppliedThrough(_opCtx.get(), OpTime{});
- _storageInterface.setMinValid(_opCtx.get(), OpTime{});
- ReplicationProcess::get(_opCtx.get())->initializeRollbackID(_opCtx.get());
+
+ _opCtx = cc().makeOperationContext();
+ _replicationProcess->getConsistencyMarkers()->setAppliedThrough(_opCtx.get(), OpTime{});
+ _replicationProcess->getConsistencyMarkers()->setMinValid(_opCtx.get(), OpTime{});
+ _replicationProcess->initializeRollbackID(_opCtx.get());
_threadPoolExecutorTest.launchExecutorThread();
}
@@ -98,6 +100,7 @@ void RollbackTest::tearDown() {
// ServiceContextMongoD::tearDown() does not destroy service context so it is okay
// to access the service context after tearDown().
auto serviceContext = _serviceContextMongoDTest.getServiceContext();
+ _replicationProcess.reset();
ReplicationCoordinator::set(serviceContext, {});
}
diff --git a/src/mongo/db/repl/rollback_test_fixture.h b/src/mongo/db/repl/rollback_test_fixture.h
index ece8cd682d2..d3e959ddc2f 100644
--- a/src/mongo/db/repl/rollback_test_fixture.h
+++ b/src/mongo/db/repl/rollback_test_fixture.h
@@ -29,6 +29,7 @@
#pragma once
#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/service_context.h"
#include "mongo/db/service_context_d_test_fixture.h"
@@ -86,8 +87,10 @@ protected:
class ReplicationCoordinatorRollbackMock;
ReplicationCoordinatorRollbackMock* _coordinator = nullptr;
- // StorageInterface used to access minValid.
StorageInterfaceMock _storageInterface;
+
+ // ReplicationProcess used to access consistency markers.
+ std::unique_ptr<ReplicationProcess> _replicationProcess;
};
/**
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index 85cdd0a4141..4dfd9dd5c68 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -63,7 +63,6 @@
#include "mongo/db/repl/roll_back_local_operations.h"
#include "mongo/db/repl/rollback_source.h"
#include "mongo/db/repl/rslog.h"
-#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/s/shard_identity_rollback_notifier.h"
#include "mongo/util/exit.h"
#include "mongo/util/fail_point_service.h"
@@ -340,7 +339,7 @@ namespace {
void checkRbidAndUpdateMinValid(OperationContext* opCtx,
const int rbid,
const RollbackSource& rollbackSource,
- StorageInterface* storageInterface) {
+ ReplicationProcess* replicationProcess) {
// It is important that the steps are performed in order to avoid racing with upstream rollbacks
//
// 1) Get the last doc in their oplog.
@@ -360,8 +359,8 @@ void checkRbidAndUpdateMinValid(OperationContext* opCtx,
// online until we get to that point in freshness.
OpTime minValid = fassertStatusOK(28774, OpTime::parseFromOplogEntry(newMinValidDoc));
log() << "Setting minvalid to " << minValid;
- storageInterface->setAppliedThrough(opCtx, {}); // Use top of oplog.
- storageInterface->setMinValid(opCtx, minValid);
+ replicationProcess->getConsistencyMarkers()->setAppliedThrough(opCtx, {}); // Use top of oplog.
+ replicationProcess->getConsistencyMarkers()->setMinValid(opCtx, minValid);
if (MONGO_FAIL_POINT(rollbackHangThenFailAfterWritingMinValid)) {
// This log output is used in js tests so please leave it.
@@ -380,7 +379,7 @@ void syncFixUp(OperationContext* opCtx,
const FixUpInfo& fixUpInfo,
const RollbackSource& rollbackSource,
ReplicationCoordinator* replCoord,
- StorageInterface* storageInterface) {
+ ReplicationProcess* replicationProcess) {
// fetch all first so we needn't handle interruption in a fancy way
unsigned long long totalSize = 0;
@@ -419,7 +418,7 @@ void syncFixUp(OperationContext* opCtx,
}
log() << "rollback 3.5";
- checkRbidAndUpdateMinValid(opCtx, fixUpInfo.rbid, rollbackSource, storageInterface);
+ checkRbidAndUpdateMinValid(opCtx, fixUpInfo.rbid, rollbackSource, replicationProcess);
// update them
log() << "rollback 4 n:" << goodVersions.size();
@@ -521,7 +520,7 @@ void syncFixUp(OperationContext* opCtx,
// we did more reading from primary, so check it again for a rollback (which would mess
// us up), and make minValid newer.
log() << "rollback 4.2";
- checkRbidAndUpdateMinValid(opCtx, fixUpInfo.rbid, rollbackSource, storageInterface);
+ checkRbidAndUpdateMinValid(opCtx, fixUpInfo.rbid, rollbackSource, replicationProcess);
}
log() << "rollback 4.6";
@@ -785,7 +784,7 @@ Status _syncRollback(OperationContext* opCtx,
const RollbackSource& rollbackSource,
int requiredRBID,
ReplicationCoordinator* replCoord,
- StorageInterface* storageInterface) {
+ ReplicationProcess* replicationProcess) {
invariant(!opCtx->lockState()->isLocked());
FixUpInfo how;
@@ -828,10 +827,10 @@ Status _syncRollback(OperationContext* opCtx,
log() << "rollback 3 fixup";
try {
ON_BLOCK_EXIT([&] {
- auto status = ReplicationProcess::get(opCtx)->incrementRollbackID(opCtx);
+ auto status = replicationProcess->incrementRollbackID(opCtx);
fassertStatusOK(40425, status);
});
- syncFixUp(opCtx, how, rollbackSource, replCoord, storageInterface);
+ syncFixUp(opCtx, how, rollbackSource, replCoord, replicationProcess);
} catch (const RSFatalException& e) {
return Status(ErrorCodes::UnrecoverableRollbackError, e.what(), 18753);
}
@@ -856,7 +855,7 @@ Status syncRollback(OperationContext* opCtx,
const RollbackSource& rollbackSource,
int requiredRBID,
ReplicationCoordinator* replCoord,
- StorageInterface* storageInterface) {
+ ReplicationProcess* replicationProcess) {
invariant(opCtx);
invariant(replCoord);
@@ -864,8 +863,8 @@ Status syncRollback(OperationContext* opCtx,
DisableDocumentValidation validationDisabler(opCtx);
UnreplicatedWritesBlock replicationDisabler(opCtx);
- Status status =
- _syncRollback(opCtx, localOplog, rollbackSource, requiredRBID, replCoord, storageInterface);
+ Status status = _syncRollback(
+ opCtx, localOplog, rollbackSource, requiredRBID, replCoord, replicationProcess);
log() << "rollback finished" << rsLog;
return status;
@@ -876,7 +875,7 @@ void rollback(OperationContext* opCtx,
const RollbackSource& rollbackSource,
int requiredRBID,
ReplicationCoordinator* replCoord,
- StorageInterface* storageInterface,
+ ReplicationProcess* replicationProcess,
stdx::function<void(int)> sleepSecsFn) {
// Set state to ROLLBACK while we are in this function. This prevents serving reads, even from
// the oplog. This can fail if we are elected PRIMARY, in which case we better not do any
@@ -901,7 +900,7 @@ void rollback(OperationContext* opCtx,
try {
auto status = syncRollback(
- opCtx, localOplog, rollbackSource, requiredRBID, replCoord, storageInterface);
+ opCtx, localOplog, rollbackSource, requiredRBID, replCoord, replicationProcess);
// Abort only when syncRollback detects we are in a unrecoverable state.
// WARNING: these statuses sometimes have location codes which are lost with uassertStatusOK
@@ -920,8 +919,8 @@ void rollback(OperationContext* opCtx,
invariant(ex.getCode() != ErrorCodes::UnrecoverableRollbackError);
warning() << "rollback cannot complete at this time (retrying later): " << redact(ex)
- << " appliedThrough=" << replCoord->getMyLastAppliedOpTime()
- << " minvalid=" << storageInterface->getMinValid(opCtx);
+ << " appliedThrough=" << replCoord->getMyLastAppliedOpTime() << " minvalid="
+ << replicationProcess->getConsistencyMarkers()->getMinValid(opCtx);
// Sleep a bit to allow upstream node to coalesce, if that was the cause of the failure. If
// we failed in a way that will keep failing, but wasn't flagged as a fatal failure, this
diff --git a/src/mongo/db/repl/rs_rollback.h b/src/mongo/db/repl/rs_rollback.h
index e305c076e44..db1b174c3dd 100644
--- a/src/mongo/db/repl/rs_rollback.h
+++ b/src/mongo/db/repl/rs_rollback.h
@@ -45,8 +45,8 @@ namespace repl {
class OplogInterface;
class ReplicationCoordinator;
+class ReplicationProcess;
class RollbackSource;
-class StorageInterface;
/**
* Entry point to rollback process.
@@ -63,7 +63,7 @@ void rollback(OperationContext* opCtx,
const RollbackSource& rollbackSource,
int requiredRBID,
ReplicationCoordinator* replCoord,
- StorageInterface* storageInterface,
+ ReplicationProcess* replicationProcess,
stdx::function<void(int)> sleepSecsFn = [](int secs) { sleepsecs(secs); });
/**
@@ -87,7 +87,7 @@ void rollback(OperationContext* opCtx,
* supports fetching documents and copying collections.
* @param requiredRBID Rollback ID we are required to have throughout rollback.
* @param replCoord Used to track the rollback ID and to change the follower state
- * @param storageInterface Used to update minValid.
+ * @param replicationProcess Used to update minValid.
*
* If requiredRBID is supplied, we error if the upstream node has a different RBID (ie it rolled
* back) after fetching any information from it.
@@ -101,7 +101,7 @@ Status syncRollback(OperationContext* opCtx,
const RollbackSource& rollbackSource,
int requiredRBID,
ReplicationCoordinator* replCoord,
- StorageInterface* storageInterface);
+ ReplicationProcess* replicationProcess);
/**
* This namespace contains internal details of the rollback system. It is only exposed in a header
diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp
index d16bbaf229e..6e3615ceee0 100644
--- a/src/mongo/db/repl/rs_rollback_test.cpp
+++ b/src/mongo/db/repl/rs_rollback_test.cpp
@@ -127,14 +127,16 @@ OplogInterfaceMock::Operation makeNoopOplogEntryAndRecordId(Seconds seconds) {
}
TEST_F(RSRollbackTest, InconsistentMinValid) {
- _storageInterface.setAppliedThrough(_opCtx.get(), OpTime(Timestamp(Seconds(0), 0), 0));
- _storageInterface.setMinValid(_opCtx.get(), OpTime(Timestamp(Seconds(1), 0), 0));
+ _replicationProcess->getConsistencyMarkers()->setAppliedThrough(
+ _opCtx.get(), OpTime(Timestamp(Seconds(0), 0), 0));
+ _replicationProcess->getConsistencyMarkers()->setMinValid(_opCtx.get(),
+ OpTime(Timestamp(Seconds(1), 0), 0));
auto status = syncRollback(_opCtx.get(),
OplogInterfaceMock(),
RollbackSourceMock(stdx::make_unique<OplogInterfaceMock>()),
{},
_coordinator,
- &_storageInterface);
+ _replicationProcess.get());
ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
ASSERT_EQUALS(18752, status.location());
}
@@ -151,7 +153,7 @@ TEST_F(RSRollbackTest, OplogStartMissing) {
RollbackSourceMock(std::move(remoteOplog)),
{},
_coordinator,
- &_storageInterface)
+ _replicationProcess.get())
.code());
}
@@ -164,7 +166,7 @@ TEST_F(RSRollbackTest, NoRemoteOpLog) {
RollbackSourceMock(stdx::make_unique<OplogInterfaceMock>()),
{},
_coordinator,
- &_storageInterface);
+ _replicationProcess.get());
ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
ASSERT_EQUALS(18752, status.location());
}
@@ -186,7 +188,7 @@ TEST_F(RSRollbackTest, RemoteGetRollbackIdThrows) {
RollbackSourceLocal(stdx::make_unique<OplogInterfaceMock>()),
{},
_coordinator,
- &_storageInterface),
+ _replicationProcess.get()),
UserException,
ErrorCodes::UnknownError);
}
@@ -209,7 +211,7 @@ TEST_F(RSRollbackTest, RemoteGetRollbackIdDiffersFromRequiredRBID) {
RollbackSourceLocal(stdx::make_unique<OplogInterfaceMock>()),
1,
_coordinator,
- &_storageInterface),
+ _replicationProcess.get()),
UserException,
ErrorCodes::Error(40362));
}
@@ -227,7 +229,7 @@ TEST_F(RSRollbackTest, BothOplogsAtCommonPoint) {
}))),
{},
_coordinator,
- &_storageInterface));
+ _replicationProcess.get()));
}
/**
@@ -261,7 +263,7 @@ Collection* _createCollection(OperationContext* opCtx,
*/
int _testRollbackDelete(OperationContext* opCtx,
ReplicationCoordinator* coordinator,
- StorageInterface* storageInterface,
+ ReplicationProcess* replicationProcess,
const BSONObj& documentAtSource) {
auto commonOperation =
std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1));
@@ -297,7 +299,7 @@ int _testRollbackDelete(OperationContext* opCtx,
rollbackSource,
{},
coordinator,
- storageInterface));
+ replicationProcess));
ASSERT_TRUE(rollbackSource.called);
Lock::DBLock dbLock(opCtx, "test", MODE_S);
@@ -313,16 +315,16 @@ int _testRollbackDelete(OperationContext* opCtx,
TEST_F(RSRollbackTest, RollbackDeleteNoDocumentAtSourceCollectionDoesNotExist) {
createOplog(_opCtx.get());
- ASSERT_EQUALS(-1,
- _testRollbackDelete(_opCtx.get(), _coordinator, &_storageInterface, BSONObj()));
+ ASSERT_EQUALS(
+ -1, _testRollbackDelete(_opCtx.get(), _coordinator, _replicationProcess.get(), BSONObj()));
}
TEST_F(RSRollbackTest, RollbackDeleteNoDocumentAtSourceCollectionExistsNonCapped) {
createOplog(_opCtx.get());
_createCollection(_opCtx.get(), "test.t", CollectionOptions());
- _testRollbackDelete(_opCtx.get(), _coordinator, &_storageInterface, BSONObj());
- ASSERT_EQUALS(0,
- _testRollbackDelete(_opCtx.get(), _coordinator, &_storageInterface, BSONObj()));
+ _testRollbackDelete(_opCtx.get(), _coordinator, _replicationProcess.get(), BSONObj());
+ ASSERT_EQUALS(
+ 0, _testRollbackDelete(_opCtx.get(), _coordinator, _replicationProcess.get(), BSONObj()));
}
TEST_F(RSRollbackTest, RollbackDeleteNoDocumentAtSourceCollectionExistsCapped) {
@@ -330,16 +332,17 @@ TEST_F(RSRollbackTest, RollbackDeleteNoDocumentAtSourceCollectionExistsCapped) {
CollectionOptions options;
options.capped = true;
_createCollection(_opCtx.get(), "test.t", options);
- ASSERT_EQUALS(0,
- _testRollbackDelete(_opCtx.get(), _coordinator, &_storageInterface, BSONObj()));
+ ASSERT_EQUALS(
+ 0, _testRollbackDelete(_opCtx.get(), _coordinator, _replicationProcess.get(), BSONObj()));
}
TEST_F(RSRollbackTest, RollbackDeleteRestoreDocument) {
createOplog(_opCtx.get());
_createCollection(_opCtx.get(), "test.t", CollectionOptions());
BSONObj doc = BSON("_id" << 0 << "a" << 1);
- _testRollbackDelete(_opCtx.get(), _coordinator, &_storageInterface, doc);
- ASSERT_EQUALS(1, _testRollbackDelete(_opCtx.get(), _coordinator, &_storageInterface, doc));
+ _testRollbackDelete(_opCtx.get(), _coordinator, _replicationProcess.get(), doc);
+ ASSERT_EQUALS(1,
+ _testRollbackDelete(_opCtx.get(), _coordinator, _replicationProcess.get(), doc));
}
TEST_F(RSRollbackTest, RollbackInsertDocumentWithNoId) {
@@ -376,7 +379,7 @@ TEST_F(RSRollbackTest, RollbackInsertDocumentWithNoId) {
rollbackSource,
{},
_coordinator,
- &_storageInterface);
+ _replicationProcess.get());
stopCapturingLogMessages();
ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
ASSERT_EQUALS(18752, status.location());
@@ -441,7 +444,7 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommand) {
rollbackSource,
{},
_coordinator,
- &_storageInterface));
+ _replicationProcess.get()));
stopCapturingLogMessages();
ASSERT_EQUALS(1,
countLogLinesContaining("rollback drop index: collection: test.t. index: a_1"));
@@ -502,7 +505,7 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandIndexNotInCatalog) {
rollbackSource,
{},
_coordinator,
- &_storageInterface));
+ _replicationProcess.get()));
stopCapturingLogMessages();
ASSERT_EQUALS(1,
countLogLinesContaining("rollback drop index: collection: test.t. index: a_1"));
@@ -551,7 +554,7 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandMissingNamespace) {
rollbackSource,
{},
_coordinator,
- &_storageInterface);
+ _replicationProcess.get());
stopCapturingLogMessages();
ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
ASSERT_EQUALS(18752, status.location());
@@ -614,7 +617,7 @@ TEST_F(RSRollbackTest, RollbackDropIndexCommandWithOneIndex) {
rollbackSource,
{},
_coordinator,
- &_storageInterface));
+ _replicationProcess.get()));
ASSERT(rollbackSource.called);
}
@@ -694,7 +697,7 @@ TEST_F(RSRollbackTest, RollbackDropIndexCommandWithMultipleIndexes) {
rollbackSource,
{},
_coordinator,
- &_storageInterface));
+ _replicationProcess.get()));
ASSERT(rollbackSource.called);
}
@@ -737,7 +740,7 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandInvalidNamespace) {
rollbackSource,
{},
_coordinator,
- &_storageInterface);
+ _replicationProcess.get());
stopCapturingLogMessages();
ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
ASSERT_EQUALS(18752, status.location());
@@ -783,7 +786,7 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandMissingIndexName) {
rollbackSource,
{},
_coordinator,
- &_storageInterface);
+ _replicationProcess.get());
stopCapturingLogMessages();
ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
ASSERT_EQUALS(18752, status.location());
@@ -820,7 +823,7 @@ TEST_F(RSRollbackTest, RollbackUnknownCommand) {
}))),
{},
_coordinator,
- &_storageInterface);
+ _replicationProcess.get());
ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
ASSERT_EQUALS(18751, status.location());
}
@@ -857,7 +860,7 @@ TEST_F(RSRollbackTest, RollbackDropCollectionCommand) {
rollbackSource,
{},
_coordinator,
- &_storageInterface));
+ _replicationProcess.get()));
ASSERT_TRUE(rollbackSource.called);
}
@@ -896,7 +899,7 @@ TEST_F(RSRollbackTest, RollbackDropCollectionCommandFailsIfRBIDChangesWhileSynci
rollbackSource,
0,
_coordinator,
- &_storageInterface),
+ _replicationProcess.get()),
DBException,
40365);
ASSERT(rollbackSource.copyCollectionCalled);
@@ -1021,7 +1024,7 @@ TEST_F(RSRollbackTest, RollbackApplyOpsCommand) {
rollbackSource,
{},
_coordinator,
- &_storageInterface));
+ _replicationProcess.get()));
ASSERT_EQUALS(4U, rollbackSource.searchedIds.size());
ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(1));
ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(2));
@@ -1062,7 +1065,7 @@ TEST_F(RSRollbackTest, RollbackCreateCollectionCommand) {
rollbackSource,
{},
_coordinator,
- &_storageInterface));
+ _replicationProcess.get()));
{
Lock::DBLock dbLock(_opCtx.get(), "test", MODE_S);
auto db = dbHolder().get(_opCtx.get(), "test");
@@ -1106,7 +1109,7 @@ TEST_F(RSRollbackTest, RollbackCollectionModificationCommand) {
rollbackSource,
{},
_coordinator,
- &_storageInterface));
+ _replicationProcess.get()));
stopCapturingLogMessages();
ASSERT_TRUE(rollbackSource.called);
for (const auto& message : getCapturedLogMessages()) {
@@ -1148,7 +1151,7 @@ TEST_F(RSRollbackTest, RollbackCollectionModificationCommandInvalidCollectionOpt
rollbackSource,
{},
_coordinator,
- &_storageInterface);
+ _replicationProcess.get());
ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
ASSERT_EQUALS(18753, status.location());
}
@@ -1196,7 +1199,7 @@ TEST_F(RSRollbackTest, RollbackReturnsImmediatelyOnFailureToTransitionToRollback
rollbackSourceWithInvalidOplog,
{},
_coordinator,
- &_storageInterface);
+ _replicationProcess.get());
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Cannot transition from SECONDARY to ROLLBACK"));
@@ -1219,7 +1222,7 @@ DEATH_TEST_F(RSRollbackTest,
rollbackSourceWithInvalidOplog,
{},
_coordinator,
- &_storageInterface);
+ _replicationProcess.get());
}
TEST_F(RSRollbackTest, RollbackLogsRetryMessageAndReturnsOnNonUnrecoverableRollbackError) {
@@ -1238,7 +1241,7 @@ TEST_F(RSRollbackTest, RollbackLogsRetryMessageAndReturnsOnNonUnrecoverableRollb
rollbackSourceWithValidOplog,
{},
_coordinator,
- &_storageInterface,
+ _replicationProcess.get(),
noopSleepSecsFn);
stopCapturingLogMessages();
@@ -1261,7 +1264,7 @@ DEATH_TEST_F(RSRollbackTest,
ASSERT_TRUE(ShardIdentityRollbackNotifier::get(_opCtx.get())->didRollbackHappen());
createOplog(_opCtx.get());
- rollback(_opCtx.get(), localOplog, rollbackSource, {}, _coordinator, &_storageInterface);
+ rollback(_opCtx.get(), localOplog, rollbackSource, {}, _coordinator, _replicationProcess.get());
}
DEATH_TEST_F(
@@ -1277,7 +1280,7 @@ DEATH_TEST_F(
_coordinator->_failSetFollowerModeOnThisMemberState = MemberState::RS_RECOVERING;
createOplog(_opCtx.get());
- rollback(_opCtx.get(), localOplog, rollbackSource, {}, _coordinator, &_storageInterface);
+ rollback(_opCtx.get(), localOplog, rollbackSource, {}, _coordinator, _replicationProcess.get());
}
// The testcases used here are trying to detect off-by-one errors in
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
index bf5a101b6a7..630d481660f 100644
--- a/src/mongo/db/repl/storage_interface.h
+++ b/src/mongo/db/repl/storage_interface.h
@@ -61,28 +61,6 @@ namespace repl {
* * Drop database and all user databases
* * Drop a collection
* * Insert documents into a collection
- * * Manage minvalid boundaries and initial sync state
- *
- * ***** MINVALID *****
- * This interface provides helper functions for maintaining a single document in the
- * local.replset.minvalid collection.
- *
- * When a member reaches its minValid optime it is in a consistent state. Thus, minValid is
- * set as the last step in initial sync. At the beginning of initial sync, doingInitialSync
- * is appended onto minValid to indicate that initial sync was started but has not yet
- * completed.
- *
- * The document is also updated during "normal" sync. The optime of the last op in each batch is
- * used to set minValid, along with a "begin" field to demark the start and the fact that a batch
- * is active. When the batch is done the "begin" field is removed to indicate that we are in a
- * consistent state when the batch has been fully applied.
- *
- * Example of all fields:
- * { _id:...,
- * doingInitialSync: true // initial sync is active
- * ts:..., t:... // end-OpTime
- * begin: {ts:..., t:...} // a batch is currently being applied, and not consistent
- * }
*/
class StorageInterface {
MONGO_DISALLOW_COPYING(StorageInterface);
@@ -98,42 +76,6 @@ public:
StorageInterface() = default;
virtual ~StorageInterface() = default;
- // MinValid and Initial Sync Flag.
- /**
- * Returns true if initial sync was started but has not not completed.
- */
- virtual bool getInitialSyncFlag(OperationContext* opCtx) const = 0;
-
- /**
- * Sets the the initial sync flag to record that initial sync has not completed.
- *
- * This operation is durable and waits for durable writes (which will block on
- *journaling/checkpointing).
- */
- virtual void setInitialSyncFlag(OperationContext* opCtx) = 0;
-
- /**
- * Clears the the initial sync flag to record that initial sync has completed.
- *
- * This operation is durable and waits for durable writes (which will block on
- *journaling/checkpointing).
- */
- virtual void clearInitialSyncFlag(OperationContext* opCtx) = 0;
-
- /**
- * The minValid value is the earliest (minimum) Timestamp that must be applied in order to
- * consider the dataset consistent.
- */
- virtual void setMinValid(OperationContext* opCtx, const OpTime& minValid) = 0;
- virtual OpTime getMinValid(OperationContext* opCtx) const = 0;
-
- /**
- * Sets minValid only if it is not already higher than endOpTime.
- * Warning, this compares the term and timestamp independently. Do not use if the current
- * minValid could be from the other fork of a rollback.
- */
- virtual void setMinValidToAtLeast(OperationContext* opCtx, const OpTime& endOpTime) = 0;
-
/**
* Rollback ID is an increasing counter of how many rollbacks have occurred on this server.
*/
@@ -141,27 +83,6 @@ public:
virtual Status initializeRollbackID(OperationContext* opCtx) = 0;
virtual Status incrementRollbackID(OperationContext* opCtx) = 0;
- /**
- * On startup all oplog entries with a value >= the oplog delete from point should be deleted.
- * If null, no documents should be deleted.
- */
- virtual void setOplogDeleteFromPoint(OperationContext* opCtx, const Timestamp& timestamp) = 0;
- virtual Timestamp getOplogDeleteFromPoint(OperationContext* opCtx) = 0;
-
- /**
- * The applied through point is a persistent record of where we've applied through. If null, the
- * applied through point is the top of the oplog.
- */
- virtual void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) = 0;
-
- /**
- * You should probably be calling ReplicationCoordinator::getLastAppliedOpTime() instead.
- *
- * This reads the value from storage which isn't always updated when the ReplicationCoordinator
- * is.
- */
- virtual OpTime getAppliedThrough(OperationContext* opCtx) = 0;
-
// Collection creation and population for initial sync.
/**
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 4ab7100f2b3..ae01e86d259 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -80,10 +80,6 @@
namespace mongo {
namespace repl {
-const char StorageInterfaceImpl::kDefaultMinValidNamespace[] = "local.replset.minvalid";
-const char StorageInterfaceImpl::kInitialSyncFlagFieldName[] = "doingInitialSync";
-const char StorageInterfaceImpl::kBeginFieldName[] = "begin";
-const char StorageInterfaceImpl::kOplogDeleteFromPointFieldName[] = "oplogDeleteFromPoint";
const char StorageInterfaceImpl::kDefaultRollbackIdNamespace[] = "local.system.rollback.id";
const char StorageInterfaceImpl::kRollbackIdFieldName[] = "rollbackId";
const char StorageInterfaceImpl::kRollbackIdDocumentId[] = "rollbackId";
@@ -91,48 +87,12 @@ const char StorageInterfaceImpl::kRollbackIdDocumentId[] = "rollbackId";
namespace {
using UniqueLock = stdx::unique_lock<stdx::mutex>;
-const BSONObj kInitialSyncFlag(BSON(StorageInterfaceImpl::kInitialSyncFlagFieldName << true));
-
const auto kIdIndexName = "_id_"_sd;
} // namespace
StorageInterfaceImpl::StorageInterfaceImpl()
- : StorageInterfaceImpl(NamespaceString(StorageInterfaceImpl::kDefaultMinValidNamespace)) {}
-
-StorageInterfaceImpl::StorageInterfaceImpl(const NamespaceString& minValidNss)
- : _minValidNss(minValidNss),
- _rollbackIdNss(StorageInterfaceImpl::kDefaultRollbackIdNamespace) {}
-
-NamespaceString StorageInterfaceImpl::getMinValidNss() const {
- return _minValidNss;
-}
-
-BSONObj StorageInterfaceImpl::getMinValidDocument(OperationContext* opCtx) const {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- Lock::DBLock dblk(opCtx, _minValidNss.db(), MODE_IS);
- Lock::CollectionLock lk(opCtx->lockState(), _minValidNss.ns(), MODE_IS);
- BSONObj doc;
- bool found = Helpers::getSingleton(opCtx, _minValidNss.ns().c_str(), doc);
- invariant(found || doc.isEmpty());
- return doc;
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- opCtx, "StorageInterfaceImpl::getMinValidDocument", _minValidNss.ns());
-
- MONGO_UNREACHABLE;
-}
-
-void StorageInterfaceImpl::updateMinValidDocument(OperationContext* opCtx,
- const BSONObj& updateSpec) {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- // For now this needs to be MODE_X because it sometimes creates the collection.
- Lock::DBLock dblk(opCtx, _minValidNss.db(), MODE_X);
- Helpers::putSingleton(opCtx, _minValidNss.ns().c_str(), updateSpec);
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- opCtx, "StorageInterfaceImpl::updateMinValidDocument", _minValidNss.ns());
-}
+ : _rollbackIdNss(StorageInterfaceImpl::kDefaultRollbackIdNamespace) {}
StatusWith<int> StorageInterfaceImpl::getRollbackID(OperationContext* opCtx) {
BSONObjBuilder bob;
@@ -201,114 +161,6 @@ Status StorageInterfaceImpl::incrementRollbackID(OperationContext* opCtx) {
return status;
}
-bool StorageInterfaceImpl::getInitialSyncFlag(OperationContext* opCtx) const {
- const BSONObj doc = getMinValidDocument(opCtx);
- const auto flag = doc[kInitialSyncFlagFieldName].trueValue();
- LOG(3) << "returning initial sync flag value of " << flag;
- return flag;
-}
-
-void StorageInterfaceImpl::setInitialSyncFlag(OperationContext* opCtx) {
- LOG(3) << "setting initial sync flag";
- updateMinValidDocument(opCtx, BSON("$set" << kInitialSyncFlag));
- opCtx->recoveryUnit()->waitUntilDurable();
-}
-
-void StorageInterfaceImpl::clearInitialSyncFlag(OperationContext* opCtx) {
- LOG(3) << "clearing initial sync flag";
-
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- OpTime time = replCoord->getMyLastAppliedOpTime();
- updateMinValidDocument(
- opCtx,
- BSON("$unset" << kInitialSyncFlag << "$set"
- << BSON("ts" << time.getTimestamp() << "t" << time.getTerm()
- << kBeginFieldName
- << time)));
-
- if (getGlobalServiceContext()->getGlobalStorageEngine()->isDurable()) {
- opCtx->recoveryUnit()->waitUntilDurable();
- replCoord->setMyLastDurableOpTime(time);
- }
-}
-
-OpTime StorageInterfaceImpl::getMinValid(OperationContext* opCtx) const {
- const BSONObj doc = getMinValidDocument(opCtx);
- const auto opTimeStatus = OpTime::parseFromOplogEntry(doc);
- // If any of the keys (fields) are missing from the minvalid document, we return
- // a null OpTime.
- if (opTimeStatus == ErrorCodes::NoSuchKey) {
- return {};
- }
-
- if (!opTimeStatus.isOK()) {
- severe() << "Error parsing minvalid entry: " << redact(doc)
- << ", with status:" << opTimeStatus.getStatus();
- fassertFailedNoTrace(40052);
- }
-
- OpTime minValid = opTimeStatus.getValue();
- LOG(3) << "returning minvalid: " << minValid.toString() << "(" << minValid.toBSON() << ")";
-
- return minValid;
-}
-
-void StorageInterfaceImpl::setMinValid(OperationContext* opCtx, const OpTime& minValid) {
- LOG(3) << "setting minvalid to exactly: " << minValid.toString() << "(" << minValid.toBSON()
- << ")";
- updateMinValidDocument(
- opCtx, BSON("$set" << BSON("ts" << minValid.getTimestamp() << "t" << minValid.getTerm())));
-}
-
-void StorageInterfaceImpl::setMinValidToAtLeast(OperationContext* opCtx, const OpTime& minValid) {
- LOG(3) << "setting minvalid to at least: " << minValid.toString() << "(" << minValid.toBSON()
- << ")";
- updateMinValidDocument(
- opCtx, BSON("$max" << BSON("ts" << minValid.getTimestamp() << "t" << minValid.getTerm())));
-}
-
-void StorageInterfaceImpl::setOplogDeleteFromPoint(OperationContext* opCtx,
- const Timestamp& timestamp) {
- LOG(3) << "setting oplog delete from point to: " << timestamp.toStringPretty();
- updateMinValidDocument(opCtx,
- BSON("$set" << BSON(kOplogDeleteFromPointFieldName << timestamp)));
-}
-
-Timestamp StorageInterfaceImpl::getOplogDeleteFromPoint(OperationContext* opCtx) {
- const BSONObj doc = getMinValidDocument(opCtx);
- Timestamp out = {};
- if (auto field = doc[kOplogDeleteFromPointFieldName]) {
- out = field.timestamp();
- }
-
- LOG(3) << "returning oplog delete from point: " << out;
- return out;
-}
-
-void StorageInterfaceImpl::setAppliedThrough(OperationContext* opCtx, const OpTime& optime) {
- LOG(3) << "setting appliedThrough to: " << optime.toString() << "(" << optime.toBSON() << ")";
- if (optime.isNull()) {
- updateMinValidDocument(opCtx, BSON("$unset" << BSON(kBeginFieldName << 1)));
- } else {
- updateMinValidDocument(opCtx, BSON("$set" << BSON(kBeginFieldName << optime)));
- }
-}
-
-OpTime StorageInterfaceImpl::getAppliedThrough(OperationContext* opCtx) {
- const BSONObj doc = getMinValidDocument(opCtx);
- const auto opTimeStatus = OpTime::parseFromOplogEntry(doc.getObjectField(kBeginFieldName));
- if (!opTimeStatus.isOK()) {
- // Return null OpTime on any parse failure, including if "begin" is missing.
- return {};
- }
-
- OpTime appliedThrough = opTimeStatus.getValue();
- LOG(3) << "returning appliedThrough: " << appliedThrough.toString() << "("
- << appliedThrough.toBSON() << ")";
-
- return appliedThrough;
-}
-
StatusWith<std::unique_ptr<CollectionBulkLoader>>
StorageInterfaceImpl::createCollectionForBulkLoading(
const NamespaceString& nss,
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
index a41d0ae4584..634fc428d05 100644
--- a/src/mongo/db/repl/storage_interface_impl.h
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -45,38 +45,15 @@ class StorageInterfaceImpl : public StorageInterface {
MONGO_DISALLOW_COPYING(StorageInterfaceImpl);
public:
- static const char kDefaultMinValidNamespace[];
- static const char kInitialSyncFlagFieldName[];
- static const char kBeginFieldName[];
- static const char kOplogDeleteFromPointFieldName[];
static const char kDefaultRollbackIdNamespace[];
static const char kRollbackIdFieldName[];
static const char kRollbackIdDocumentId[];
StorageInterfaceImpl();
- explicit StorageInterfaceImpl(const NamespaceString& minValidNss);
- /**
- * Returns namespace of collection containing the minvalid boundaries and initial sync flag.
- */
- NamespaceString getMinValidNss() const;
-
- bool getInitialSyncFlag(OperationContext* opCtx) const override;
-
- void setInitialSyncFlag(OperationContext* opCtx) override;
-
- void clearInitialSyncFlag(OperationContext* opCtx) override;
-
- OpTime getMinValid(OperationContext* opCtx) const override;
- void setMinValid(OperationContext* opCtx, const OpTime& minValid) override;
- void setMinValidToAtLeast(OperationContext* opCtx, const OpTime& endOpTime) override;
StatusWith<int> getRollbackID(OperationContext* opCtx) override;
Status initializeRollbackID(OperationContext* opCtx) override;
Status incrementRollbackID(OperationContext* opCtx) override;
- void setOplogDeleteFromPoint(OperationContext* opCtx, const Timestamp& timestamp) override;
- Timestamp getOplogDeleteFromPoint(OperationContext* opCtx) override;
- void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) override;
- OpTime getAppliedThrough(OperationContext* opCtx) override;
/**
* Allocates a new TaskRunner for use by the passed in collection.
@@ -158,11 +135,6 @@ public:
Status isAdminDbValid(OperationContext* opCtx) override;
private:
- // Returns empty document if not present.
- BSONObj getMinValidDocument(OperationContext* opCtx) const;
- void updateMinValidDocument(OperationContext* opCtx, const BSONObj& updateSpec);
-
- const NamespaceString _minValidNss;
const NamespaceString _rollbackIdNss;
};
diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp
index e15f54014b2..2c6087aef83 100644
--- a/src/mongo/db/repl/storage_interface_impl_test.cpp
+++ b/src/mongo/db/repl/storage_interface_impl_test.cpp
@@ -39,19 +39,15 @@
#include "mongo/db/client.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
-#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
-#include "mongo/db/dbhelpers.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_interface_local.h"
-#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/service_context_d_test_fixture.h"
-#include "mongo/db/storage/recovery_unit_noop.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/thread.h"
#include "mongo/unittest/unittest.h"
@@ -87,22 +83,6 @@ NamespaceString makeNamespace(const T& t, const std::string& suffix = "") {
}
/**
- * Returns min valid document.
- */
-BSONObj getMinValidDocument(OperationContext* opCtx, const NamespaceString& minValidNss) {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- Lock::DBLock dblk(opCtx, minValidNss.db(), MODE_IS);
- Lock::CollectionLock lk(opCtx->lockState(), minValidNss.ns(), MODE_IS);
- BSONObj mv;
- if (Helpers::getSingleton(opCtx, minValidNss.ns().c_str(), mv)) {
- return mv;
- }
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "getMinValidDocument", minValidNss.ns());
- return BSONObj();
-}
-
-/**
* Creates collection options suitable for oplog.
*/
CollectionOptions createOplogCollectionOptions() {
@@ -194,27 +174,12 @@ private:
_ddv = stdx::make_unique<DisableDocumentValidation>(_opCtx.get());
}
-private:
ServiceContext::UniqueOperationContext _opCtx;
std::unique_ptr<UnreplicatedWritesBlock> _uwb;
std::unique_ptr<DisableDocumentValidation> _ddv;
ReplicationCoordinatorMock* _replicationCoordinatorMock = nullptr;
};
-/**
- * Recovery unit that tracks if waitUntilDurable() is called.
- */
-class RecoveryUnitWithDurabilityTracking : public RecoveryUnitNoop {
-public:
- bool waitUntilDurable() override;
- bool waitUntilDurableCalled = false;
-};
-
-bool RecoveryUnitWithDurabilityTracking::waitUntilDurable() {
- waitUntilDurableCalled = true;
- return RecoveryUnitNoop::waitUntilDurable();
-}
-
TEST_F(StorageInterfaceImplTest, ServiceContextDecorator) {
auto serviceContext = getServiceContext();
ASSERT_FALSE(StorageInterface::get(serviceContext));
@@ -225,108 +190,6 @@ TEST_F(StorageInterfaceImplTest, ServiceContextDecorator) {
ASSERT_TRUE(storage == StorageInterface::get(getOperationContext()));
}
-TEST_F(StorageInterfaceImplTest, DefaultMinValidNamespace) {
- ASSERT_EQUALS(NamespaceString(StorageInterfaceImpl::kDefaultMinValidNamespace),
- StorageInterfaceImpl().getMinValidNss());
-}
-
-TEST_F(StorageInterfaceImplTest, InitialSyncFlag) {
- auto nss = makeNamespace(_agent);
-
- StorageInterfaceImpl storage(nss);
- auto opCtx = getOperationContext();
-
- // Initial sync flag should be unset after initializing a new storage engine.
- ASSERT_FALSE(storage.getInitialSyncFlag(opCtx));
-
- // Setting initial sync flag should affect getInitialSyncFlag() result.
- storage.setInitialSyncFlag(opCtx);
- ASSERT_TRUE(storage.getInitialSyncFlag(opCtx));
-
- // Check min valid document using storage engine interface.
- auto minValidDocument = getMinValidDocument(opCtx, nss);
- ASSERT_TRUE(minValidDocument.hasField(StorageInterfaceImpl::kInitialSyncFlagFieldName));
- ASSERT_TRUE(minValidDocument.getBoolField(StorageInterfaceImpl::kInitialSyncFlagFieldName));
-
- // Clearing initial sync flag should affect getInitialSyncFlag() result.
- storage.clearInitialSyncFlag(opCtx);
- ASSERT_FALSE(storage.getInitialSyncFlag(opCtx));
-}
-
-TEST_F(StorageInterfaceImplTest, GetMinValidAfterSettingInitialSyncFlagWorks) {
- auto nss = makeNamespace(_agent);
-
- StorageInterfaceImpl storage(nss);
- auto opCtx = getOperationContext();
-
- // Initial sync flag should be unset after initializing a new storage engine.
- ASSERT_FALSE(storage.getInitialSyncFlag(opCtx));
-
- // Setting initial sync flag should affect getInitialSyncFlag() result.
- storage.setInitialSyncFlag(opCtx);
- ASSERT_TRUE(storage.getInitialSyncFlag(opCtx));
-
- ASSERT(storage.getMinValid(opCtx).isNull());
- ASSERT(storage.getAppliedThrough(opCtx).isNull());
- ASSERT(storage.getOplogDeleteFromPoint(opCtx).isNull());
-}
-
-TEST_F(StorageInterfaceImplTest, MinValid) {
- auto nss = makeNamespace(_agent);
-
- StorageInterfaceImpl storage(nss);
- auto opCtx = getOperationContext();
-
- // MinValid boundaries should all be null after initializing a new storage engine.
- ASSERT(storage.getMinValid(opCtx).isNull());
- ASSERT(storage.getAppliedThrough(opCtx).isNull());
- ASSERT(storage.getOplogDeleteFromPoint(opCtx).isNull());
-
- // Setting min valid boundaries should affect getMinValid() result.
- OpTime startOpTime({Seconds(123), 0}, 1LL);
- OpTime endOpTime({Seconds(456), 0}, 1LL);
- storage.setAppliedThrough(opCtx, startOpTime);
- storage.setMinValid(opCtx, endOpTime);
- storage.setOplogDeleteFromPoint(opCtx, endOpTime.getTimestamp());
-
- ASSERT_EQ(storage.getAppliedThrough(opCtx), startOpTime);
- ASSERT_EQ(storage.getMinValid(opCtx), endOpTime);
- ASSERT_EQ(storage.getOplogDeleteFromPoint(opCtx), endOpTime.getTimestamp());
-
-
- // setMinValid always changes minValid, but setMinValidToAtLeast only does if higher.
- storage.setMinValid(opCtx, startOpTime); // Forcibly lower it.
- ASSERT_EQ(storage.getMinValid(opCtx), startOpTime);
- storage.setMinValidToAtLeast(opCtx, endOpTime); // Higher than current (sets it).
- ASSERT_EQ(storage.getMinValid(opCtx), endOpTime);
- storage.setMinValidToAtLeast(opCtx, startOpTime); // Lower than current (no-op).
- ASSERT_EQ(storage.getMinValid(opCtx), endOpTime);
-
- // Check min valid document using storage engine interface.
- auto minValidDocument = getMinValidDocument(opCtx, nss);
- ASSERT_TRUE(minValidDocument.hasField(StorageInterfaceImpl::kBeginFieldName));
- ASSERT_TRUE(minValidDocument[StorageInterfaceImpl::kBeginFieldName].isABSONObj());
- ASSERT_EQUALS(startOpTime,
- unittest::assertGet(OpTime::parseFromOplogEntry(
- minValidDocument[StorageInterfaceImpl::kBeginFieldName].Obj())));
- ASSERT_EQUALS(endOpTime, unittest::assertGet(OpTime::parseFromOplogEntry(minValidDocument)));
- ASSERT_EQUALS(
- endOpTime.getTimestamp(),
- minValidDocument[StorageInterfaceImpl::kOplogDeleteFromPointFieldName].timestamp());
-
- // Recovery unit will be owned by "opCtx".
- RecoveryUnitWithDurabilityTracking* recoveryUnit = new RecoveryUnitWithDurabilityTracking();
- opCtx->setRecoveryUnit(recoveryUnit, OperationContext::kNotInUnitOfWork);
-
- // Set min valid without waiting for the changes to be durable.
- OpTime endOpTime2({Seconds(789), 0}, 1LL);
- storage.setMinValid(opCtx, endOpTime2);
- storage.setAppliedThrough(opCtx, {});
- ASSERT_EQUALS(storage.getAppliedThrough(opCtx), OpTime());
- ASSERT_EQUALS(storage.getMinValid(opCtx), endOpTime2);
- ASSERT_FALSE(recoveryUnit->waitUntilDurableCalled);
-}
-
TEST_F(StorageInterfaceImplTest, GetRollbackIDReturnsNamespaceNotFoundOnMissingCollection) {
StorageInterfaceImpl storage;
auto opCtx = getOperationContext();
@@ -474,7 +337,7 @@ TEST_F(StorageInterfaceImplTest, InsertDocumentsReturnsOKWhenNoOperationsAreGive
auto opCtx = getOperationContext();
auto nss = makeNamespace(_agent);
createCollection(opCtx, nss);
- StorageInterfaceImpl storage(nss);
+ StorageInterfaceImpl storage;
ASSERT_OK(storage.insertDocuments(opCtx, nss, {}));
}
@@ -487,7 +350,7 @@ TEST_F(StorageInterfaceImplTest,
createCollection(opCtx, nss);
// Non-oplog collection will enforce mandatory _id field requirement on insertion.
- StorageInterfaceImpl storage(nss);
+ StorageInterfaceImpl storage;
auto op = makeOplogEntry({Timestamp(Seconds(1), 0), 1LL});
auto status = storage.insertDocuments(opCtx, nss, {op});
ASSERT_EQUALS(ErrorCodes::InternalError, status);
@@ -504,7 +367,7 @@ TEST_F(StorageInterfaceImplTest,
options.cappedSize = 1024 * 1024;
createCollection(opCtx, nss, options);
// StorageInterfaceImpl::insertDocuments should fall back on inserting the batch one at a time.
- StorageInterfaceImpl storage(nss);
+ StorageInterfaceImpl storage;
auto doc1 = BSON("_id" << 1);
auto doc2 = BSON("_id" << 2);
std::vector<BSONObj> docs({doc1, doc2});
diff --git a/src/mongo/db/repl/storage_interface_mock.cpp b/src/mongo/db/repl/storage_interface_mock.cpp
index 531e4b17310..64ebecdc573 100644
--- a/src/mongo/db/repl/storage_interface_mock.cpp
+++ b/src/mongo/db/repl/storage_interface_mock.cpp
@@ -39,38 +39,8 @@
namespace mongo {
namespace repl {
-bool StorageInterfaceMock::getInitialSyncFlag(OperationContext* opCtx) const {
- stdx::lock_guard<stdx::mutex> lock(_initialSyncFlagMutex);
- return _initialSyncFlag;
-}
-
-void StorageInterfaceMock::setInitialSyncFlag(OperationContext* opCtx) {
- stdx::lock_guard<stdx::mutex> lock(_initialSyncFlagMutex);
- _initialSyncFlag = true;
-}
-
-void StorageInterfaceMock::clearInitialSyncFlag(OperationContext* opCtx) {
- stdx::lock_guard<stdx::mutex> lock(_initialSyncFlagMutex);
- _initialSyncFlag = false;
-}
-
-OpTime StorageInterfaceMock::getMinValid(OperationContext* opCtx) const {
- stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
- return _minValid;
-}
-
-void StorageInterfaceMock::setMinValid(OperationContext* opCtx, const OpTime& minValid) {
- stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
- _minValid = minValid;
-}
-
-void StorageInterfaceMock::setMinValidToAtLeast(OperationContext* opCtx, const OpTime& minValid) {
- stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
- _minValid = std::max(_minValid, minValid);
-}
-
StatusWith<int> StorageInterfaceMock::getRollbackID(OperationContext* opCtx) {
- stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
+ stdx::lock_guard<stdx::mutex> lock(_rbidMutex);
if (!_rbidInitialized) {
return Status(ErrorCodes::NamespaceNotFound, "Rollback ID not initialized");
}
@@ -78,7 +48,7 @@ StatusWith<int> StorageInterfaceMock::getRollbackID(OperationContext* opCtx) {
}
Status StorageInterfaceMock::initializeRollbackID(OperationContext* opCtx) {
- stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
+ stdx::lock_guard<stdx::mutex> lock(_rbidMutex);
if (_rbidInitialized) {
return Status(ErrorCodes::NamespaceExists, "Rollback ID already initialized");
}
@@ -90,35 +60,13 @@ Status StorageInterfaceMock::initializeRollbackID(OperationContext* opCtx) {
}
Status StorageInterfaceMock::incrementRollbackID(OperationContext* opCtx) {
- stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
+ stdx::lock_guard<stdx::mutex> lock(_rbidMutex);
if (!_rbidInitialized) {
return Status(ErrorCodes::NamespaceNotFound, "Rollback ID not initialized");
}
_rbid++;
return Status::OK();
}
-
-void StorageInterfaceMock::setOplogDeleteFromPoint(OperationContext* opCtx,
- const Timestamp& timestamp) {
- stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
- _oplogDeleteFromPoint = timestamp;
-}
-
-Timestamp StorageInterfaceMock::getOplogDeleteFromPoint(OperationContext* opCtx) {
- stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
- return _oplogDeleteFromPoint;
-}
-
-void StorageInterfaceMock::setAppliedThrough(OperationContext* opCtx, const OpTime& optime) {
- stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
- _appliedThrough = optime;
-}
-
-OpTime StorageInterfaceMock::getAppliedThrough(OperationContext* opCtx) {
- stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
- return _appliedThrough;
-}
-
Status CollectionBulkLoaderMock::init(Collection* coll,
const std::vector<BSONObj>& secondaryIndexSpecs) {
LOG(1) << "CollectionBulkLoaderMock::init called";
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index eaf888b036e..74a6bac4abc 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -122,20 +122,9 @@ public:
StorageInterfaceMock() = default;
- bool getInitialSyncFlag(OperationContext* opCtx) const override;
- void setInitialSyncFlag(OperationContext* opCtx) override;
- void clearInitialSyncFlag(OperationContext* opCtx) override;
-
- OpTime getMinValid(OperationContext* opCtx) const override;
- void setMinValid(OperationContext* opCtx, const OpTime& minValid) override;
- void setMinValidToAtLeast(OperationContext* opCtx, const OpTime& minValid) override;
StatusWith<int> getRollbackID(OperationContext* opCtx) override;
Status initializeRollbackID(OperationContext* opCtx) override;
Status incrementRollbackID(OperationContext* opCtx) override;
- void setOplogDeleteFromPoint(OperationContext* opCtx, const Timestamp& timestamp) override;
- Timestamp getOplogDeleteFromPoint(OperationContext* opCtx) override;
- void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) override;
- OpTime getAppliedThrough(OperationContext* opCtx) override;
StatusWith<std::unique_ptr<CollectionBulkLoader>> createCollectionForBulkLoading(
const NamespaceString& nss,
@@ -306,15 +295,9 @@ public:
};
private:
- mutable stdx::mutex _initialSyncFlagMutex;
- bool _initialSyncFlag = false;
-
- mutable stdx::mutex _minValidBoundariesMutex;
- OpTime _appliedThrough;
- OpTime _minValid;
+ mutable stdx::mutex _rbidMutex;
int _rbid;
bool _rbidInitialized = false;
- Timestamp _oplogDeleteFromPoint;
};
} // namespace repl
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 68542125d4d..6dda8980def 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -62,6 +62,7 @@
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/repl_set_config.h"
#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
@@ -656,7 +657,8 @@ void tryToGoLiveAsASecondary(OperationContext* opCtx, ReplicationCoordinator* re
}
// We can't go to SECONDARY until we reach minvalid.
- if (replCoord->getMyLastAppliedOpTime() < StorageInterface::get(opCtx)->getMinValid(opCtx)) {
+ if (replCoord->getMyLastAppliedOpTime() <
+ ReplicationProcess::get(opCtx)->getConsistencyMarkers()->getMinValid(opCtx)) {
return;
}
@@ -817,8 +819,10 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) {
// Update various things that care about our last applied optime. Tests rely on 2 happening
// before 3 even though it isn't strictly necessary. The order of 1 doesn't matter.
setNewTimestamp(opCtx.getServiceContext(), lastOpTimeInBatch.getTimestamp()); // 1
- StorageInterface::get(&opCtx)->setAppliedThrough(&opCtx, lastOpTimeInBatch); // 2
- finalizer->record(lastOpTimeInBatch); // 3
+ ReplicationProcess::get(&opCtx)->getConsistencyMarkers()->setAppliedThrough(
+ &opCtx,
+ lastOpTimeInBatch); // 2
+ finalizer->record(lastOpTimeInBatch); // 3
}
}
@@ -1251,7 +1255,7 @@ StatusWith<OpTime> multiApply(OperationContext* opCtx,
prefetchOps(ops, workerPool);
}
- auto storage = StorageInterface::get(opCtx);
+ auto consistencyMarkers = ReplicationProcess::get(opCtx)->getConsistencyMarkers();
LOG(2) << "replication batch size is " << ops.size();
// Stop all readers until we're done. This also prevents doc-locking engines from deleting old
@@ -1272,14 +1276,14 @@ StatusWith<OpTime> multiApply(OperationContext* opCtx,
std::vector<MultiApplier::OperationPtrs> writerVectors(workerPool->getNumThreads());
ON_BLOCK_EXIT([&] { workerPool->join(); });
- storage->setOplogDeleteFromPoint(opCtx, ops.front().getTimestamp());
+ consistencyMarkers->setOplogDeleteFromPoint(opCtx, ops.front().getTimestamp());
scheduleWritesToOplog(opCtx, workerPool, ops);
fillWriterVectors(opCtx, &ops, &writerVectors);
workerPool->join();
- storage->setOplogDeleteFromPoint(opCtx, Timestamp());
- storage->setMinValidToAtLeast(opCtx, ops.back().getOpTime());
+ consistencyMarkers->setOplogDeleteFromPoint(opCtx, Timestamp());
+ consistencyMarkers->setMinValidToAtLeast(opCtx, ops.back().getOpTime());
applyOps(writerVectors, workerPool, applyOperation, &statusVector);
}
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 6b893f22e3e..3ecdd885777 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -48,8 +48,10 @@
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_interface_local.h"
+#include "mongo/db/repl/replication_consistency_markers_mock.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/repl/sync_tail.h"
@@ -75,6 +77,7 @@ protected:
SyncTail::ApplyCommandInLockFn _applyCmd;
SyncTail::IncrementOpsAppliedStatsFn _incOps;
StorageInterfaceMock* _storageInterface = nullptr;
+ ReplicationProcess* _replicationProcess = nullptr;
// Implements the MultiApplier::ApplyOperationFn interface and does nothing.
static Status noopApplyOperationFn(MultiApplier::OperationPtrs*) {
@@ -118,6 +121,12 @@ void SyncTailTest::setUp() {
const std::vector<BSONObj>&) { return Status::OK(); };
StorageInterface::set(service, std::move(storageInterface));
+ _replicationProcess = new ReplicationProcess(
+ _storageInterface, stdx::make_unique<ReplicationConsistencyMarkersMock>());
+ ReplicationProcess::set(cc().getServiceContext(),
+ std::unique_ptr<ReplicationProcess>(_replicationProcess));
+
+
_opCtx = cc().makeOperationContext();
_opsApplied = 0;
_applyOp = [](OperationContext* opCtx,
@@ -133,6 +142,7 @@ void SyncTailTest::tearDown() {
_opCtx.reset();
ServiceContextMongoDTest::tearDown();
_storageInterface = nullptr;
+ ReplicationProcess::set(getServiceContext(), {});
}
SyncTailWithLocalDocumentFetcher::SyncTailWithLocalDocumentFetcher(const BSONObj& document)
diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript
index 64079e4c712..34a511adeba 100644
--- a/src/mongo/dbtests/SConscript
+++ b/src/mongo/dbtests/SConscript
@@ -123,6 +123,7 @@ dbtest = env.Program(
"$BUILD_DIR/mongo/db/query/query_test_service_context",
"$BUILD_DIR/mongo/db/repl/drop_pending_collection_reaper",
"$BUILD_DIR/mongo/db/repl/repl_coordinator_global",
+ "$BUILD_DIR/mongo/db/repl/replication_consistency_markers_impl",
"$BUILD_DIR/mongo/db/repl/replmocks",
"$BUILD_DIR/mongo/db/serveronly",
"$BUILD_DIR/mongo/db/logical_clock",
diff --git a/src/mongo/dbtests/replica_set_tests.cpp b/src/mongo/dbtests/replica_set_tests.cpp
index 7ef712e1b4b..91fa21f8800 100644
--- a/src/mongo/dbtests/replica_set_tests.cpp
+++ b/src/mongo/dbtests/replica_set_tests.cpp
@@ -32,7 +32,9 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/repl/drop_pending_collection_reaper.h"
#include "mongo/db/repl/last_vote.h"
+#include "mongo/db/repl/replication_consistency_markers_impl.h"
#include "mongo/db/repl/replication_coordinator_external_state_impl.h"
+#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/service_context.h"
#include "mongo/unittest/unittest.h"
@@ -51,10 +53,14 @@ protected:
_storageInterface = stdx::make_unique<repl::StorageInterfaceMock>();
_dropPendingCollectionReaper =
stdx::make_unique<repl::DropPendingCollectionReaper>(_storageInterface.get());
+ _replicationProcess = stdx::make_unique<repl::ReplicationProcess>(
+ _storageInterface.get(),
+ stdx::make_unique<repl::ReplicationConsistencyMarkersImpl>(_storageInterface.get()));
_replCoordExternalState = stdx::make_unique<repl::ReplicationCoordinatorExternalStateImpl>(
opCtx->getServiceContext(),
_dropPendingCollectionReaper.get(),
- _storageInterface.get());
+ _storageInterface.get(),
+ _replicationProcess.get());
}
void tearDown() {
@@ -65,6 +71,7 @@ protected:
_replCoordExternalState.reset();
_dropPendingCollectionReaper.reset();
_storageInterface.reset();
+ _replicationProcess.reset();
}
repl::ReplicationCoordinatorExternalStateImpl* getReplCoordExternalState() {
@@ -79,6 +86,7 @@ private:
std::unique_ptr<repl::ReplicationCoordinatorExternalStateImpl> _replCoordExternalState;
std::unique_ptr<repl::StorageInterface> _storageInterface;
std::unique_ptr<repl::DropPendingCollectionReaper> _dropPendingCollectionReaper;
+ std::unique_ptr<repl::ReplicationProcess> _replicationProcess;
};
TEST_F(ReplicaSetTest, ReplCoordExternalStateStoresLastVoteWithNewTerm) {
diff --git a/src/mongo/s/sharding_mongod_test_fixture.cpp b/src/mongo/s/sharding_mongod_test_fixture.cpp
index 186d6fdd903..655ff7c1efb 100644
--- a/src/mongo/s/sharding_mongod_test_fixture.cpp
+++ b/src/mongo/s/sharding_mongod_test_fixture.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/repl_settings.h"
+#include "mongo/db/repl/replication_consistency_markers_mock.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/replication_process.h"
@@ -127,8 +128,10 @@ void ShardingMongodTestFixture::setUp() {
auto storagePtr = stdx::make_unique<repl::StorageInterfaceMock>();
- repl::ReplicationProcess::set(service,
- stdx::make_unique<repl::ReplicationProcess>(storagePtr.get()));
+ repl::ReplicationProcess::set(
+ service,
+ stdx::make_unique<repl::ReplicationProcess>(
+ storagePtr.get(), stdx::make_unique<repl::ReplicationConsistencyMarkersMock>()));
repl::ReplicationProcess::get(_opCtx.get())->initializeRollbackID(_opCtx.get());
repl::StorageInterface::set(service, std::move(storagePtr));