diff options
author | Benety Goh <benety@mongodb.com> | 2018-05-02 10:52:08 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2018-05-02 10:52:08 -0400 |
commit | ed967e1216becd9195a77cbce5bd0f9a40a54d5b (patch) | |
tree | 55be8e751d90ce09ffdb85567985b9e5e99d422a /src | |
parent | 17e5914811e5cc9f8dfe5d46b0771abfead6c9c3 (diff) | |
download | mongo-ed967e1216becd9195a77cbce5bd0f9a40a54d5b.tar.gz |
SERVER-32334 OplogApplier and SyncTail accept ReplicationConsistencyMarkers and StorageInterface at construction
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/data_replicator_external_state_impl.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier.h | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 44 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test_fixture.cpp | 20 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test_fixture.h | 4 | ||||
-rw-r--r-- | src/mongo/dbtests/repltests.cpp | 6 | ||||
-rw-r--r-- | src/mongo/dbtests/storage_timestamp_tests.cpp | 19 |
11 files changed, 115 insertions, 24 deletions
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp index ba3642a1dff..caa8ed1f711 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp @@ -36,8 +36,10 @@ #include "mongo/db/repl/oplog_buffer_blocking_queue.h" #include "mongo/db/repl/oplog_buffer_collection.h" #include "mongo/db/repl/oplog_buffer_proxy.h" +#include "mongo/db/repl/replication_consistency_markers.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_external_state.h" +#include "mongo/db/repl/replication_process.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/sync_tail.h" #include "mongo/db/server_parameters.h" @@ -154,7 +156,11 @@ StatusWith<OpTime> DataReplicatorExternalStateImpl::_multiApply(OperationContext OplogApplier::Observer* observer, const HostAndPort& source, ThreadPool* writerPool) { - SyncTail syncTail(observer, repl::multiInitialSyncApply, writerPool); + auto replicationProcess = ReplicationProcess::get(opCtx); + auto consistencyMarkers = replicationProcess->getConsistencyMarkers(); + auto storageInterface = StorageInterface::get(opCtx); + SyncTail syncTail( + observer, consistencyMarkers, storageInterface, repl::multiInitialSyncApply, writerPool); syncTail.setHostname(source.toString()); return syncTail.multiApply(opCtx, std::move(ops)); } diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp index 3f3c527795f..1791f45566a 100644 --- a/src/mongo/db/repl/oplog_applier.cpp +++ b/src/mongo/db/repl/oplog_applier.cpp @@ -44,14 +44,19 @@ OplogApplier::OplogApplier(executor::TaskExecutor* executor, OplogBuffer* oplogBuffer, Observer* observer, ReplicationCoordinator* replCoord, + ReplicationConsistencyMarkers* consistencyMarkers, + StorageInterface* storageInterface, const OplogApplier::Options& options, ThreadPool* writerPool) : _executor(executor), _oplogBuffer(oplogBuffer), _observer(observer), _replCoord(replCoord), + _consistencyMarkers(consistencyMarkers), + _storageInterface(storageInterface), _options(options), - _syncTail(std::make_unique<SyncTail>(_observer, multiSyncApply, writerPool)) { + _syncTail(std::make_unique<SyncTail>( + _observer, _consistencyMarkers, _storageInterface, multiSyncApply, writerPool)) { invariant(!options.allowNamespaceNotFoundErrorsOnCrudOps); invariant(!options.relaxUniqueIndexConstraints); } diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h index a198b9ecb91..19219e75401 100644 --- a/src/mongo/db/repl/oplog_applier.h +++ b/src/mongo/db/repl/oplog_applier.h @@ -37,7 +37,9 @@ #include "mongo/db/repl/multiapplier.h" #include "mongo/db/repl/oplog_buffer.h" #include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/replication_consistency_markers.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/storage_interface.h" #include "mongo/executor/task_executor.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/future.h" @@ -76,6 +78,8 @@ public: OplogBuffer* oplogBuffer, Observer* observer, ReplicationCoordinator* replCoord, + ReplicationConsistencyMarkers* consistencyMarkers, + StorageInterface* storageInterface, const Options& options, ThreadPool* writerPool); @@ -111,6 +115,12 @@ private: // Not owned by us. ReplicationCoordinator* const _replCoord; + // Not owned by us. + ReplicationConsistencyMarkers* const _consistencyMarkers; + + // Not owned by us. + StorageInterface* const _storageInterface; + // Used to configure OplogApplier behavior. const Options _options; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 807dd2ad6dc..d2c8aeb7f33 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -239,6 +239,8 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication( _oplogBuffer.get(), _bgSync.get(), replCoord, + _replicationProcess->getConsistencyMarkers(), + _storageInterface, OplogApplier::Options(), _writerPool.get()); _oplogApplierShutdownFuture = _oplogApplier->startup(); diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index b70f3c56793..3076ddbf35b 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -68,7 +68,6 @@ #include "mongo/db/repl/repl_set_config.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_process.h" -#include "mongo/db/repl/storage_interface.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" #include "mongo/db/session.h" @@ -277,9 +276,15 @@ NamespaceStringOrUUID getNsOrUUID(const NamespaceString& nss, const BSONObj& op) } // namespace SyncTail::SyncTail(OplogApplier::Observer* observer, + ReplicationConsistencyMarkers* consistencyMarkers, + StorageInterface* storageInterface, MultiSyncApplyFunc func, ThreadPool* writerPool) - : _observer(observer), _applyFunc(func), _writerPool(writerPool) {} + : _observer(observer), + _consistencyMarkers(consistencyMarkers), + _storageInterface(storageInterface), + _applyFunc(func), + _writerPool(writerPool) {} SyncTail::~SyncTail() {} diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 186e9bd11e1..281e3c89af5 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -38,6 +38,8 @@ #include "mongo/db/repl/oplog_applier.h" #include "mongo/db/repl/oplog_buffer.h" #include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/replication_consistency_markers.h" +#include "mongo/db/repl/storage_interface.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/thread_pool.h" @@ -77,7 +79,11 @@ public: * distributed across writer threads in 'writerPool'. Each writer thread applies its own vector * of operations using 'func'. The writer thread pool is not owned by us. */ - SyncTail(OplogApplier::Observer* observer, MultiSyncApplyFunc func, ThreadPool* writerPool); + SyncTail(OplogApplier::Observer* observer, + ReplicationConsistencyMarkers* consistencyMarkers, + StorageInterface* storageInterface, + MultiSyncApplyFunc func, + ThreadPool* writerPool); virtual ~SyncTail(); /** @@ -248,6 +254,8 @@ private: std::string _hostname; OplogApplier::Observer* const _observer; + ReplicationConsistencyMarkers* const _consistencyMarkers; + StorageInterface* const _storageInterface; // Function to use during applyOps MultiSyncApplyFunc _applyFunc; diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index d995e4f61e2..52bc87f2402 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -128,14 +128,15 @@ public: }; SyncTailWithLocalDocumentFetcher::SyncTailWithLocalDocumentFetcher(const BSONObj& document) - : SyncTail(this, SyncTail::MultiSyncApplyFunc(), nullptr), _document(document) {} + : SyncTail(this, nullptr, nullptr, SyncTail::MultiSyncApplyFunc(), nullptr), + _document(document) {} BSONObj SyncTailWithLocalDocumentFetcher::getMissingDoc(OperationContext*, const OplogEntry&) { return _document; } SyncTailWithOperationContextChecker::SyncTailWithOperationContextChecker() - : SyncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr) {} + : SyncTail(nullptr, nullptr, nullptr, SyncTail::MultiSyncApplyFunc(), nullptr) {} bool SyncTailWithOperationContextChecker::fetchAndInsertMissingDocument(OperationContext* opCtx, const OplogEntry&) { @@ -378,11 +379,17 @@ TEST_F(SyncTailTest, SyncApplyCommandThrowsException) { DEATH_TEST_F(SyncTailTest, MultiApplyAbortsWhenNoOperationsAreGiven, "!ops.empty()") { auto writerPool = SyncTail::makeWriterPool(); - SyncTail syncTail(nullptr, noopApplyOperationFn, writerPool.get()); + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + noopApplyOperationFn, + writerPool.get()); syncTail.multiApply(_opCtx.get(), {}).getStatus().ignore(); } bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx, + ReplicationConsistencyMarkers* const consistencyMarkers, + StorageInterface* const storageInterface, const NamespaceString& nss, const CollectionOptions& options) { auto writerPool = SyncTail::makeWriterPool(); @@ -401,7 +408,8 @@ bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx, auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss, BSON("a" << 1)); ASSERT_FALSE(op.isForCappedCollection); - SyncTail syncTail(nullptr, applyOperationFn, writerPool.get()); + SyncTail syncTail( + nullptr, consistencyMarkers, storageInterface, applyOperationFn, writerPool.get()); auto lastOpTime = unittest::assertGet(syncTail.multiApply(opCtx, {op})); ASSERT_EQUALS(op.getOpTime(), lastOpTime); @@ -416,14 +424,18 @@ TEST_F( SyncTailTest, MultiApplyDoesNotSetOplogEntryIsForCappedCollectionWhenProcessingNonCappedCollectionInsertOperation) { NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - ASSERT_FALSE(_testOplogEntryIsForCappedCollection(_opCtx.get(), nss, CollectionOptions())); + ASSERT_FALSE(_testOplogEntryIsForCappedCollection( + _opCtx.get(), getConsistencyMarkers(), getStorageInterface(), nss, CollectionOptions())); } TEST_F(SyncTailTest, MultiApplySetsOplogEntryIsForCappedCollectionWhenProcessingCappedCollectionInsertOperation) { NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - ASSERT_TRUE( - _testOplogEntryIsForCappedCollection(_opCtx.get(), nss, createOplogCollectionOptions())); + ASSERT_TRUE(_testOplogEntryIsForCappedCollection(_opCtx.get(), + getConsistencyMarkers(), + getStorageInterface(), + nss, + createOplogCollectionOptions())); } TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceHash) { @@ -453,7 +465,11 @@ TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceH auto op1 = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss1, BSON("x" << 1)); auto op2 = makeInsertDocumentOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss2, BSON("x" << 2)); - SyncTail syncTail(nullptr, applyOperationFn, writerPool.get()); + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + applyOperationFn, + writerPool.get()); auto lastOpTime = unittest::assertGet(syncTail.multiApply(_opCtx.get(), {op1, op2})); ASSERT_EQUALS(op2.getOpTime(), lastOpTime); @@ -1607,7 +1623,8 @@ TEST_F(SyncTailTxnTableTest, SimpleWriteWithTxn) { date); auto writerPool = SyncTail::makeWriterPool(); - SyncTail syncTail(nullptr, multiSyncApply, writerPool.get()); + SyncTail syncTail( + nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp})); checkTxnTable(sessionInfo, {Timestamp(1, 0), 1}, date); @@ -1637,7 +1654,8 @@ TEST_F(SyncTailTxnTableTest, WriteWithTxnMixedWithDirectWriteToTxnTable) { Date_t::now()); auto writerPool = SyncTail::makeWriterPool(); - SyncTail syncTail(nullptr, multiSyncApply, writerPool.get()); + SyncTail syncTail( + nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp})); DBDirectClient client(_opCtx.get()); @@ -1681,7 +1699,8 @@ TEST_F(SyncTailTxnTableTest, InterleavedWriteWithTxnMixedWithDirectWriteToTxnTab date); auto writerPool = SyncTail::makeWriterPool(); - SyncTail syncTail(nullptr, multiSyncApply, writerPool.get()); + SyncTail syncTail( + nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp, insertOp2})); checkTxnTable(sessionInfo, {Timestamp(3, 0), 2}, date); @@ -1727,7 +1746,8 @@ TEST_F(SyncTailTxnTableTest, MultiApplyUpdatesTheTransactionTable) { {Timestamp(Seconds(7), 0), 1LL}, ns3, BSON("_id" << 0), info); auto writerPool = SyncTail::makeWriterPool(); - SyncTail syncTail(nullptr, multiSyncApply, writerPool.get()); + SyncTail syncTail( + nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); ASSERT_OK(syncTail.multiApply( _opCtx.get(), {opSingle, opDiffTxnSmaller, opDiffTxnLarger, opSameTxnSooner, opSameTxnLater, opNoTxn})); diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp index 57c1e8c83a5..bd6453eb042 100644 --- a/src/mongo/db/repl/sync_tail_test_fixture.cpp +++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp @@ -131,6 +131,14 @@ void SyncTailTest::tearDown() { ServiceContextMongoDTest::tearDown(); } +ReplicationConsistencyMarkers* SyncTailTest::getConsistencyMarkers() const { + return _replicationProcess->getConsistencyMarkers(); +} + +StorageInterface* SyncTailTest::getStorageInterface() const { + return StorageInterface::get(_opCtx.get()); +} + void SyncTailTest::_testSyncApplyCrudOperation(ErrorCodes::Error expectedError, const BSONObj& op, bool expectedApplyOpCalled) { @@ -187,7 +195,11 @@ Status SyncTailTest::runOpSteadyState(const OplogEntry& op) { } Status SyncTailTest::runOpsSteadyState(std::vector<OplogEntry> ops) { - SyncTail syncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr); + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + SyncTail::MultiSyncApplyFunc(), + nullptr); MultiApplier::OperationPtrs opsPtrs; for (auto& op : ops) { opsPtrs.push_back(&op); @@ -201,7 +213,11 @@ Status SyncTailTest::runOpInitialSync(const OplogEntry& op) { } Status SyncTailTest::runOpsInitialSync(std::vector<OplogEntry> ops) { - SyncTail syncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr); + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + SyncTail::MultiSyncApplyFunc(), + nullptr); MultiApplier::OperationPtrs opsPtrs; for (auto& op : ops) { opsPtrs.push_back(&op); diff --git a/src/mongo/db/repl/sync_tail_test_fixture.h b/src/mongo/db/repl/sync_tail_test_fixture.h index 2ac1b9b74ba..6ae19b96ac6 100644 --- a/src/mongo/db/repl/sync_tail_test_fixture.h +++ b/src/mongo/db/repl/sync_tail_test_fixture.h @@ -31,6 +31,7 @@ #include "mongo/base/status.h" #include "mongo/db/concurrency/lock_manager_defs.h" #include "mongo/db/op_observer_noop.h" +#include "mongo/db/repl/replication_consistency_markers.h" #include "mongo/db/repl/sync_tail.h" #include "mongo/db/service_context_d_test_fixture.h" @@ -124,6 +125,9 @@ protected: void setUp() override; void tearDown() override; + ReplicationConsistencyMarkers* getConsistencyMarkers() const; + StorageInterface* getStorageInterface() const; + Status runOpSteadyState(const OplogEntry& op); Status runOpsSteadyState(std::vector<OplogEntry> ops); Status runOpInitialSync(const OplogEntry& entry); diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp index 4eff300faf9..29da873bac0 100644 --- a/src/mongo/dbtests/repltests.cpp +++ b/src/mongo/dbtests/repltests.cpp @@ -1346,7 +1346,9 @@ public: class SyncTest : public SyncTail { public: bool returnEmpty; - SyncTest() : SyncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr), returnEmpty(false) {} + SyncTest() + : SyncTail(nullptr, nullptr, nullptr, SyncTail::MultiSyncApplyFunc(), nullptr), + returnEmpty(false) {} virtual ~SyncTest() {} BSONObj getMissingDoc(OperationContext* opCtx, const OplogEntry& oplogEntry) override { if (returnEmpty) { @@ -1382,7 +1384,7 @@ public: // this should fail because we can't connect try { - SyncTail badSource(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr); + SyncTail badSource(nullptr, nullptr, nullptr, SyncTail::MultiSyncApplyFunc(), nullptr); badSource.setHostname("localhost:123"); OldClientContext ctx(&_opCtx, ns()); diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index d2c31fb6e1a..ee77ac7d683 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -91,6 +91,7 @@ public: const Timestamp nullTs = Timestamp(); const int presentTerm = 1; repl::ReplicationCoordinatorMock* _coordinatorMock; + repl::ReplicationConsistencyMarkers* _consistencyMarkers; StorageTimestampTest() { if (mongo::storageGlobalParams.engine != "wiredTiger") { @@ -118,6 +119,9 @@ public: cc().getServiceContext(), std::unique_ptr<repl::ReplicationProcess>(replicationProcess)); + _consistencyMarkers = + repl::ReplicationProcess::get(cc().getServiceContext())->getConsistencyMarkers(); + // Since the Client object persists across tests, even though the global // ReplicationCoordinator does not, we need to clear the last op associated with the client // to avoid the invariant in ReplClientInfo::setLastOp that the optime only goes forward. @@ -1226,8 +1230,10 @@ public: << doc2)); std::vector<repl::OplogEntry> ops = {op0, op1, op2}; + auto storageInterface = repl::StorageInterface::get(_opCtx); auto writerPool = repl::SyncTail::makeWriterPool(); - repl::SyncTail syncTail(nullptr, repl::multiSyncApply, writerPool.get()); + repl::SyncTail syncTail( + nullptr, _consistencyMarkers, storageInterface, repl::multiSyncApply, writerPool.get()); ASSERT_EQUALS(op2.getOpTime(), unittest::assertGet(syncTail.multiApply(_opCtx, ops))); AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_X, LockMode::MODE_IX); @@ -1329,8 +1335,13 @@ public: // after bulk index builds. std::vector<repl::OplogEntry> ops = {op0, createIndexOp, op1, op2}; + auto storageInterface = repl::StorageInterface::get(_opCtx); auto writerPool = repl::SyncTail::makeWriterPool(); - repl::SyncTail syncTail(nullptr, repl::multiInitialSyncApply, writerPool.get()); + repl::SyncTail syncTail(nullptr, + _consistencyMarkers, + storageInterface, + repl::multiInitialSyncApply, + writerPool.get()); auto lastTime = unittest::assertGet(syncTail.multiApply(_opCtx, ops)); ASSERT_EQ(lastTime.getTimestamp(), insertTime2.asTimestamp()); @@ -1891,8 +1902,10 @@ public: << doc0)); // Apply the operation. + auto storageInterface = repl::StorageInterface::get(_opCtx); auto writerPool = repl::SyncTail::makeWriterPool(1); - repl::SyncTail syncTail(nullptr, applyOperationFn, writerPool.get()); + repl::SyncTail syncTail( + nullptr, _consistencyMarkers, storageInterface, applyOperationFn, writerPool.get()); auto lastOpTime = unittest::assertGet(syncTail.multiApply(_opCtx, {insertOp})); ASSERT_EQ(insertOp.getOpTime(), lastOpTime); |