From 97fc082fcf2abc9428de053f88967b848ba36c7f Mon Sep 17 00:00:00 2001 From: Judah Schvimer Date: Tue, 13 Feb 2018 11:42:41 -0500 Subject: SERVER-33290 secondaries must set multikey before releasing parallel batch writer mutex --- src/mongo/db/multi_key_path_tracker.cpp | 2 +- src/mongo/db/multi_key_path_tracker.h | 6 +- src/mongo/db/repl/data_replicator_external_state.h | 5 +- .../repl/data_replicator_external_state_impl.cpp | 11 +- .../db/repl/data_replicator_external_state_impl.h | 3 +- .../repl/data_replicator_external_state_mock.cpp | 10 +- .../db/repl/data_replicator_external_state_mock.h | 14 ++- src/mongo/db/repl/initial_syncer.cpp | 6 +- src/mongo/db/repl/initial_syncer_test.cpp | 21 ++-- src/mongo/db/repl/multiapplier.h | 3 +- src/mongo/db/repl/multiapplier_test.cpp | 2 +- .../repl/replication_coordinator_external_state.h | 5 +- ...replication_coordinator_external_state_impl.cpp | 7 +- .../replication_coordinator_external_state_impl.h | 3 +- ...replication_coordinator_external_state_mock.cpp | 5 +- .../replication_coordinator_external_state_mock.h | 3 +- src/mongo/db/repl/sync_tail.cpp | 103 ++++++++--------- src/mongo/db/repl/sync_tail.h | 26 ++--- src/mongo/db/repl/sync_tail_test.cpp | 126 +++++++++++++++++++-- src/mongo/db/repl/sync_tail_test_fixture.cpp | 3 +- src/mongo/db/repl/sync_tail_test_fixture.h | 2 +- src/mongo/dbtests/storage_timestamp_tests.cpp | 14 ++- 22 files changed, 258 insertions(+), 122 deletions(-) (limited to 'src') diff --git a/src/mongo/db/multi_key_path_tracker.cpp b/src/mongo/db/multi_key_path_tracker.cpp index ecb1348d92a..43b5fd0567b 100644 --- a/src/mongo/db/multi_key_path_tracker.cpp +++ b/src/mongo/db/multi_key_path_tracker.cpp @@ -40,7 +40,7 @@ void MultikeyPathTracker::addMultikeyPathInfo(MultikeyPathInfo info) { _multikeyPathInfo.emplace_back(info); } -const std::vector& MultikeyPathTracker::getMultikeyPathInfo() const { +const WorkerMultikeyPathInfo& MultikeyPathTracker::getMultikeyPathInfo() const { return _multikeyPathInfo; } diff --git a/src/mongo/db/multi_key_path_tracker.h b/src/mongo/db/multi_key_path_tracker.h index 781ec9924cc..3f1d4800bab 100644 --- a/src/mongo/db/multi_key_path_tracker.h +++ b/src/mongo/db/multi_key_path_tracker.h @@ -41,6 +41,8 @@ struct MultikeyPathInfo { MultikeyPaths multikeyPaths; }; +using WorkerMultikeyPathInfo = std::vector; + /** * An OperationContext decoration that tracks which indexes should be made multikey. This is used * by IndexCatalogEntryImpl::setMultikey() to track what indexes should be set as multikey during @@ -64,7 +66,7 @@ public: /** * Returns the multikey path information that has been saved. */ - const std::vector& getMultikeyPathInfo() const; + const WorkerMultikeyPathInfo& getMultikeyPathInfo() const; /** * Specifies that we should track multikey path information on this MultikeyPathTracker. This is @@ -87,7 +89,7 @@ public: private: - std::vector _multikeyPathInfo; + WorkerMultikeyPathInfo _multikeyPathInfo; bool _trackMultikeyPathInfo = false; }; diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h index 1fc6fc8a9a5..818a9fdd38a 100644 --- a/src/mongo/db/repl/data_replicator_external_state.h +++ b/src/mongo/db/repl/data_replicator_external_state.h @@ -137,12 +137,15 @@ private: /** * Used by _multiApply() to write operations to database during initial sync. `fetchCount` is a * pointer to a counter that is incremented every time we fetch a missing document. + * `workerMultikeyPathInfo` is a pointer to a list of objects tracking which indexes to set as + * multikey at the end of the batch. It should never be null. * * Used exclusively by the InitialSyncer to construct a MultiApplier. */ virtual Status _multiInitialSyncApply(MultiApplier::OperationPtrs* ops, const HostAndPort& source, - AtomicUInt32* fetchCount) = 0; + AtomicUInt32* fetchCount, + WorkerMultikeyPathInfo* workerMultikeyPathInfo) = 0; // Provides InitialSyncer with access to _multiApply, _multiSyncApply and // _multiInitialSyncApply. diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp index 64492dc63dd..49ea11001b1 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp @@ -126,10 +126,13 @@ StatusWith DataReplicatorExternalStateImpl::_multiApply( return _replicationCoordinatorExternalState->multiApply(opCtx, std::move(ops), applyOperation); } -Status DataReplicatorExternalStateImpl::_multiInitialSyncApply(MultiApplier::OperationPtrs* ops, - const HostAndPort& source, - AtomicUInt32* fetchCount) { - return _replicationCoordinatorExternalState->multiInitialSyncApply(ops, source, fetchCount); +Status DataReplicatorExternalStateImpl::_multiInitialSyncApply( + MultiApplier::OperationPtrs* ops, + const HostAndPort& source, + AtomicUInt32* fetchCount, + WorkerMultikeyPathInfo* workerMultikeyPathInfo) { + return _replicationCoordinatorExternalState->multiInitialSyncApply( + ops, source, fetchCount, workerMultikeyPathInfo); } ReplicationCoordinator* DataReplicatorExternalStateImpl::getReplicationCoordinator() const { diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h index b39ea02fcb4..586409b2b50 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.h +++ b/src/mongo/db/repl/data_replicator_external_state_impl.h @@ -72,7 +72,8 @@ private: Status _multiInitialSyncApply(MultiApplier::OperationPtrs* ops, const HostAndPort& source, - AtomicUInt32* fetchCount) override; + AtomicUInt32* fetchCount, + WorkerMultikeyPathInfo* workerMultikeyPathInfo) override; protected: ReplicationCoordinator* getReplicationCoordinator() const; diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp index 73b921e5eb3..de00a40efbf 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp @@ -101,11 +101,13 @@ StatusWith DataReplicatorExternalStateMock::_multiApply( return multiApplyFn(opCtx, std::move(ops), applyOperation); } -Status DataReplicatorExternalStateMock::_multiInitialSyncApply(MultiApplier::OperationPtrs* ops, - const HostAndPort& source, - AtomicUInt32* fetchCount) { +Status DataReplicatorExternalStateMock::_multiInitialSyncApply( + MultiApplier::OperationPtrs* ops, + const HostAndPort& source, + AtomicUInt32* fetchCount, + WorkerMultikeyPathInfo* workerMultikeyPathInfo) { - return multiInitialSyncApplyFn(ops, source, fetchCount); + return multiInitialSyncApplyFn(ops, source, fetchCount, workerMultikeyPathInfo); } } // namespace repl diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h index df14c7c10a6..c556b822315 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.h +++ b/src/mongo/db/repl/data_replicator_external_state_mock.h @@ -89,10 +89,15 @@ public: MultiApplier::MultiApplyFn multiApplyFn; // Override to change _multiInitialSyncApply behavior. - using MultiInitialSyncApplyFn = stdx::function; + using MultiInitialSyncApplyFn = + stdx::function; MultiInitialSyncApplyFn multiInitialSyncApplyFn = []( - MultiApplier::OperationPtrs*, const HostAndPort&, AtomicUInt32*) { return Status::OK(); }; + MultiApplier::OperationPtrs*, const HostAndPort&, AtomicUInt32*, WorkerMultikeyPathInfo*) { + return Status::OK(); + }; StatusWith replSetConfigResult = ReplSetConfig(); @@ -103,7 +108,8 @@ private: Status _multiInitialSyncApply(MultiApplier::OperationPtrs* ops, const HostAndPort& source, - AtomicUInt32* fetchCount) override; + AtomicUInt32* fetchCount, + WorkerMultikeyPathInfo* workerMultikeyPathInfo) override; }; diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index f65dea7e86e..ed6a2086060 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -932,8 +932,10 @@ void InitialSyncer::_getNextApplierBatchCallback( if (!ops.empty()) { _fetchCount.store(0); MultiApplier::ApplyOperationFn applyOperationsForEachReplicationWorkerThreadFn = - [ =, source = _syncSource ](MultiApplier::OperationPtrs * x) { - return _dataReplicatorExternalState->_multiInitialSyncApply(x, source, &_fetchCount); + [ =, source = _syncSource ](MultiApplier::OperationPtrs * x, + WorkerMultikeyPathInfo * workerMultikeyPathInfo) { + return _dataReplicatorExternalState->_multiInitialSyncApply( + x, source, &_fetchCount, workerMultikeyPathInfo); }; MultiApplier::MultiApplyFn applyBatchOfOperationsFn = [=](OperationContext* opCtx, diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index 630629ff7a5..2d959fb8a5c 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -3619,18 +3619,21 @@ TEST_F( const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn applyOperation) { // 'OperationPtr*' is ignored by our overridden _multiInitialSyncApply(). - applyOperation(nullptr).transitional_ignore(); + ASSERT_OK(applyOperation(nullptr, nullptr)); return ops.back().getOpTime(); }; bool fetchCountIncremented = false; - getExternalState()->multiInitialSyncApplyFn = [&fetchCountIncremented]( - MultiApplier::OperationPtrs*, const HostAndPort&, AtomicUInt32* fetchCount) { - if (!fetchCountIncremented) { - fetchCount->addAndFetch(1); - fetchCountIncremented = true; - } - return Status::OK(); - }; + getExternalState()->multiInitialSyncApplyFn = + [&fetchCountIncremented](MultiApplier::OperationPtrs*, + const HostAndPort&, + AtomicUInt32* fetchCount, + WorkerMultikeyPathInfo*) { + if (!fetchCountIncremented) { + fetchCount->addAndFetch(1); + fetchCountIncremented = true; + } + return Status::OK(); + }; _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts)); diff --git a/src/mongo/db/repl/multiapplier.h b/src/mongo/db/repl/multiapplier.h index a81ed55b47f..e3844e8b0b6 100644 --- a/src/mongo/db/repl/multiapplier.h +++ b/src/mongo/db/repl/multiapplier.h @@ -38,6 +38,7 @@ #include "mongo/base/status.h" #include "mongo/base/status_with.h" #include "mongo/db/jsobj.h" +#include "mongo/db/multi_key_path_tracker.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/service_context.h" #include "mongo/executor/task_executor.h" @@ -71,7 +72,7 @@ public: * would have the same outcome as calling SyncTail::syncApply() (oplog application mode * will be embedded in the function implementation). */ - using ApplyOperationFn = stdx::function; + using ApplyOperationFn = stdx::function; using MultiApplyFn = stdx::function( OperationContext*, MultiApplier::Operations, MultiApplier::ApplyOperationFn)>; diff --git a/src/mongo/db/repl/multiapplier_test.cpp b/src/mongo/db/repl/multiapplier_test.cpp index 792f77be583..5227eed6c20 100644 --- a/src/mongo/db/repl/multiapplier_test.cpp +++ b/src/mongo/db/repl/multiapplier_test.cpp @@ -59,7 +59,7 @@ void MultiApplierTest::setUp() { launchExecutorThread(); } -Status applyOperation(MultiApplier::OperationPtrs*) { +Status applyOperation(MultiApplier::OperationPtrs*, WorkerMultikeyPathInfo*) { return Status::OK(); }; diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index 3fe63ac7daf..0730e1bed42 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -299,11 +299,14 @@ public: /** * Used by multiApply() to writes operations to database during initial sync. `fetchCount` is a * pointer to a counter that is incremented every time we fetch a missing document. + * `workerMultikeyPathInfo` is a pointer to a list of objects tracking which indexes to set as + * multikey at the end of the batch. * */ virtual Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops, const HostAndPort& source, - AtomicUInt32* fetchCount) = 0; + AtomicUInt32* fetchCount, + WorkerMultikeyPathInfo* workerMultikeyPathInfo) = 0; /** * This function creates an oplog buffer of the type specified at server startup. diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 6866e6137af..c4af85c09fd 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -880,14 +880,17 @@ StatusWith ReplicationCoordinatorExternalStateImpl::multiApply( } Status ReplicationCoordinatorExternalStateImpl::multiInitialSyncApply( - MultiApplier::OperationPtrs* ops, const HostAndPort& source, AtomicUInt32* fetchCount) { + MultiApplier::OperationPtrs* ops, + const HostAndPort& source, + AtomicUInt32* fetchCount, + WorkerMultikeyPathInfo* workerMultikeyPathInfo) { // repl::multiInitialSyncApply uses SyncTail::shouldRetry() (and implicitly getMissingDoc()) // to fetch missing documents during initial sync. Therefore, it is fine to construct SyncTail // with invalid BackgroundSync, MultiSyncApplyFunc and writerPool arguments because we will not // be accessing any SyncTail functionality that require these constructor parameters. SyncTail syncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr); syncTail.setHostname(source.toString()); - return repl::multiInitialSyncApply(ops, &syncTail, fetchCount); + return repl::multiInitialSyncApply(ops, &syncTail, fetchCount, workerMultikeyPathInfo); } std::unique_ptr ReplicationCoordinatorExternalStateImpl::makeInitialSyncOplogBuffer( diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 49fffb04b79..4e8e0ca1eaf 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -111,7 +111,8 @@ public: MultiApplier::ApplyOperationFn applyOperation) override; virtual Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops, const HostAndPort& source, - AtomicUInt32* fetchCount) override; + AtomicUInt32* fetchCount, + WorkerMultikeyPathInfo* workerMultikeyPathInfo) override; virtual std::unique_ptr makeInitialSyncOplogBuffer( OperationContext* opCtx) const override; virtual std::unique_ptr makeSteadyStateOplogBuffer( diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index 7042e7b7d8e..0cec9f77524 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -251,7 +251,10 @@ StatusWith ReplicationCoordinatorExternalStateMock::multiApply( } Status ReplicationCoordinatorExternalStateMock::multiInitialSyncApply( - MultiApplier::OperationPtrs* ops, const HostAndPort& source, AtomicUInt32* fetchCount) { + MultiApplier::OperationPtrs* ops, + const HostAndPort& source, + AtomicUInt32* fetchCount, + WorkerMultikeyPathInfo* workerMultikeyPathInfo) { return Status::OK(); } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index caed8890585..f227fef5489 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -99,7 +99,8 @@ public: MultiApplier::ApplyOperationFn applyOperation) override; virtual Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops, const HostAndPort& source, - AtomicUInt32* fetchCount) override; + AtomicUInt32* fetchCount, + WorkerMultikeyPathInfo* workerMultikeyPathInfo) override; virtual std::unique_ptr makeInitialSyncOplogBuffer( OperationContext* opCtx) const override; virtual std::unique_ptr makeSteadyStateOplogBuffer( diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 28561964f37..92dc3fe5aec 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -449,13 +449,14 @@ void prefetchOps(const MultiApplier::Operations& ops, OldThreadPool* prefetcherP void applyOps(std::vector& writerVectors, OldThreadPool* writerPool, const MultiApplier::ApplyOperationFn& func, - std::vector* statusVector) { + std::vector* statusVector, + std::vector* workerMultikeyPathInfo) { invariant(writerVectors.size() == statusVector->size()); TimerHolder timer(&applyBatchStats); for (size_t i = 0; i < writerVectors.size(); i++) { if (!writerVectors[i].empty()) { - writerPool->schedule([&func, &writerVectors, statusVector, i] { - (*statusVector)[i] = func(&writerVectors[i]); + writerPool->schedule([&func, &writerVectors, statusVector, workerMultikeyPathInfo, i] { + (*statusVector)[i] = func(&writerVectors[i], &((*workerMultikeyPathInfo)[i])); }); } } @@ -705,32 +706,16 @@ OpTime SyncTail::multiApply_forTest(OperationContext* opCtx, MultiApplier::Opera * this batch, it will not be updated. */ OpTime SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) { - auto applyOperation = [this](MultiApplier::OperationPtrs* ops) -> Status { - _applyFunc(ops, this); + auto applyOperation = [this](MultiApplier::OperationPtrs* ops, + WorkerMultikeyPathInfo* workerMultikeyPathInfo) -> Status { + _applyFunc(ops, this, workerMultikeyPathInfo); // This function is used by 3.2 initial sync and steady state data replication. // _applyFunc() will throw or abort on error, so we return OK here. return Status::OK(); }; - Timestamp firstTimeInBatch = ops.front().getTimestamp(); - OpTime finalOpTime = fassertStatusOK( + return fassertStatusOK( 34437, repl::multiApply(opCtx, _writerPool.get(), std::move(ops), applyOperation)); - - invariant(!MultikeyPathTracker::get(opCtx).isTrackingMultikeyPathInfo()); - // Set any indexes to multikey that this batch ignored. - stdx::lock_guard lk(_mutex); - for (MultikeyPathInfo info : _multikeyPathInfo) { - // We timestamp every multikey write with the first timestamp in the batch. It is always - // safe to set an index as multikey too early, just not too late. We conservatively pick - // the first timestamp in the batch since we do not have enough information to find out - // the timestamp of the first write that set the given multikey path. - fassertStatusOK(50686, - StorageInterface::get(opCtx)->setIndexIsMultikey( - opCtx, info.nss, info.indexName, info.multikeyPaths, firstTimeInBatch)); - } - _multikeyPathInfo.clear(); - - return finalOpTime; } namespace { @@ -1243,30 +1228,28 @@ bool SyncTail::fetchAndInsertMissingDocument(OperationContext* opCtx, }); } -void SyncTail::addMultikeyPathInfo(std::vector infoList) { - stdx::lock_guard lk(_mutex); - _multikeyPathInfo.reserve(_multikeyPathInfo.size() + infoList.size()); - for (MultikeyPathInfo info : infoList) { - _multikeyPathInfo.emplace_back(info); - } -} - // This free function is used by the writer threads to apply each op -void multiSyncApply(MultiApplier::OperationPtrs* ops, SyncTail* st) { +void multiSyncApply(MultiApplier::OperationPtrs* ops, + SyncTail* st, + WorkerMultikeyPathInfo* workerMultikeyPathInfo) { initializeWriterThread(); auto opCtx = cc().makeOperationContext(); auto syncApply = []( OperationContext* opCtx, const BSONObj& op, OplogApplication::Mode oplogApplicationMode) { return SyncTail::syncApply(opCtx, op, oplogApplicationMode); }; + { + ON_BLOCK_EXIT( + [&opCtx] { MultikeyPathTracker::get(opCtx.get()).stopTrackingMultikeyPathInfo(); }); + MultikeyPathTracker::get(opCtx.get()).startTrackingMultikeyPathInfo(); + fassertNoTrace(16359, multiSyncApply_noAbort(opCtx.get(), ops, syncApply)); + } - ON_BLOCK_EXIT( - [&opCtx] { MultikeyPathTracker::get(opCtx.get()).stopTrackingMultikeyPathInfo(); }); - MultikeyPathTracker::get(opCtx.get()).startTrackingMultikeyPathInfo(); - fassertNoTrace(16359, multiSyncApply_noAbort(opCtx.get(), ops, syncApply)); - - if (!MultikeyPathTracker::get(opCtx.get()).getMultikeyPathInfo().empty()) { - st->addMultikeyPathInfo(MultikeyPathTracker::get(opCtx.get()).getMultikeyPathInfo()); + invariant(!MultikeyPathTracker::get(opCtx.get()).isTrackingMultikeyPathInfo()); + invariant(workerMultikeyPathInfo->empty()); + auto newPaths = MultikeyPathTracker::get(opCtx.get()).getMultikeyPathInfo(); + if (!newPaths.empty()) { + workerMultikeyPathInfo->swap(newPaths); } } @@ -1449,16 +1432,18 @@ Status multiSyncApply_noAbort(OperationContext* opCtx, Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops, SyncTail* st, - AtomicUInt32* fetchCount) { + AtomicUInt32* fetchCount, + WorkerMultikeyPathInfo* workerMultikeyPathInfo) { initializeWriterThread(); auto opCtx = cc().makeOperationContext(); - return multiInitialSyncApply_noAbort(opCtx.get(), ops, st, fetchCount); + return multiInitialSyncApply_noAbort(opCtx.get(), ops, st, fetchCount, workerMultikeyPathInfo); } Status multiInitialSyncApply_noAbort(OperationContext* opCtx, MultiApplier::OperationPtrs* ops, SyncTail* st, - AtomicUInt32* fetchCount) { + AtomicUInt32* fetchCount, + WorkerMultikeyPathInfo* workerMultikeyPathInfo) { UnreplicatedWritesBlock uwb(opCtx); DisableDocumentValidation validationDisabler(opCtx); { // Ensure that the MultikeyPathTracker stops tracking paths. @@ -1503,17 +1488,12 @@ Status multiInitialSyncApply_noAbort(OperationContext* opCtx, } invariant(!MultikeyPathTracker::get(opCtx).isTrackingMultikeyPathInfo()); - // Set any indexes to multikey that this batch ignored. - Timestamp firstTimeInBatch = ops->front()->getTimestamp(); - for (MultikeyPathInfo info : MultikeyPathTracker::get(opCtx).getMultikeyPathInfo()) { - // We timestamp every multikey write with the first timestamp in the batch. It is always - // safe to set an index as multikey too early, just not too late. We conservatively pick - // the first timestamp in the batch since we do not have enough information to find out - // the timestamp of the first write that set the given multikey path. - fassertStatusOK(50685, - StorageInterface::get(opCtx)->setIndexIsMultikey( - opCtx, info.nss, info.indexName, info.multikeyPaths, firstTimeInBatch)); + invariant(workerMultikeyPathInfo->empty()); + auto newPaths = MultikeyPathTracker::get(opCtx).getMultikeyPathInfo(); + if (!newPaths.empty()) { + workerMultikeyPathInfo->swap(newPaths); } + return Status::OK(); } @@ -1558,6 +1538,7 @@ StatusWith multiApply(OperationContext* opCtx, } std::vector statusVector(workerPool->getNumThreads(), Status::OK()); + std::vector multikeyVector(workerPool->getNumThreads()); { // We must wait for the all work we've dispatched to complete before leaving this block // because the spawned threads refer to objects on the stack @@ -1598,7 +1579,7 @@ StatusWith multiApply(OperationContext* opCtx, consistencyMarkers->setOplogTruncateAfterPoint(opCtx, Timestamp()); consistencyMarkers->setMinValidToAtLeast(opCtx, ops.back().getOpTime()); - applyOps(writerVectors, workerPool, applyOperation, &statusVector); + applyOps(writerVectors, workerPool, applyOperation, &statusVector, &multikeyVector); workerPool->join(); // Update the transaction table to point to the latest oplog entries for each session id. @@ -1614,6 +1595,22 @@ StatusWith multiApply(OperationContext* opCtx, storageEngine->replicationBatchIsComplete(); } + Timestamp firstTimeInBatch = ops.front().getTimestamp(); + // Set any indexes to multikey that this batch ignored. This must be done while holding the + // parallel batch writer mutex. + for (WorkerMultikeyPathInfo infoVector : multikeyVector) { + for (MultikeyPathInfo info : infoVector) { + // We timestamp every multikey write with the first timestamp in the batch. It is always + // safe to set an index as multikey too early, just not too late. We conservatively pick + // the first timestamp in the batch since we do not have enough information to find out + // the timestamp of the first write that set the given multikey path. + fassertStatusOK( + 50686, + StorageInterface::get(opCtx)->setIndexIsMultikey( + opCtx, info.nss, info.indexName, info.multikeyPaths, firstTimeInBatch)); + } + } + // If any of the statuses is not ok, return error. for (auto& status : statusVector) { if (!status.isOK()) { diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index c4436bb4b1c..0b7d2cd2374 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -55,7 +55,9 @@ class OpTime; */ class SyncTail { public: - using MultiSyncApplyFunc = stdx::function; + using MultiSyncApplyFunc = stdx::function; /** * Type of function to increment "repl.apply.ops" server status metric. @@ -225,12 +227,6 @@ public: static AtomicInt32 replBatchLimitOperations; - /** - * Adds the given multikey path information to the list of indexes to make multikey at the - * end of the current batch. - */ - void addMultikeyPathInfo(std::vector infoList); - /** * Passthrough function to test multiApply. */ @@ -256,12 +252,6 @@ private: // persistent pool of worker threads for writing ops to the databases std::unique_ptr _writerPool; - - // Protects member variables below. - mutable stdx::mutex _mutex; - - // Maintains the information for all indexes that must be set as multikey in the current batch. - std::vector _multikeyPathInfo; }; /** @@ -282,12 +272,15 @@ StatusWith multiApply(OperationContext* opCtx, // They consume the passed in OperationPtrs and callers should not make any assumptions about the // state of the container after calling. However, these functions cannot modify the pointed-to // operations because the OperationPtrs container contains const pointers. -void multiSyncApply(MultiApplier::OperationPtrs* ops, SyncTail* st); +void multiSyncApply(MultiApplier::OperationPtrs* ops, + SyncTail* st, + WorkerMultikeyPathInfo* workerMultikeyPathInfo); // Used by 3.4 initial sync. Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops, SyncTail* st, - AtomicUInt32* fetchCount); + AtomicUInt32* fetchCount, + WorkerMultikeyPathInfo* workerMultikeyPathInfo); /** * Testing-only version of multiSyncApply that returns an error instead of aborting. @@ -307,7 +300,8 @@ Status multiSyncApply_noAbort(OperationContext* opCtx, Status multiInitialSyncApply_noAbort(OperationContext* opCtx, MultiApplier::OperationPtrs* ops, SyncTail* st, - AtomicUInt32* fetchCount); + AtomicUInt32* fetchCount, + WorkerMultikeyPathInfo* workerMultikeyPathInfo); } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index 0a469ef7cd0..83b474cafa9 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -563,8 +563,8 @@ bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx, const CollectionOptions& options) { auto writerPool = SyncTail::makeWriterPool(); MultiApplier::Operations operationsApplied; - auto applyOperationFn = - [&operationsApplied](MultiApplier::OperationPtrs* operationsToApply) -> Status { + auto applyOperationFn = [&operationsApplied](MultiApplier::OperationPtrs* operationsToApply, + WorkerMultikeyPathInfo*) -> Status { for (auto&& opPtr : *operationsToApply) { operationsApplied.push_back(*opPtr); } @@ -611,8 +611,9 @@ TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceH stdx::mutex mutex; std::vector operationsApplied; - auto applyOperationFn = [&mutex, &operationsApplied]( - MultiApplier::OperationPtrs* operationsForWriterThreadToApply) -> Status { + auto applyOperationFn = + [&mutex, &operationsApplied](MultiApplier::OperationPtrs* operationsForWriterThreadToApply, + WorkerMultikeyPathInfo*) -> Status { stdx::lock_guard lock(mutex); operationsApplied.emplace_back(); for (auto&& opPtr : *operationsForWriterThreadToApply) { @@ -782,12 +783,107 @@ TEST_F(SyncTailTest, MultiSyncApplyUsesSyncApplyToApplyOperation) { _opCtx.reset(); MultiApplier::OperationPtrs ops = {&op}; - multiSyncApply(&ops, nullptr); + WorkerMultikeyPathInfo pathInfo; + multiSyncApply(&ops, nullptr, &pathInfo); // Collection should be created after SyncTail::syncApply() processes operation. _opCtx = cc().makeOperationContext(); ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection()); } +void testWorkerMultikeyPaths(const OplogEntry& op, unsigned long numPaths) { + WorkerMultikeyPathInfo pathInfo; + MultiApplier::OperationPtrs ops = {&op}; + multiSyncApply(&ops, nullptr, &pathInfo); + ASSERT_EQ(pathInfo.size(), numPaths); +} + +TEST_F(SyncTailTest, MultiSyncApplyAddsWorkerMultikeyPathInfoOnInsert) { + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + _opCtx.reset(); + + { + auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); + testWorkerMultikeyPaths(op, 0UL); + } + + { + auto keyPattern = BSON("a" << 1); + auto op = + makeCreateIndexOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss, "a_1", keyPattern); + testWorkerMultikeyPaths(op, 0UL); + } + + { + auto doc = BSON("_id" << 1 << "a" << BSON_ARRAY(4 << 5)); + auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(3), 0), 1LL}, nss, doc); + testWorkerMultikeyPaths(op, 1UL); + } +} + +TEST_F(SyncTailTest, MultiSyncApplyAddsMultipleWorkerMultikeyPathInfo) { + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + _opCtx.reset(); + + { + auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); + testWorkerMultikeyPaths(op, 0UL); + } + + { + auto keyPattern = BSON("a" << 1); + auto op = + makeCreateIndexOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss, "a_1", keyPattern); + testWorkerMultikeyPaths(op, 0UL); + } + + { + auto keyPattern = BSON("b" << 1); + auto op = + makeCreateIndexOplogEntry({Timestamp(Seconds(3), 0), 1LL}, nss, "b_1", keyPattern); + testWorkerMultikeyPaths(op, 0UL); + } + + { + auto docA = BSON("_id" << 1 << "a" << BSON_ARRAY(4 << 5)); + auto opA = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, docA); + auto docB = BSON("_id" << 2 << "b" << BSON_ARRAY(6 << 7)); + auto opB = makeInsertDocumentOplogEntry({Timestamp(Seconds(5), 0), 1LL}, nss, docB); + WorkerMultikeyPathInfo pathInfo; + MultiApplier::OperationPtrs ops = {&opA, &opB}; + multiSyncApply(&ops, nullptr, &pathInfo); + ASSERT_EQ(pathInfo.size(), 2UL); + } +} + +TEST_F(SyncTailTest, MultiSyncApplyDoesNotAddWorkerMultikeyPathInfoOnCreateIndex) { + NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); + _opCtx.reset(); + + { + auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); + testWorkerMultikeyPaths(op, 0UL); + } + + { + auto doc = BSON("_id" << 1 << "a" << BSON_ARRAY(4 << 5)); + auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss, doc); + testWorkerMultikeyPaths(op, 0UL); + } + + { + auto keyPattern = BSON("a" << 1); + auto op = + makeCreateIndexOplogEntry({Timestamp(Seconds(3), 0), 1LL}, nss, "a_1", keyPattern); + testWorkerMultikeyPaths(op, 0UL); + } + + { + auto doc = BSON("_id" << 2 << "a" << BSON_ARRAY(6 << 7)); + auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc); + testWorkerMultikeyPaths(op, 0UL); + } +} + DEATH_TEST_F(SyncTailTest, MultiSyncApplyFailsWhenCollectionCreationTriesToMakeUUID, "Attempted to create a new collection") { @@ -798,7 +894,7 @@ DEATH_TEST_F(SyncTailTest, auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); _opCtx.reset(); MultiApplier::OperationPtrs ops = {&op}; - multiSyncApply(&ops, nullptr); + multiSyncApply(&ops, nullptr, nullptr); } TEST_F(SyncTailTest, MultiInitialSyncApplyFailsWhenCollectionCreationTriesToMakeUUID) { @@ -809,7 +905,8 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyFailsWhenCollectionCreationTriesToMake _opCtx.reset(); MultiApplier::OperationPtrs ops = {&op}; - ASSERT_EQUALS(ErrorCodes::InvalidOptions, multiInitialSyncApply(&ops, nullptr, nullptr)); + ASSERT_EQUALS(ErrorCodes::InvalidOptions, + multiInitialSyncApply(&ops, nullptr, nullptr, nullptr)); } TEST_F(SyncTailTest, MultiSyncApplyDisablesDocumentValidationWhileApplyingOperations) { @@ -1170,7 +1267,8 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyDisablesDocumentValidationWhileApplyin {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2)); MultiApplier::OperationPtrs ops = {&op}; AtomicUInt32 fetchCount(0); - ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount)); + WorkerMultikeyPathInfo pathInfo; + ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo)); ASSERT_EQUALS(fetchCount.load(), 1U); } @@ -1189,7 +1287,8 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyIgnoresUpdateOperationIfDocumentIsMiss {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2)); MultiApplier::OperationPtrs ops = {&op}; AtomicUInt32 fetchCount(0); - ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount)); + WorkerMultikeyPathInfo pathInfo; + ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo)); // Since the missing document is not found on the sync source, the collection referenced by // the failed operation should not be automatically created. @@ -1211,7 +1310,8 @@ TEST_F(SyncTailTest, MultiInitialSyncApplySkipsDocumentOnNamespaceNotFound) { auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3); MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3}; AtomicUInt32 fetchCount(0); - ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount)); + WorkerMultikeyPathInfo pathInfo; + ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo)); ASSERT_EQUALS(fetchCount.load(), 0U); OplogInterfaceLocal collectionReader(_opCtx.get(), nss.ns()); @@ -1236,7 +1336,8 @@ TEST_F(SyncTailTest, MultiInitialSyncApplySkipsIndexCreationOnNamespaceNotFound) auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3); MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3}; AtomicUInt32 fetchCount(0); - ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount)); + WorkerMultikeyPathInfo pathInfo; + ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo)); ASSERT_EQUALS(fetchCount.load(), 0U); OplogInterfaceLocal collectionReader(_opCtx.get(), nss.ns()); @@ -1259,7 +1360,8 @@ TEST_F(SyncTailTest, {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), updatedDocument); MultiApplier::OperationPtrs ops = {&op}; AtomicUInt32 fetchCount(0); - ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount)); + WorkerMultikeyPathInfo pathInfo; + ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo)); ASSERT_EQUALS(fetchCount.load(), 1U); // The collection referenced by "ns" in the failed operation is automatically created to hold diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp index 88c9908ba55..4953a93ec41 100644 --- a/src/mongo/db/repl/sync_tail_test_fixture.cpp +++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp @@ -179,7 +179,8 @@ Status SyncTailTest::runOpsInitialSync(std::vector ops) { opsPtrs.push_back(&op); } AtomicUInt32 fetchCount(0); - return multiInitialSyncApply_noAbort(_opCtx.get(), &opsPtrs, &syncTail, &fetchCount); + WorkerMultikeyPathInfo pathInfo; + return multiInitialSyncApply_noAbort(_opCtx.get(), &opsPtrs, &syncTail, &fetchCount, &pathInfo); } diff --git a/src/mongo/db/repl/sync_tail_test_fixture.h b/src/mongo/db/repl/sync_tail_test_fixture.h index 435f126d877..cc8fb3b44c4 100644 --- a/src/mongo/db/repl/sync_tail_test_fixture.h +++ b/src/mongo/db/repl/sync_tail_test_fixture.h @@ -58,7 +58,7 @@ protected: ReplicationProcess* _replicationProcess = nullptr; // Implements the MultiApplier::ApplyOperationFn interface and does nothing. - static Status noopApplyOperationFn(MultiApplier::OperationPtrs*) { + static Status noopApplyOperationFn(MultiApplier::OperationPtrs*, WorkerMultikeyPathInfo*) { return Status::OK(); } diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index ecabdabb8f8..f5f64fbd81f 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -1340,11 +1340,19 @@ public: // We add in an index creation op to test that we restart tracking multikey path info // after bulk index builds. - std::vector ops = {&op0, &createIndexOp, &op1, &op2}; + std::vector ops = {op0, createIndexOp, op1, op2}; - repl::SyncTail syncTail(nullptr, repl::SyncTail::MultiSyncApplyFunc(), nullptr); AtomicUInt32 fetchCount(0); - ASSERT_OK(repl::multiInitialSyncApply_noAbort(_opCtx, &ops, &syncTail, &fetchCount)); + repl::SyncTail syncTail(nullptr, repl::multiSyncApply); + repl::MultiApplier::ApplyOperationFn applyOpFn = [&fetchCount, &syncTail]( + repl::MultiApplier::OperationPtrs* ops, + WorkerMultikeyPathInfo* workerMultikeyPathInfo) { + return repl::multiInitialSyncApply(ops, &syncTail, &fetchCount, workerMultikeyPathInfo); + }; + + auto lastTime = + assertGet(repl::multiApply(_opCtx, syncTail.getWriterPool(), ops, applyOpFn)); + ASSERT_EQ(lastTime.getTimestamp(), insertTime2.asTimestamp()); AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_X, LockMode::MODE_IX); assertMultikeyPaths( -- cgit v1.2.1