summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2018-02-12 18:45:10 -0500
committerBenety Goh <benety@mongodb.com>2018-02-12 18:45:22 -0500
commitca0a855dfc0f479d85b76a640b12a259c0547310 (patch)
treebea7315422cc08c8c869e33f4e79ea0dab11ae13
parent1829b17965a7bdf37a3933bf8625c434d6687fe0 (diff)
downloadmongo-ca0a855dfc0f479d85b76a640b12a259c0547310.tar.gz
SERVER-32913 parallelize application of applyOps oplog entriesr3.7.2
-rw-r--r--src/mongo/db/repl/sync_tail.cpp24
1 files changed, 23 insertions, 1 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index a6f55483fb7..43ee3f9a0cb 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -58,6 +58,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/prefetch.h"
#include "mongo/db/query/query_knobs.h"
+#include "mongo/db/repl/apply_ops.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/initial_syncer.h"
#include "mongo/db/repl/multiapplier.h"
@@ -1556,8 +1557,29 @@ StatusWith<OpTime> multiApply(OperationContext* opCtx,
consistencyMarkers->setOplogTruncateAfterPoint(opCtx, ops.front().getTimestamp());
scheduleWritesToOplog(opCtx, workerPool, ops);
+ // Used by fillWriterVectors() only. May be overridden to point to extracted applyOps
+ // operations.
+ MultiApplier::Operations* opsPtr = &ops;
+
+ // Holds extracted applyOps operations.
+ // The operations in 'applyOpsOperations', rather than the original applyOps command, will
+ // be processed by the writer threads.
+ MultiApplier::Operations applyOpsOperations;
+ const auto& firstOplogEntry = ops.front();
+ if (storageEngine->supportsDocLocking() && firstOplogEntry.isCommand() &&
+ OplogEntry::CommandType::kApplyOps == firstOplogEntry.getCommandType()) {
+ try {
+ applyOpsOperations = ApplyOps::extractOperations(firstOplogEntry);
+ opsPtr = &applyOpsOperations;
+ } catch (...) {
+ warning() << "Unable to extract operations from applyOps "
+ << redact(firstOplogEntry.toBSON()) << ": " << exceptionToStatus()
+ << ". Applying as standalone command.";
+ }
+ }
+
std::vector<MultiApplier::OperationPtrs> writerVectors(workerPool->getNumThreads());
- fillWriterVectors(opCtx, &ops, &writerVectors);
+ fillWriterVectors(opCtx, opsPtr, &writerVectors);
// Wait for writes to finish before applying ops.
workerPool->join();