diff options
author | Benety Goh <benety@mongodb.com> | 2018-03-09 21:53:14 -0500 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2018-03-09 21:53:31 -0500 |
commit | a92dae55be023cadeb43904da227000017e85f72 (patch) | |
tree | b03b2003839464199f0bab0af4fc5d076a1f93e1 | |
parent | 25d521ca3283fcb21e125b30403e0036d05a8338 (diff) | |
download | mongo-a92dae55be023cadeb43904da227000017e85f72.tar.gz |
SERVER-33732 extend SyncTail::MultiSyncApplyFunc to accept an OperationContext
20 files changed, 92 insertions, 87 deletions
diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h index 442022eeeeb..75f9071b74a 100644 --- a/src/mongo/db/repl/data_replicator_external_state.h +++ b/src/mongo/db/repl/data_replicator_external_state.h @@ -141,7 +141,8 @@ private: * * Used exclusively by the InitialSyncer to construct a MultiApplier. */ - virtual Status _multiInitialSyncApply(MultiApplier::OperationPtrs* ops, + virtual Status _multiInitialSyncApply(OperationContext* opCtx, + MultiApplier::OperationPtrs* ops, const HostAndPort& source, AtomicUInt32* fetchCount, WorkerMultikeyPathInfo* workerMultikeyPathInfo) = 0; diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp index 1d93a2fb5cf..05b71aa8213 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp @@ -127,12 +127,13 @@ StatusWith<OpTime> DataReplicatorExternalStateImpl::_multiApply( } Status DataReplicatorExternalStateImpl::_multiInitialSyncApply( + OperationContext* opCtx, MultiApplier::OperationPtrs* ops, const HostAndPort& source, AtomicUInt32* fetchCount, WorkerMultikeyPathInfo* workerMultikeyPathInfo) { return _replicationCoordinatorExternalState->multiInitialSyncApply( - ops, source, fetchCount, workerMultikeyPathInfo); + opCtx, ops, source, fetchCount, workerMultikeyPathInfo); } ReplicationCoordinator* DataReplicatorExternalStateImpl::getReplicationCoordinator() const { diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h index b66dbaff53b..3b5a0078eb1 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.h +++ b/src/mongo/db/repl/data_replicator_external_state_impl.h @@ -70,7 +70,8 @@ private: MultiApplier::Operations ops, MultiApplier::ApplyOperationFn applyOperation) override; - Status _multiInitialSyncApply(MultiApplier::OperationPtrs* ops, + Status _multiInitialSyncApply(OperationContext* opCtx, + MultiApplier::OperationPtrs* ops, const HostAndPort& source, AtomicUInt32* fetchCount, WorkerMultikeyPathInfo* workerMultikeyPathInfo) override; diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp index 71ad72d166d..8d605e61e61 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp @@ -102,12 +102,13 @@ StatusWith<OpTime> DataReplicatorExternalStateMock::_multiApply( } Status DataReplicatorExternalStateMock::_multiInitialSyncApply( + OperationContext* opCtx, MultiApplier::OperationPtrs* ops, const HostAndPort& source, AtomicUInt32* fetchCount, WorkerMultikeyPathInfo* workerMultikeyPathInfo) { - return multiInitialSyncApplyFn(ops, source, fetchCount, workerMultikeyPathInfo); + return multiInitialSyncApplyFn(opCtx, ops, source, fetchCount, workerMultikeyPathInfo); } } // namespace repl diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h index 1b93307f2fa..7d5f058627b 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.h +++ b/src/mongo/db/repl/data_replicator_external_state_mock.h @@ -90,12 +90,16 @@ public: // Override to change _multiInitialSyncApply behavior. using MultiInitialSyncApplyFn = - stdx::function<Status(MultiApplier::OperationPtrs* ops, + stdx::function<Status(OperationContext* opCtx, + MultiApplier::OperationPtrs* ops, const HostAndPort& source, AtomicUInt32* fetchCount, WorkerMultikeyPathInfo* workerMultikeyPathInfo)>; - MultiInitialSyncApplyFn multiInitialSyncApplyFn = []( - MultiApplier::OperationPtrs*, const HostAndPort&, AtomicUInt32*, WorkerMultikeyPathInfo*) { + MultiInitialSyncApplyFn multiInitialSyncApplyFn = [](OperationContext*, + MultiApplier::OperationPtrs*, + const HostAndPort&, + AtomicUInt32*, + WorkerMultikeyPathInfo*) { return Status::OK(); }; @@ -106,7 +110,8 @@ private: MultiApplier::Operations ops, MultiApplier::ApplyOperationFn applyOperation) override; - Status _multiInitialSyncApply(MultiApplier::OperationPtrs* ops, + Status _multiInitialSyncApply(OperationContext* opCtx, + MultiApplier::OperationPtrs* ops, const HostAndPort& source, AtomicUInt32* fetchCount, WorkerMultikeyPathInfo* workerMultikeyPathInfo) override; diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index f13ec7cde42..785dfa51864 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -935,10 +935,11 @@ void InitialSyncer::_getNextApplierBatchCallback( if (!ops.empty()) { _fetchCount.store(0); MultiApplier::ApplyOperationFn applyOperationsForEachReplicationWorkerThreadFn = - [ =, source = _syncSource ](MultiApplier::OperationPtrs * x, + [ =, source = _syncSource ](OperationContext * opCtx, + MultiApplier::OperationPtrs * x, WorkerMultikeyPathInfo * workerMultikeyPathInfo) { return _dataReplicatorExternalState->_multiInitialSyncApply( - x, source, &_fetchCount, workerMultikeyPathInfo); + opCtx, x, source, &_fetchCount, workerMultikeyPathInfo); }; MultiApplier::MultiApplyFn applyBatchOfOperationsFn = [=](OperationContext* opCtx, diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index 9d01a75c1c9..04ce849a71e 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -3573,16 +3573,17 @@ TEST_F( // missing document. // This forces InitialSyncer to evaluate its end timestamp for applying operations after each // batch. - getExternalState()->multiApplyFn = [](OperationContext*, + getExternalState()->multiApplyFn = [](OperationContext* opCtx, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn applyOperation) { // 'OperationPtr*' is ignored by our overridden _multiInitialSyncApply(). - ASSERT_OK(applyOperation(nullptr, nullptr)); + ASSERT_OK(applyOperation(opCtx, nullptr, nullptr)); return ops.back().getOpTime(); }; bool fetchCountIncremented = false; getExternalState()->multiInitialSyncApplyFn = - [&fetchCountIncremented](MultiApplier::OperationPtrs*, + [&fetchCountIncremented](OperationContext*, + MultiApplier::OperationPtrs*, const HostAndPort&, AtomicUInt32* fetchCount, WorkerMultikeyPathInfo*) { diff --git a/src/mongo/db/repl/multiapplier.h b/src/mongo/db/repl/multiapplier.h index e3844e8b0b6..1f3392a946e 100644 --- a/src/mongo/db/repl/multiapplier.h +++ b/src/mongo/db/repl/multiapplier.h @@ -68,11 +68,13 @@ public: using CallbackFn = stdx::function<void(const Status&)>; /** - * Type of function to to apply a single operation. In production, this function - * would have the same outcome as calling SyncTail::syncApply() (oplog application mode - * will be embedded in the function implementation). + * Type of function for a writer thread during oplog application to apply a set of operations + * that have been assigned (hashed by SyncTail::fillWriterVectors()) to that writer thread. + * In production, this function would have the same outcome as calling SyncTail::syncApply() + * (oplog application mode will be embedded in the function implementation). */ - using ApplyOperationFn = stdx::function<Status(OperationPtrs*, WorkerMultikeyPathInfo*)>; + using ApplyOperationFn = + stdx::function<Status(OperationContext*, OperationPtrs*, WorkerMultikeyPathInfo*)>; using MultiApplyFn = stdx::function<StatusWith<OpTime>( OperationContext*, MultiApplier::Operations, MultiApplier::ApplyOperationFn)>; diff --git a/src/mongo/db/repl/multiapplier_test.cpp b/src/mongo/db/repl/multiapplier_test.cpp index 5227eed6c20..023d99d0796 100644 --- a/src/mongo/db/repl/multiapplier_test.cpp +++ b/src/mongo/db/repl/multiapplier_test.cpp @@ -59,7 +59,7 @@ void MultiApplierTest::setUp() { launchExecutorThread(); } -Status applyOperation(MultiApplier::OperationPtrs*, WorkerMultikeyPathInfo*) { +Status applyOperation(OperationContext*, MultiApplier::OperationPtrs*, WorkerMultikeyPathInfo*) { return Status::OK(); }; diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index ec457252d2b..88e96414736 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -303,7 +303,8 @@ public: * multikey at the end of the batch. * */ - virtual Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops, + virtual Status multiInitialSyncApply(OperationContext* opCtx, + MultiApplier::OperationPtrs* ops, const HostAndPort& source, AtomicUInt32* fetchCount, WorkerMultikeyPathInfo* workerMultikeyPathInfo) = 0; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index c5ae515cd07..ae0044bb392 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -859,6 +859,7 @@ StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::multiApply( } Status ReplicationCoordinatorExternalStateImpl::multiInitialSyncApply( + OperationContext* opCtx, MultiApplier::OperationPtrs* ops, const HostAndPort& source, AtomicUInt32* fetchCount, @@ -869,7 +870,7 @@ Status ReplicationCoordinatorExternalStateImpl::multiInitialSyncApply( // be accessing any SyncTail functionality that require these constructor parameters. SyncTail syncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr); syncTail.setHostname(source.toString()); - return repl::multiInitialSyncApply(ops, &syncTail, fetchCount, workerMultikeyPathInfo); + return repl::multiInitialSyncApply(opCtx, ops, &syncTail, fetchCount, workerMultikeyPathInfo); } std::unique_ptr<OplogBuffer> ReplicationCoordinatorExternalStateImpl::makeInitialSyncOplogBuffer( diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index a303c88e990..3d2ffb8871c 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -109,7 +109,8 @@ public: virtual StatusWith<OpTime> multiApply(OperationContext* opCtx, MultiApplier::Operations ops, MultiApplier::ApplyOperationFn applyOperation) override; - virtual Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops, + virtual Status multiInitialSyncApply(OperationContext* opCtx, + MultiApplier::OperationPtrs* ops, const HostAndPort& source, AtomicUInt32* fetchCount, WorkerMultikeyPathInfo* workerMultikeyPathInfo) override; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index a5ec47110c4..19f345c12ec 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -251,6 +251,7 @@ StatusWith<OpTime> ReplicationCoordinatorExternalStateMock::multiApply( } Status ReplicationCoordinatorExternalStateMock::multiInitialSyncApply( + OperationContext* opCtx, MultiApplier::OperationPtrs* ops, const HostAndPort& source, AtomicUInt32* fetchCount, diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 675b9ca94fe..bce130aead5 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -97,7 +97,8 @@ public: virtual StatusWith<OpTime> multiApply(OperationContext* opCtx, MultiApplier::Operations ops, MultiApplier::ApplyOperationFn applyOperation) override; - virtual Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops, + virtual Status multiInitialSyncApply(OperationContext* opCtx, + MultiApplier::OperationPtrs* ops, const HostAndPort& source, AtomicUInt32* fetchCount, WorkerMultikeyPathInfo* workerMultikeyPathInfo) override; diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 8eff5a066fc..6a1acb6e439 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -448,7 +448,10 @@ void applyOps(std::vector<MultiApplier::OperationPtrs>& writerVectors, &writer = writerVectors.at(i), &status = statusVector->at(i), &workerMultikeyPathInfo = workerMultikeyPathInfo->at(i) - ] { status = func(&writer, &workerMultikeyPathInfo); })); + ] { + auto opCtx = cc().makeOperationContext(); + status = func(opCtx.get(), &writer, &workerMultikeyPathInfo); + })); } } } @@ -706,9 +709,10 @@ SessionRecordMap getLatestSessionRecords(const MultiApplier::Operations& ops) { * this batch, it will not be updated. */ StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) { - auto applyOperation = [this](MultiApplier::OperationPtrs* ops, + auto applyOperation = [this](OperationContext* opCtx, + MultiApplier::OperationPtrs* ops, WorkerMultikeyPathInfo* workerMultikeyPathInfo) -> Status { - _applyFunc(ops, this, workerMultikeyPathInfo); + _applyFunc(opCtx, ops, this, workerMultikeyPathInfo); // This function is used by 3.2 initial sync and steady state data replication. // _applyFunc() will throw or abort on error, so we return OK here. return Status::OK(); @@ -1224,16 +1228,15 @@ bool SyncTail::fetchAndInsertMissingDocument(OperationContext* opCtx, } // This free function is used by the writer threads to apply each op -void multiSyncApply(MultiApplier::OperationPtrs* ops, +void multiSyncApply(OperationContext* opCtx, + MultiApplier::OperationPtrs* ops, SyncTail* st, WorkerMultikeyPathInfo* workerMultikeyPathInfo) { - auto opCtx = cc().makeOperationContext(); auto syncApply = []( OperationContext* opCtx, const BSONObj& op, OplogApplication::Mode oplogApplicationMode) { return SyncTail::syncApply(opCtx, op, oplogApplicationMode); }; - fassertNoTrace(16359, - multiSyncApply_noAbort(opCtx.get(), ops, workerMultikeyPathInfo, syncApply)); + fassertNoTrace(16359, multiSyncApply_noAbort(opCtx, ops, workerMultikeyPathInfo, syncApply)); } Status multiSyncApply_noAbort(OperationContext* opCtx, @@ -1295,19 +1298,11 @@ Status multiSyncApply_noAbort(OperationContext* opCtx, return Status::OK(); } -Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops, +Status multiInitialSyncApply(OperationContext* opCtx, + MultiApplier::OperationPtrs* ops, SyncTail* st, AtomicUInt32* fetchCount, WorkerMultikeyPathInfo* workerMultikeyPathInfo) { - auto opCtx = cc().makeOperationContext(); - return multiInitialSyncApply_noAbort(opCtx.get(), ops, st, fetchCount, workerMultikeyPathInfo); -} - -Status multiInitialSyncApply_noAbort(OperationContext* opCtx, - MultiApplier::OperationPtrs* ops, - SyncTail* st, - AtomicUInt32* fetchCount, - WorkerMultikeyPathInfo* workerMultikeyPathInfo) { UnreplicatedWritesBlock uwb(opCtx); DisableDocumentValidation validationDisabler(opCtx); ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(opCtx->lockState()); diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index b322410f684..63c4cf31f7f 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -60,7 +60,8 @@ class OpTime; */ class SyncTail { public: - using MultiSyncApplyFunc = stdx::function<void(MultiApplier::OperationPtrs* ops, + using MultiSyncApplyFunc = stdx::function<void(OperationContext* opCtx, + MultiApplier::OperationPtrs* ops, SyncTail* st, WorkerMultikeyPathInfo* workerMultikeyPathInfo)>; @@ -278,12 +279,13 @@ StatusWith<OpTime> multiApply(OperationContext* opCtx, // They consume the passed in OperationPtrs and callers should not make any assumptions about the // state of the container after calling. However, these functions cannot modify the pointed-to // operations because the OperationPtrs container contains const pointers. -void multiSyncApply(MultiApplier::OperationPtrs* ops, +void multiSyncApply(OperationContext* opCtx, + MultiApplier::OperationPtrs* ops, SyncTail* st, WorkerMultikeyPathInfo* workerMultikeyPathInfo); -// Used by 3.4 initial sync. -Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops, +Status multiInitialSyncApply(OperationContext* opCtx, + MultiApplier::OperationPtrs* ops, SyncTail* st, AtomicUInt32* fetchCount, WorkerMultikeyPathInfo* workerMultikeyPathInfo); @@ -300,15 +302,5 @@ Status multiSyncApply_noAbort(OperationContext* opCtx, WorkerMultikeyPathInfo* workerMultikeyPathInfo, SyncApplyFn syncApply); -/** - * Testing-only version of multiInitialSyncApply that accepts an external operation context and - * returns an error instead of aborting. - */ -Status multiInitialSyncApply_noAbort(OperationContext* opCtx, - MultiApplier::OperationPtrs* ops, - SyncTail* st, - AtomicUInt32* fetchCount, - WorkerMultikeyPathInfo* workerMultikeyPathInfo); - } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index 96b77faa606..28c5ef44156 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -564,7 +564,8 @@ bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx, const CollectionOptions& options) { auto writerPool = SyncTail::makeWriterPool(); MultiApplier::Operations operationsApplied; - auto applyOperationFn = [&operationsApplied](MultiApplier::OperationPtrs* operationsToApply, + auto applyOperationFn = [&operationsApplied](OperationContext* opCtx, + MultiApplier::OperationPtrs* operationsToApply, WorkerMultikeyPathInfo*) -> Status { for (auto&& opPtr : *operationsToApply) { operationsApplied.push_back(*opPtr); @@ -613,7 +614,8 @@ TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceH stdx::mutex mutex; std::vector<MultiApplier::Operations> operationsApplied; auto applyOperationFn = - [&mutex, &operationsApplied](MultiApplier::OperationPtrs* operationsForWriterThreadToApply, + [&mutex, &operationsApplied](OperationContext* opCtx, + MultiApplier::OperationPtrs* operationsForWriterThreadToApply, WorkerMultikeyPathInfo*) -> Status { stdx::lock_guard<stdx::mutex> lock(mutex); operationsApplied.emplace_back(); @@ -781,67 +783,65 @@ TEST_F(SyncTailTest, MultiApplyUpdatesTheTransactionTable) { TEST_F(SyncTailTest, MultiSyncApplyUsesSyncApplyToApplyOperation) { NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); - _opCtx.reset(); MultiApplier::OperationPtrs ops = {&op}; WorkerMultikeyPathInfo pathInfo; - multiSyncApply(&ops, nullptr, &pathInfo); + multiSyncApply(_opCtx.get(), &ops, nullptr, &pathInfo); // Collection should be created after SyncTail::syncApply() processes operation. - _opCtx = cc().makeOperationContext(); ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection()); } -void testWorkerMultikeyPaths(const OplogEntry& op, unsigned long numPaths) { +void testWorkerMultikeyPaths(OperationContext* opCtx, + const OplogEntry& op, + unsigned long numPaths) { WorkerMultikeyPathInfo pathInfo; MultiApplier::OperationPtrs ops = {&op}; - multiSyncApply(&ops, nullptr, &pathInfo); + multiSyncApply(opCtx, &ops, nullptr, &pathInfo); ASSERT_EQ(pathInfo.size(), numPaths); } TEST_F(SyncTailTest, MultiSyncApplyAddsWorkerMultikeyPathInfoOnInsert) { NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - _opCtx.reset(); { auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); - testWorkerMultikeyPaths(op, 0UL); + testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); } { auto keyPattern = BSON("a" << 1); auto op = makeCreateIndexOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss, "a_1", keyPattern); - testWorkerMultikeyPaths(op, 0UL); + testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); } { auto doc = BSON("_id" << 1 << "a" << BSON_ARRAY(4 << 5)); auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(3), 0), 1LL}, nss, doc); - testWorkerMultikeyPaths(op, 1UL); + testWorkerMultikeyPaths(_opCtx.get(), op, 1UL); } } TEST_F(SyncTailTest, MultiSyncApplyAddsMultipleWorkerMultikeyPathInfo) { NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - _opCtx.reset(); { auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); - testWorkerMultikeyPaths(op, 0UL); + testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); } { auto keyPattern = BSON("a" << 1); auto op = makeCreateIndexOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss, "a_1", keyPattern); - testWorkerMultikeyPaths(op, 0UL); + testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); } { auto keyPattern = BSON("b" << 1); auto op = makeCreateIndexOplogEntry({Timestamp(Seconds(3), 0), 1LL}, nss, "b_1", keyPattern); - testWorkerMultikeyPaths(op, 0UL); + testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); } { @@ -851,37 +851,36 @@ TEST_F(SyncTailTest, MultiSyncApplyAddsMultipleWorkerMultikeyPathInfo) { auto opB = makeInsertDocumentOplogEntry({Timestamp(Seconds(5), 0), 1LL}, nss, docB); WorkerMultikeyPathInfo pathInfo; MultiApplier::OperationPtrs ops = {&opA, &opB}; - multiSyncApply(&ops, nullptr, &pathInfo); + multiSyncApply(_opCtx.get(), &ops, nullptr, &pathInfo); ASSERT_EQ(pathInfo.size(), 2UL); } } TEST_F(SyncTailTest, MultiSyncApplyDoesNotAddWorkerMultikeyPathInfoOnCreateIndex) { NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - _opCtx.reset(); { auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); - testWorkerMultikeyPaths(op, 0UL); + testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); } { auto doc = BSON("_id" << 1 << "a" << BSON_ARRAY(4 << 5)); auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss, doc); - testWorkerMultikeyPaths(op, 0UL); + testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); } { auto keyPattern = BSON("a" << 1); auto op = makeCreateIndexOplogEntry({Timestamp(Seconds(3), 0), 1LL}, nss, "a_1", keyPattern); - testWorkerMultikeyPaths(op, 0UL); + testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); } { auto doc = BSON("_id" << 2 << "a" << BSON_ARRAY(6 << 7)); auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc); - testWorkerMultikeyPaths(op, 0UL); + testWorkerMultikeyPaths(_opCtx.get(), op, 0UL); } } @@ -893,9 +892,8 @@ DEATH_TEST_F(SyncTailTest, NamespaceString nss("foo." + _agent.getSuiteName() + "_" + _agent.getTestName()); auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); - _opCtx.reset(); MultiApplier::OperationPtrs ops = {&op}; - multiSyncApply(&ops, nullptr, nullptr); + multiSyncApply(_opCtx.get(), &ops, nullptr, nullptr); } TEST_F(SyncTailTest, MultiInitialSyncApplyFailsWhenCollectionCreationTriesToMakeUUID) { @@ -904,10 +902,9 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyFailsWhenCollectionCreationTriesToMake auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); - _opCtx.reset(); MultiApplier::OperationPtrs ops = {&op}; ASSERT_EQUALS(ErrorCodes::InvalidOptions, - multiInitialSyncApply(&ops, nullptr, nullptr, nullptr)); + multiInitialSyncApply(_opCtx.get(), &ops, nullptr, nullptr, nullptr)); } TEST_F(SyncTailTest, MultiSyncApplyDisablesDocumentValidationWhileApplyingOperations) { @@ -1279,7 +1276,7 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyDisablesDocumentValidationWhileApplyin MultiApplier::OperationPtrs ops = {&op}; AtomicUInt32 fetchCount(0); WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo)); + ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo)); ASSERT_EQUALS(fetchCount.load(), 1U); } @@ -1299,7 +1296,7 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyIgnoresUpdateOperationIfDocumentIsMiss MultiApplier::OperationPtrs ops = {&op}; AtomicUInt32 fetchCount(0); WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo)); + ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo)); // Since the missing document is not found on the sync source, the collection referenced by // the failed operation should not be automatically created. @@ -1322,7 +1319,7 @@ TEST_F(SyncTailTest, MultiInitialSyncApplySkipsDocumentOnNamespaceNotFound) { MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3}; AtomicUInt32 fetchCount(0); WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo)); + ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo)); ASSERT_EQUALS(fetchCount.load(), 0U); OplogInterfaceLocal collectionReader(_opCtx.get(), nss.ns()); @@ -1348,7 +1345,7 @@ TEST_F(SyncTailTest, MultiInitialSyncApplySkipsIndexCreationOnNamespaceNotFound) MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3}; AtomicUInt32 fetchCount(0); WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo)); + ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo)); ASSERT_EQUALS(fetchCount.load(), 0U); OplogInterfaceLocal collectionReader(_opCtx.get(), nss.ns()); @@ -1372,7 +1369,7 @@ TEST_F(SyncTailTest, MultiApplier::OperationPtrs ops = {&op}; AtomicUInt32 fetchCount(0); WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo)); + ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo)); ASSERT_EQUALS(fetchCount.load(), 1U); // The collection referenced by "ns" in the failed operation is automatically created to hold diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp index 3cd0f67d67f..ce647ca9d3e 100644 --- a/src/mongo/db/repl/sync_tail_test_fixture.cpp +++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp @@ -184,7 +184,7 @@ Status SyncTailTest::runOpsInitialSync(std::vector<OplogEntry> ops) { } AtomicUInt32 fetchCount(0); WorkerMultikeyPathInfo pathInfo; - return multiInitialSyncApply_noAbort(_opCtx.get(), &opsPtrs, &syncTail, &fetchCount, &pathInfo); + return multiInitialSyncApply(_opCtx.get(), &opsPtrs, &syncTail, &fetchCount, &pathInfo); } diff --git a/src/mongo/db/repl/sync_tail_test_fixture.h b/src/mongo/db/repl/sync_tail_test_fixture.h index cc8fb3b44c4..7f6161b0f32 100644 --- a/src/mongo/db/repl/sync_tail_test_fixture.h +++ b/src/mongo/db/repl/sync_tail_test_fixture.h @@ -58,7 +58,9 @@ protected: ReplicationProcess* _replicationProcess = nullptr; // Implements the MultiApplier::ApplyOperationFn interface and does nothing. - static Status noopApplyOperationFn(MultiApplier::OperationPtrs*, WorkerMultikeyPathInfo*) { + static Status noopApplyOperationFn(OperationContext*, + MultiApplier::OperationPtrs*, + WorkerMultikeyPathInfo*) { return Status::OK(); } diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index 19df2ed1567..4fcaf174c3e 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -1348,10 +1348,11 @@ public: std::vector<repl::OplogEntry> ops = {op0, createIndexOp, op1, op2}; AtomicUInt32 fetchCount(0); - auto applyOpFn = [&fetchCount](repl::MultiApplier::OperationPtrs* ops, + auto applyOpFn = [&fetchCount](OperationContext* opCtx, + repl::MultiApplier::OperationPtrs* ops, repl::SyncTail* st, WorkerMultikeyPathInfo* workerMultikeyPathInfo) { - return repl::multiInitialSyncApply(ops, st, &fetchCount, workerMultikeyPathInfo); + return repl::multiInitialSyncApply(opCtx, ops, st, &fetchCount, workerMultikeyPathInfo); }; auto writerPool = repl::SyncTail::makeWriterPool(); |