summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/multi_key_path_tracker.cpp2
-rw-r--r--src/mongo/db/multi_key_path_tracker.h6
-rw-r--r--src/mongo/db/repl/data_replicator_external_state.h5
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.cpp11
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.h3
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_mock.cpp10
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_mock.h14
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp6
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp21
-rw-r--r--src/mongo/db/repl/multiapplier.h3
-rw-r--r--src/mongo/db/repl/multiapplier_test.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state.h5
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp7
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h3
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.cpp5
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.h3
-rw-r--r--src/mongo/db/repl/sync_tail.cpp103
-rw-r--r--src/mongo/db/repl/sync_tail.h26
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp126
-rw-r--r--src/mongo/db/repl/sync_tail_test_fixture.cpp3
-rw-r--r--src/mongo/db/repl/sync_tail_test_fixture.h2
-rw-r--r--src/mongo/dbtests/storage_timestamp_tests.cpp14
22 files changed, 258 insertions, 122 deletions
diff --git a/src/mongo/db/multi_key_path_tracker.cpp b/src/mongo/db/multi_key_path_tracker.cpp
index ecb1348d92a..43b5fd0567b 100644
--- a/src/mongo/db/multi_key_path_tracker.cpp
+++ b/src/mongo/db/multi_key_path_tracker.cpp
@@ -40,7 +40,7 @@ void MultikeyPathTracker::addMultikeyPathInfo(MultikeyPathInfo info) {
_multikeyPathInfo.emplace_back(info);
}
-const std::vector<MultikeyPathInfo>& MultikeyPathTracker::getMultikeyPathInfo() const {
+const WorkerMultikeyPathInfo& MultikeyPathTracker::getMultikeyPathInfo() const {
return _multikeyPathInfo;
}
diff --git a/src/mongo/db/multi_key_path_tracker.h b/src/mongo/db/multi_key_path_tracker.h
index 781ec9924cc..3f1d4800bab 100644
--- a/src/mongo/db/multi_key_path_tracker.h
+++ b/src/mongo/db/multi_key_path_tracker.h
@@ -41,6 +41,8 @@ struct MultikeyPathInfo {
MultikeyPaths multikeyPaths;
};
+using WorkerMultikeyPathInfo = std::vector<MultikeyPathInfo>;
+
/**
* An OperationContext decoration that tracks which indexes should be made multikey. This is used
* by IndexCatalogEntryImpl::setMultikey() to track what indexes should be set as multikey during
@@ -64,7 +66,7 @@ public:
/**
* Returns the multikey path information that has been saved.
*/
- const std::vector<MultikeyPathInfo>& getMultikeyPathInfo() const;
+ const WorkerMultikeyPathInfo& getMultikeyPathInfo() const;
/**
* Specifies that we should track multikey path information on this MultikeyPathTracker. This is
@@ -87,7 +89,7 @@ public:
private:
- std::vector<MultikeyPathInfo> _multikeyPathInfo;
+ WorkerMultikeyPathInfo _multikeyPathInfo;
bool _trackMultikeyPathInfo = false;
};
diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h
index 1fc6fc8a9a5..818a9fdd38a 100644
--- a/src/mongo/db/repl/data_replicator_external_state.h
+++ b/src/mongo/db/repl/data_replicator_external_state.h
@@ -137,12 +137,15 @@ private:
/**
* Used by _multiApply() to write operations to database during initial sync. `fetchCount` is a
* pointer to a counter that is incremented every time we fetch a missing document.
+ * `workerMultikeyPathInfo` is a pointer to a list of objects tracking which indexes to set as
+ * multikey at the end of the batch. It should never be null.
*
* Used exclusively by the InitialSyncer to construct a MultiApplier.
*/
virtual Status _multiInitialSyncApply(MultiApplier::OperationPtrs* ops,
const HostAndPort& source,
- AtomicUInt32* fetchCount) = 0;
+ AtomicUInt32* fetchCount,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo) = 0;
// Provides InitialSyncer with access to _multiApply, _multiSyncApply and
// _multiInitialSyncApply.
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
index 64492dc63dd..49ea11001b1 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
@@ -126,10 +126,13 @@ StatusWith<OpTime> DataReplicatorExternalStateImpl::_multiApply(
return _replicationCoordinatorExternalState->multiApply(opCtx, std::move(ops), applyOperation);
}
-Status DataReplicatorExternalStateImpl::_multiInitialSyncApply(MultiApplier::OperationPtrs* ops,
- const HostAndPort& source,
- AtomicUInt32* fetchCount) {
- return _replicationCoordinatorExternalState->multiInitialSyncApply(ops, source, fetchCount);
+Status DataReplicatorExternalStateImpl::_multiInitialSyncApply(
+ MultiApplier::OperationPtrs* ops,
+ const HostAndPort& source,
+ AtomicUInt32* fetchCount,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo) {
+ return _replicationCoordinatorExternalState->multiInitialSyncApply(
+ ops, source, fetchCount, workerMultikeyPathInfo);
}
ReplicationCoordinator* DataReplicatorExternalStateImpl::getReplicationCoordinator() const {
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h
index b39ea02fcb4..586409b2b50 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.h
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.h
@@ -72,7 +72,8 @@ private:
Status _multiInitialSyncApply(MultiApplier::OperationPtrs* ops,
const HostAndPort& source,
- AtomicUInt32* fetchCount) override;
+ AtomicUInt32* fetchCount,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo) override;
protected:
ReplicationCoordinator* getReplicationCoordinator() const;
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
index 73b921e5eb3..de00a40efbf 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
@@ -101,11 +101,13 @@ StatusWith<OpTime> DataReplicatorExternalStateMock::_multiApply(
return multiApplyFn(opCtx, std::move(ops), applyOperation);
}
-Status DataReplicatorExternalStateMock::_multiInitialSyncApply(MultiApplier::OperationPtrs* ops,
- const HostAndPort& source,
- AtomicUInt32* fetchCount) {
+Status DataReplicatorExternalStateMock::_multiInitialSyncApply(
+ MultiApplier::OperationPtrs* ops,
+ const HostAndPort& source,
+ AtomicUInt32* fetchCount,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo) {
- return multiInitialSyncApplyFn(ops, source, fetchCount);
+ return multiInitialSyncApplyFn(ops, source, fetchCount, workerMultikeyPathInfo);
}
} // namespace repl
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h
index df14c7c10a6..c556b822315 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.h
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.h
@@ -89,10 +89,15 @@ public:
MultiApplier::MultiApplyFn multiApplyFn;
// Override to change _multiInitialSyncApply behavior.
- using MultiInitialSyncApplyFn = stdx::function<Status(
- MultiApplier::OperationPtrs* ops, const HostAndPort& source, AtomicUInt32* fetchCount)>;
+ using MultiInitialSyncApplyFn =
+ stdx::function<Status(MultiApplier::OperationPtrs* ops,
+ const HostAndPort& source,
+ AtomicUInt32* fetchCount,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo)>;
MultiInitialSyncApplyFn multiInitialSyncApplyFn = [](
- MultiApplier::OperationPtrs*, const HostAndPort&, AtomicUInt32*) { return Status::OK(); };
+ MultiApplier::OperationPtrs*, const HostAndPort&, AtomicUInt32*, WorkerMultikeyPathInfo*) {
+ return Status::OK();
+ };
StatusWith<ReplSetConfig> replSetConfigResult = ReplSetConfig();
@@ -103,7 +108,8 @@ private:
Status _multiInitialSyncApply(MultiApplier::OperationPtrs* ops,
const HostAndPort& source,
- AtomicUInt32* fetchCount) override;
+ AtomicUInt32* fetchCount,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo) override;
};
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index f65dea7e86e..ed6a2086060 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -932,8 +932,10 @@ void InitialSyncer::_getNextApplierBatchCallback(
if (!ops.empty()) {
_fetchCount.store(0);
MultiApplier::ApplyOperationFn applyOperationsForEachReplicationWorkerThreadFn =
- [ =, source = _syncSource ](MultiApplier::OperationPtrs * x) {
- return _dataReplicatorExternalState->_multiInitialSyncApply(x, source, &_fetchCount);
+ [ =, source = _syncSource ](MultiApplier::OperationPtrs * x,
+ WorkerMultikeyPathInfo * workerMultikeyPathInfo) {
+ return _dataReplicatorExternalState->_multiInitialSyncApply(
+ x, source, &_fetchCount, workerMultikeyPathInfo);
};
MultiApplier::MultiApplyFn applyBatchOfOperationsFn =
[=](OperationContext* opCtx,
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 630629ff7a5..2d959fb8a5c 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -3619,18 +3619,21 @@ TEST_F(
const MultiApplier::Operations& ops,
MultiApplier::ApplyOperationFn applyOperation) {
// 'OperationPtr*' is ignored by our overridden _multiInitialSyncApply().
- applyOperation(nullptr).transitional_ignore();
+ ASSERT_OK(applyOperation(nullptr, nullptr));
return ops.back().getOpTime();
};
bool fetchCountIncremented = false;
- getExternalState()->multiInitialSyncApplyFn = [&fetchCountIncremented](
- MultiApplier::OperationPtrs*, const HostAndPort&, AtomicUInt32* fetchCount) {
- if (!fetchCountIncremented) {
- fetchCount->addAndFetch(1);
- fetchCountIncremented = true;
- }
- return Status::OK();
- };
+ getExternalState()->multiInitialSyncApplyFn =
+ [&fetchCountIncremented](MultiApplier::OperationPtrs*,
+ const HostAndPort&,
+ AtomicUInt32* fetchCount,
+ WorkerMultikeyPathInfo*) {
+ if (!fetchCountIncremented) {
+ fetchCount->addAndFetch(1);
+ fetchCountIncremented = true;
+ }
+ return Status::OK();
+ };
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
diff --git a/src/mongo/db/repl/multiapplier.h b/src/mongo/db/repl/multiapplier.h
index a81ed55b47f..e3844e8b0b6 100644
--- a/src/mongo/db/repl/multiapplier.h
+++ b/src/mongo/db/repl/multiapplier.h
@@ -38,6 +38,7 @@
#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
#include "mongo/db/jsobj.h"
+#include "mongo/db/multi_key_path_tracker.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/service_context.h"
#include "mongo/executor/task_executor.h"
@@ -71,7 +72,7 @@ public:
* would have the same outcome as calling SyncTail::syncApply() (oplog application mode
* will be embedded in the function implementation).
*/
- using ApplyOperationFn = stdx::function<Status(OperationPtrs*)>;
+ using ApplyOperationFn = stdx::function<Status(OperationPtrs*, WorkerMultikeyPathInfo*)>;
using MultiApplyFn = stdx::function<StatusWith<OpTime>(
OperationContext*, MultiApplier::Operations, MultiApplier::ApplyOperationFn)>;
diff --git a/src/mongo/db/repl/multiapplier_test.cpp b/src/mongo/db/repl/multiapplier_test.cpp
index 792f77be583..5227eed6c20 100644
--- a/src/mongo/db/repl/multiapplier_test.cpp
+++ b/src/mongo/db/repl/multiapplier_test.cpp
@@ -59,7 +59,7 @@ void MultiApplierTest::setUp() {
launchExecutorThread();
}
-Status applyOperation(MultiApplier::OperationPtrs*) {
+Status applyOperation(MultiApplier::OperationPtrs*, WorkerMultikeyPathInfo*) {
return Status::OK();
};
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 3fe63ac7daf..0730e1bed42 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -299,11 +299,14 @@ public:
/**
* Used by multiApply() to writes operations to database during initial sync. `fetchCount` is a
* pointer to a counter that is incremented every time we fetch a missing document.
+ * `workerMultikeyPathInfo` is a pointer to a list of objects tracking which indexes to set as
+ * multikey at the end of the batch.
*
*/
virtual Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops,
const HostAndPort& source,
- AtomicUInt32* fetchCount) = 0;
+ AtomicUInt32* fetchCount,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo) = 0;
/**
* This function creates an oplog buffer of the type specified at server startup.
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 6866e6137af..c4af85c09fd 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -880,14 +880,17 @@ StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::multiApply(
}
Status ReplicationCoordinatorExternalStateImpl::multiInitialSyncApply(
- MultiApplier::OperationPtrs* ops, const HostAndPort& source, AtomicUInt32* fetchCount) {
+ MultiApplier::OperationPtrs* ops,
+ const HostAndPort& source,
+ AtomicUInt32* fetchCount,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo) {
// repl::multiInitialSyncApply uses SyncTail::shouldRetry() (and implicitly getMissingDoc())
// to fetch missing documents during initial sync. Therefore, it is fine to construct SyncTail
// with invalid BackgroundSync, MultiSyncApplyFunc and writerPool arguments because we will not
// be accessing any SyncTail functionality that require these constructor parameters.
SyncTail syncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr);
syncTail.setHostname(source.toString());
- return repl::multiInitialSyncApply(ops, &syncTail, fetchCount);
+ return repl::multiInitialSyncApply(ops, &syncTail, fetchCount, workerMultikeyPathInfo);
}
std::unique_ptr<OplogBuffer> ReplicationCoordinatorExternalStateImpl::makeInitialSyncOplogBuffer(
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 49fffb04b79..4e8e0ca1eaf 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -111,7 +111,8 @@ public:
MultiApplier::ApplyOperationFn applyOperation) override;
virtual Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops,
const HostAndPort& source,
- AtomicUInt32* fetchCount) override;
+ AtomicUInt32* fetchCount,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo) override;
virtual std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(
OperationContext* opCtx) const override;
virtual std::unique_ptr<OplogBuffer> makeSteadyStateOplogBuffer(
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index 7042e7b7d8e..0cec9f77524 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -251,7 +251,10 @@ StatusWith<OpTime> ReplicationCoordinatorExternalStateMock::multiApply(
}
Status ReplicationCoordinatorExternalStateMock::multiInitialSyncApply(
- MultiApplier::OperationPtrs* ops, const HostAndPort& source, AtomicUInt32* fetchCount) {
+ MultiApplier::OperationPtrs* ops,
+ const HostAndPort& source,
+ AtomicUInt32* fetchCount,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo) {
return Status::OK();
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index caed8890585..f227fef5489 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -99,7 +99,8 @@ public:
MultiApplier::ApplyOperationFn applyOperation) override;
virtual Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops,
const HostAndPort& source,
- AtomicUInt32* fetchCount) override;
+ AtomicUInt32* fetchCount,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo) override;
virtual std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(
OperationContext* opCtx) const override;
virtual std::unique_ptr<OplogBuffer> makeSteadyStateOplogBuffer(
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 28561964f37..92dc3fe5aec 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -449,13 +449,14 @@ void prefetchOps(const MultiApplier::Operations& ops, OldThreadPool* prefetcherP
void applyOps(std::vector<MultiApplier::OperationPtrs>& writerVectors,
OldThreadPool* writerPool,
const MultiApplier::ApplyOperationFn& func,
- std::vector<Status>* statusVector) {
+ std::vector<Status>* statusVector,
+ std::vector<WorkerMultikeyPathInfo>* workerMultikeyPathInfo) {
invariant(writerVectors.size() == statusVector->size());
TimerHolder timer(&applyBatchStats);
for (size_t i = 0; i < writerVectors.size(); i++) {
if (!writerVectors[i].empty()) {
- writerPool->schedule([&func, &writerVectors, statusVector, i] {
- (*statusVector)[i] = func(&writerVectors[i]);
+ writerPool->schedule([&func, &writerVectors, statusVector, workerMultikeyPathInfo, i] {
+ (*statusVector)[i] = func(&writerVectors[i], &((*workerMultikeyPathInfo)[i]));
});
}
}
@@ -705,32 +706,16 @@ OpTime SyncTail::multiApply_forTest(OperationContext* opCtx, MultiApplier::Opera
* this batch, it will not be updated.
*/
OpTime SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) {
- auto applyOperation = [this](MultiApplier::OperationPtrs* ops) -> Status {
- _applyFunc(ops, this);
+ auto applyOperation = [this](MultiApplier::OperationPtrs* ops,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo) -> Status {
+ _applyFunc(ops, this, workerMultikeyPathInfo);
// This function is used by 3.2 initial sync and steady state data replication.
// _applyFunc() will throw or abort on error, so we return OK here.
return Status::OK();
};
- Timestamp firstTimeInBatch = ops.front().getTimestamp();
- OpTime finalOpTime = fassertStatusOK(
+ return fassertStatusOK(
34437, repl::multiApply(opCtx, _writerPool.get(), std::move(ops), applyOperation));
-
- invariant(!MultikeyPathTracker::get(opCtx).isTrackingMultikeyPathInfo());
- // Set any indexes to multikey that this batch ignored.
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- for (MultikeyPathInfo info : _multikeyPathInfo) {
- // We timestamp every multikey write with the first timestamp in the batch. It is always
- // safe to set an index as multikey too early, just not too late. We conservatively pick
- // the first timestamp in the batch since we do not have enough information to find out
- // the timestamp of the first write that set the given multikey path.
- fassertStatusOK(50686,
- StorageInterface::get(opCtx)->setIndexIsMultikey(
- opCtx, info.nss, info.indexName, info.multikeyPaths, firstTimeInBatch));
- }
- _multikeyPathInfo.clear();
-
- return finalOpTime;
}
namespace {
@@ -1243,30 +1228,28 @@ bool SyncTail::fetchAndInsertMissingDocument(OperationContext* opCtx,
});
}
-void SyncTail::addMultikeyPathInfo(std::vector<MultikeyPathInfo> infoList) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _multikeyPathInfo.reserve(_multikeyPathInfo.size() + infoList.size());
- for (MultikeyPathInfo info : infoList) {
- _multikeyPathInfo.emplace_back(info);
- }
-}
-
// This free function is used by the writer threads to apply each op
-void multiSyncApply(MultiApplier::OperationPtrs* ops, SyncTail* st) {
+void multiSyncApply(MultiApplier::OperationPtrs* ops,
+ SyncTail* st,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo) {
initializeWriterThread();
auto opCtx = cc().makeOperationContext();
auto syncApply = [](
OperationContext* opCtx, const BSONObj& op, OplogApplication::Mode oplogApplicationMode) {
return SyncTail::syncApply(opCtx, op, oplogApplicationMode);
};
+ {
+ ON_BLOCK_EXIT(
+ [&opCtx] { MultikeyPathTracker::get(opCtx.get()).stopTrackingMultikeyPathInfo(); });
+ MultikeyPathTracker::get(opCtx.get()).startTrackingMultikeyPathInfo();
+ fassertNoTrace(16359, multiSyncApply_noAbort(opCtx.get(), ops, syncApply));
+ }
- ON_BLOCK_EXIT(
- [&opCtx] { MultikeyPathTracker::get(opCtx.get()).stopTrackingMultikeyPathInfo(); });
- MultikeyPathTracker::get(opCtx.get()).startTrackingMultikeyPathInfo();
- fassertNoTrace(16359, multiSyncApply_noAbort(opCtx.get(), ops, syncApply));
-
- if (!MultikeyPathTracker::get(opCtx.get()).getMultikeyPathInfo().empty()) {
- st->addMultikeyPathInfo(MultikeyPathTracker::get(opCtx.get()).getMultikeyPathInfo());
+ invariant(!MultikeyPathTracker::get(opCtx.get()).isTrackingMultikeyPathInfo());
+ invariant(workerMultikeyPathInfo->empty());
+ auto newPaths = MultikeyPathTracker::get(opCtx.get()).getMultikeyPathInfo();
+ if (!newPaths.empty()) {
+ workerMultikeyPathInfo->swap(newPaths);
}
}
@@ -1449,16 +1432,18 @@ Status multiSyncApply_noAbort(OperationContext* opCtx,
Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops,
SyncTail* st,
- AtomicUInt32* fetchCount) {
+ AtomicUInt32* fetchCount,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo) {
initializeWriterThread();
auto opCtx = cc().makeOperationContext();
- return multiInitialSyncApply_noAbort(opCtx.get(), ops, st, fetchCount);
+ return multiInitialSyncApply_noAbort(opCtx.get(), ops, st, fetchCount, workerMultikeyPathInfo);
}
Status multiInitialSyncApply_noAbort(OperationContext* opCtx,
MultiApplier::OperationPtrs* ops,
SyncTail* st,
- AtomicUInt32* fetchCount) {
+ AtomicUInt32* fetchCount,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo) {
UnreplicatedWritesBlock uwb(opCtx);
DisableDocumentValidation validationDisabler(opCtx);
{ // Ensure that the MultikeyPathTracker stops tracking paths.
@@ -1503,17 +1488,12 @@ Status multiInitialSyncApply_noAbort(OperationContext* opCtx,
}
invariant(!MultikeyPathTracker::get(opCtx).isTrackingMultikeyPathInfo());
- // Set any indexes to multikey that this batch ignored.
- Timestamp firstTimeInBatch = ops->front()->getTimestamp();
- for (MultikeyPathInfo info : MultikeyPathTracker::get(opCtx).getMultikeyPathInfo()) {
- // We timestamp every multikey write with the first timestamp in the batch. It is always
- // safe to set an index as multikey too early, just not too late. We conservatively pick
- // the first timestamp in the batch since we do not have enough information to find out
- // the timestamp of the first write that set the given multikey path.
- fassertStatusOK(50685,
- StorageInterface::get(opCtx)->setIndexIsMultikey(
- opCtx, info.nss, info.indexName, info.multikeyPaths, firstTimeInBatch));
+ invariant(workerMultikeyPathInfo->empty());
+ auto newPaths = MultikeyPathTracker::get(opCtx).getMultikeyPathInfo();
+ if (!newPaths.empty()) {
+ workerMultikeyPathInfo->swap(newPaths);
}
+
return Status::OK();
}
@@ -1558,6 +1538,7 @@ StatusWith<OpTime> multiApply(OperationContext* opCtx,
}
std::vector<Status> statusVector(workerPool->getNumThreads(), Status::OK());
+ std::vector<WorkerMultikeyPathInfo> multikeyVector(workerPool->getNumThreads());
{
// We must wait for the all work we've dispatched to complete before leaving this block
// because the spawned threads refer to objects on the stack
@@ -1598,7 +1579,7 @@ StatusWith<OpTime> multiApply(OperationContext* opCtx,
consistencyMarkers->setOplogTruncateAfterPoint(opCtx, Timestamp());
consistencyMarkers->setMinValidToAtLeast(opCtx, ops.back().getOpTime());
- applyOps(writerVectors, workerPool, applyOperation, &statusVector);
+ applyOps(writerVectors, workerPool, applyOperation, &statusVector, &multikeyVector);
workerPool->join();
// Update the transaction table to point to the latest oplog entries for each session id.
@@ -1614,6 +1595,22 @@ StatusWith<OpTime> multiApply(OperationContext* opCtx,
storageEngine->replicationBatchIsComplete();
}
+ Timestamp firstTimeInBatch = ops.front().getTimestamp();
+ // Set any indexes to multikey that this batch ignored. This must be done while holding the
+ // parallel batch writer mutex.
+ for (WorkerMultikeyPathInfo infoVector : multikeyVector) {
+ for (MultikeyPathInfo info : infoVector) {
+ // We timestamp every multikey write with the first timestamp in the batch. It is always
+ // safe to set an index as multikey too early, just not too late. We conservatively pick
+ // the first timestamp in the batch since we do not have enough information to find out
+ // the timestamp of the first write that set the given multikey path.
+ fassertStatusOK(
+ 50686,
+ StorageInterface::get(opCtx)->setIndexIsMultikey(
+ opCtx, info.nss, info.indexName, info.multikeyPaths, firstTimeInBatch));
+ }
+ }
+
// If any of the statuses is not ok, return error.
for (auto& status : statusVector) {
if (!status.isOK()) {
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index c4436bb4b1c..0b7d2cd2374 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -55,7 +55,9 @@ class OpTime;
*/
class SyncTail {
public:
- using MultiSyncApplyFunc = stdx::function<void(MultiApplier::OperationPtrs* ops, SyncTail* st)>;
+ using MultiSyncApplyFunc = stdx::function<void(MultiApplier::OperationPtrs* ops,
+ SyncTail* st,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo)>;
/**
* Type of function to increment "repl.apply.ops" server status metric.
@@ -226,12 +228,6 @@ public:
static AtomicInt32 replBatchLimitOperations;
/**
- * Adds the given multikey path information to the list of indexes to make multikey at the
- * end of the current batch.
- */
- void addMultikeyPathInfo(std::vector<MultikeyPathInfo> infoList);
-
- /**
* Passthrough function to test multiApply.
*/
OpTime multiApply_forTest(OperationContext* opCtx, MultiApplier::Operations ops);
@@ -256,12 +252,6 @@ private:
// persistent pool of worker threads for writing ops to the databases
std::unique_ptr<OldThreadPool> _writerPool;
-
- // Protects member variables below.
- mutable stdx::mutex _mutex;
-
- // Maintains the information for all indexes that must be set as multikey in the current batch.
- std::vector<MultikeyPathInfo> _multikeyPathInfo;
};
/**
@@ -282,12 +272,15 @@ StatusWith<OpTime> multiApply(OperationContext* opCtx,
// They consume the passed in OperationPtrs and callers should not make any assumptions about the
// state of the container after calling. However, these functions cannot modify the pointed-to
// operations because the OperationPtrs container contains const pointers.
-void multiSyncApply(MultiApplier::OperationPtrs* ops, SyncTail* st);
+void multiSyncApply(MultiApplier::OperationPtrs* ops,
+ SyncTail* st,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo);
// Used by 3.4 initial sync.
Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops,
SyncTail* st,
- AtomicUInt32* fetchCount);
+ AtomicUInt32* fetchCount,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo);
/**
* Testing-only version of multiSyncApply that returns an error instead of aborting.
@@ -307,7 +300,8 @@ Status multiSyncApply_noAbort(OperationContext* opCtx,
Status multiInitialSyncApply_noAbort(OperationContext* opCtx,
MultiApplier::OperationPtrs* ops,
SyncTail* st,
- AtomicUInt32* fetchCount);
+ AtomicUInt32* fetchCount,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo);
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 0a469ef7cd0..83b474cafa9 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -563,8 +563,8 @@ bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx,
const CollectionOptions& options) {
auto writerPool = SyncTail::makeWriterPool();
MultiApplier::Operations operationsApplied;
- auto applyOperationFn =
- [&operationsApplied](MultiApplier::OperationPtrs* operationsToApply) -> Status {
+ auto applyOperationFn = [&operationsApplied](MultiApplier::OperationPtrs* operationsToApply,
+ WorkerMultikeyPathInfo*) -> Status {
for (auto&& opPtr : *operationsToApply) {
operationsApplied.push_back(*opPtr);
}
@@ -611,8 +611,9 @@ TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceH
stdx::mutex mutex;
std::vector<MultiApplier::Operations> operationsApplied;
- auto applyOperationFn = [&mutex, &operationsApplied](
- MultiApplier::OperationPtrs* operationsForWriterThreadToApply) -> Status {
+ auto applyOperationFn =
+ [&mutex, &operationsApplied](MultiApplier::OperationPtrs* operationsForWriterThreadToApply,
+ WorkerMultikeyPathInfo*) -> Status {
stdx::lock_guard<stdx::mutex> lock(mutex);
operationsApplied.emplace_back();
for (auto&& opPtr : *operationsForWriterThreadToApply) {
@@ -782,12 +783,107 @@ TEST_F(SyncTailTest, MultiSyncApplyUsesSyncApplyToApplyOperation) {
_opCtx.reset();
MultiApplier::OperationPtrs ops = {&op};
- multiSyncApply(&ops, nullptr);
+ WorkerMultikeyPathInfo pathInfo;
+ multiSyncApply(&ops, nullptr, &pathInfo);
// Collection should be created after SyncTail::syncApply() processes operation.
_opCtx = cc().makeOperationContext();
ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection());
}
+void testWorkerMultikeyPaths(const OplogEntry& op, unsigned long numPaths) {
+ WorkerMultikeyPathInfo pathInfo;
+ MultiApplier::OperationPtrs ops = {&op};
+ multiSyncApply(&ops, nullptr, &pathInfo);
+ ASSERT_EQ(pathInfo.size(), numPaths);
+}
+
+TEST_F(SyncTailTest, MultiSyncApplyAddsWorkerMultikeyPathInfoOnInsert) {
+ NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
+ _opCtx.reset();
+
+ {
+ auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss);
+ testWorkerMultikeyPaths(op, 0UL);
+ }
+
+ {
+ auto keyPattern = BSON("a" << 1);
+ auto op =
+ makeCreateIndexOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss, "a_1", keyPattern);
+ testWorkerMultikeyPaths(op, 0UL);
+ }
+
+ {
+ auto doc = BSON("_id" << 1 << "a" << BSON_ARRAY(4 << 5));
+ auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(3), 0), 1LL}, nss, doc);
+ testWorkerMultikeyPaths(op, 1UL);
+ }
+}
+
+TEST_F(SyncTailTest, MultiSyncApplyAddsMultipleWorkerMultikeyPathInfo) {
+ NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
+ _opCtx.reset();
+
+ {
+ auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss);
+ testWorkerMultikeyPaths(op, 0UL);
+ }
+
+ {
+ auto keyPattern = BSON("a" << 1);
+ auto op =
+ makeCreateIndexOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss, "a_1", keyPattern);
+ testWorkerMultikeyPaths(op, 0UL);
+ }
+
+ {
+ auto keyPattern = BSON("b" << 1);
+ auto op =
+ makeCreateIndexOplogEntry({Timestamp(Seconds(3), 0), 1LL}, nss, "b_1", keyPattern);
+ testWorkerMultikeyPaths(op, 0UL);
+ }
+
+ {
+ auto docA = BSON("_id" << 1 << "a" << BSON_ARRAY(4 << 5));
+ auto opA = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, docA);
+ auto docB = BSON("_id" << 2 << "b" << BSON_ARRAY(6 << 7));
+ auto opB = makeInsertDocumentOplogEntry({Timestamp(Seconds(5), 0), 1LL}, nss, docB);
+ WorkerMultikeyPathInfo pathInfo;
+ MultiApplier::OperationPtrs ops = {&opA, &opB};
+ multiSyncApply(&ops, nullptr, &pathInfo);
+ ASSERT_EQ(pathInfo.size(), 2UL);
+ }
+}
+
+TEST_F(SyncTailTest, MultiSyncApplyDoesNotAddWorkerMultikeyPathInfoOnCreateIndex) {
+ NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
+ _opCtx.reset();
+
+ {
+ auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss);
+ testWorkerMultikeyPaths(op, 0UL);
+ }
+
+ {
+ auto doc = BSON("_id" << 1 << "a" << BSON_ARRAY(4 << 5));
+ auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss, doc);
+ testWorkerMultikeyPaths(op, 0UL);
+ }
+
+ {
+ auto keyPattern = BSON("a" << 1);
+ auto op =
+ makeCreateIndexOplogEntry({Timestamp(Seconds(3), 0), 1LL}, nss, "a_1", keyPattern);
+ testWorkerMultikeyPaths(op, 0UL);
+ }
+
+ {
+ auto doc = BSON("_id" << 2 << "a" << BSON_ARRAY(6 << 7));
+ auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc);
+ testWorkerMultikeyPaths(op, 0UL);
+ }
+}
+
DEATH_TEST_F(SyncTailTest,
MultiSyncApplyFailsWhenCollectionCreationTriesToMakeUUID,
"Attempted to create a new collection") {
@@ -798,7 +894,7 @@ DEATH_TEST_F(SyncTailTest,
auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss);
_opCtx.reset();
MultiApplier::OperationPtrs ops = {&op};
- multiSyncApply(&ops, nullptr);
+ multiSyncApply(&ops, nullptr, nullptr);
}
TEST_F(SyncTailTest, MultiInitialSyncApplyFailsWhenCollectionCreationTriesToMakeUUID) {
@@ -809,7 +905,8 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyFailsWhenCollectionCreationTriesToMake
_opCtx.reset();
MultiApplier::OperationPtrs ops = {&op};
- ASSERT_EQUALS(ErrorCodes::InvalidOptions, multiInitialSyncApply(&ops, nullptr, nullptr));
+ ASSERT_EQUALS(ErrorCodes::InvalidOptions,
+ multiInitialSyncApply(&ops, nullptr, nullptr, nullptr));
}
TEST_F(SyncTailTest, MultiSyncApplyDisablesDocumentValidationWhileApplyingOperations) {
@@ -1170,7 +1267,8 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyDisablesDocumentValidationWhileApplyin
{Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2));
MultiApplier::OperationPtrs ops = {&op};
AtomicUInt32 fetchCount(0);
- ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount));
+ WorkerMultikeyPathInfo pathInfo;
+ ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo));
ASSERT_EQUALS(fetchCount.load(), 1U);
}
@@ -1189,7 +1287,8 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyIgnoresUpdateOperationIfDocumentIsMiss
{Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2));
MultiApplier::OperationPtrs ops = {&op};
AtomicUInt32 fetchCount(0);
- ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount));
+ WorkerMultikeyPathInfo pathInfo;
+ ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo));
// Since the missing document is not found on the sync source, the collection referenced by
// the failed operation should not be automatically created.
@@ -1211,7 +1310,8 @@ TEST_F(SyncTailTest, MultiInitialSyncApplySkipsDocumentOnNamespaceNotFound) {
auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3);
MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3};
AtomicUInt32 fetchCount(0);
- ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount));
+ WorkerMultikeyPathInfo pathInfo;
+ ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo));
ASSERT_EQUALS(fetchCount.load(), 0U);
OplogInterfaceLocal collectionReader(_opCtx.get(), nss.ns());
@@ -1236,7 +1336,8 @@ TEST_F(SyncTailTest, MultiInitialSyncApplySkipsIndexCreationOnNamespaceNotFound)
auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3);
MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3};
AtomicUInt32 fetchCount(0);
- ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount));
+ WorkerMultikeyPathInfo pathInfo;
+ ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo));
ASSERT_EQUALS(fetchCount.load(), 0U);
OplogInterfaceLocal collectionReader(_opCtx.get(), nss.ns());
@@ -1259,7 +1360,8 @@ TEST_F(SyncTailTest,
{Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), updatedDocument);
MultiApplier::OperationPtrs ops = {&op};
AtomicUInt32 fetchCount(0);
- ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount));
+ WorkerMultikeyPathInfo pathInfo;
+ ASSERT_OK(multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount, &pathInfo));
ASSERT_EQUALS(fetchCount.load(), 1U);
// The collection referenced by "ns" in the failed operation is automatically created to hold
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp
index 88c9908ba55..4953a93ec41 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.cpp
+++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp
@@ -179,7 +179,8 @@ Status SyncTailTest::runOpsInitialSync(std::vector<OplogEntry> ops) {
opsPtrs.push_back(&op);
}
AtomicUInt32 fetchCount(0);
- return multiInitialSyncApply_noAbort(_opCtx.get(), &opsPtrs, &syncTail, &fetchCount);
+ WorkerMultikeyPathInfo pathInfo;
+ return multiInitialSyncApply_noAbort(_opCtx.get(), &opsPtrs, &syncTail, &fetchCount, &pathInfo);
}
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.h b/src/mongo/db/repl/sync_tail_test_fixture.h
index 435f126d877..cc8fb3b44c4 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.h
+++ b/src/mongo/db/repl/sync_tail_test_fixture.h
@@ -58,7 +58,7 @@ protected:
ReplicationProcess* _replicationProcess = nullptr;
// Implements the MultiApplier::ApplyOperationFn interface and does nothing.
- static Status noopApplyOperationFn(MultiApplier::OperationPtrs*) {
+ static Status noopApplyOperationFn(MultiApplier::OperationPtrs*, WorkerMultikeyPathInfo*) {
return Status::OK();
}
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index ecabdabb8f8..f5f64fbd81f 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -1340,11 +1340,19 @@ public:
// We add in an index creation op to test that we restart tracking multikey path info
// after bulk index builds.
- std::vector<const repl::OplogEntry*> ops = {&op0, &createIndexOp, &op1, &op2};
+ std::vector<repl::OplogEntry> ops = {op0, createIndexOp, op1, op2};
- repl::SyncTail syncTail(nullptr, repl::SyncTail::MultiSyncApplyFunc(), nullptr);
AtomicUInt32 fetchCount(0);
- ASSERT_OK(repl::multiInitialSyncApply_noAbort(_opCtx, &ops, &syncTail, &fetchCount));
+ repl::SyncTail syncTail(nullptr, repl::multiSyncApply);
+ repl::MultiApplier::ApplyOperationFn applyOpFn = [&fetchCount, &syncTail](
+ repl::MultiApplier::OperationPtrs* ops,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo) {
+ return repl::multiInitialSyncApply(ops, &syncTail, &fetchCount, workerMultikeyPathInfo);
+ };
+
+ auto lastTime =
+ assertGet(repl::multiApply(_opCtx, syncTail.getWriterPool(), ops, applyOpFn));
+ ASSERT_EQ(lastTime.getTimestamp(), insertTime2.asTimestamp());
AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_X, LockMode::MODE_IX);
assertMultikeyPaths(