summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/sync_tail.cpp
diff options
context:
space:
mode:
authorMihai Andrei <mihai.andrei@mongodb.com>2019-09-12 19:59:08 +0000
committerevergreen <evergreen@mongodb.com>2019-09-12 19:59:08 +0000
commite7fa33abb338eda55653fe61665b5f17cc22b4d3 (patch)
tree321d255edc14c674a4d49df1e9f89efbd90eb344 /src/mongo/db/repl/sync_tail.cpp
parentb40e542972c082e85098c09298eb56436bf57abb (diff)
downloadmongo-e7fa33abb338eda55653fe61665b5f17cc22b4d3.tar.gz
SERVER-42995 Remove redundant SyncTail methods
Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r--src/mongo/db/repl/sync_tail.cpp205
1 files changed, 117 insertions, 88 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 165ab31c7a3..af355139e1e 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -463,6 +463,69 @@ void tryToGoLiveAsASecondary(OperationContext* opCtx,
}
}
+/**
+ * Updates a CRUD op's hash and isForCappedCollection field if necessary.
+ *
+ * Looks up the collection properties for '*hashedNs'. If the storage engine supports document
+ * locking and the collection is not capped, a hash of the op's _id element (computed with the
+ * collection's collator, ignoring field names) is folded into '*hash' via MurmurHash3_x86_32:
+ * the current value of '*hash' is passed in and then overwritten with the result. If the op is
+ * an insert into a capped collection, op->isForCappedCollection is set to true.
+ *
+ * 'opCtx', 'op', 'hash' and 'hashedNs' are all dereferenced unconditionally, so none of them
+ * may be null.
+ *
+ * NOTE(review): the CachedCollectionProperties object is a local of this function, so it is
+ * constructed anew on every call and nothing is retained between calls. Confirm whether the
+ * properties were meant to be cached across the ops of a batch.
+ */
+void processCrudOp(OperationContext* opCtx,
+ OplogEntry* op,
+ uint32_t* hash,
+ StringMapHashedKey* hashedNs) {
+ const auto serviceContext = opCtx->getServiceContext();
+ const auto storageEngine = serviceContext->getStorageEngine();
+ const bool supportsDocLocking = storageEngine->supportsDocLocking();
+ CachedCollectionProperties collPropertiesCache;
+ auto collProperties = collPropertiesCache.getCollectionProperties(opCtx, *hashedNs);
+
+ // For doc locking engines, include the _id of the document in the hash so we get
+ // parallelism even if all writes are to a single collection.
+ //
+ // For capped collections, this is illegal, since capped collections must preserve
+ // insertion order.
+ if (supportsDocLocking && !collProperties.isCapped) {
+ BSONElement id = op->getIdElement();
+ BSONElementComparator elementHasher(BSONElementComparator::FieldNamesMode::kIgnore,
+ collProperties.collator);
+ const size_t idHash = elementHasher.hash(id);
+ MurmurHash3_x86_32(&idHash, sizeof(idHash), *hash, hash);
+ }
+
+ if (op->getOpType() == OpTypeEnum::kInsert && collProperties.isCapped) {
+ // Mark capped collection ops before storing them to ensure we do not attempt to
+ // bulk insert them.
+ op->isForCappedCollection = true;
+ }
+}
+
+/**
+ * Adds a single oplog entry to the appropriate writer vector.
+ *
+ * The target vector is (*writerVectors)[hash % writerVectors->size()]. The pointer 'op' itself
+ * is appended; the entry it points to is not copied here. If the target vector is still empty,
+ * capacity for 8 entries is reserved before the first push.
+ *
+ * NOTE(review): the modulo assumes 'writerVectors' holds at least one element; with an empty
+ * outer vector the divisor would be zero. Confirm that callers guarantee this.
+ */
+void addToWriterVector(OplogEntry* op,
+ std::vector<MultiApplier::OperationPtrs>* writerVectors,
+ uint32_t hash) {
+ const uint32_t numWriters = writerVectors->size();
+ auto& writer = (*writerVectors)[hash % numWriters];
+ if (writer.empty()) {
+ writer.reserve(8); // Skip a few growth rounds
+ }
+ writer.push_back(op);
+}
+
+/**
+ * Adds a set of derivedOps to writerVectors.
+ *
+ * For every op in '*derivedOps', the starting hash is the hash of the op's namespace string,
+ * cast to 32 bits. CRUD ops are additionally passed through processCrudOp(), which may modify
+ * that hash and the op itself. The op is then handed to addToWriterVector() by address, so the
+ * pointers stored in 'writerVectors' refer to elements of '*derivedOps'.
+ *
+ * This function performs a single flat pass: it does not itself expand applyOps or transaction
+ * entries and does not consult a session update tracker.
+ */
+void addDerivedOps(OperationContext* opCtx,
+ MultiApplier::Operations* derivedOps,
+ std::vector<MultiApplier::OperationPtrs>* writerVectors) {
+ for (auto&& op : *derivedOps) {
+ auto hashedNs = StringMapHasher().hashed_key(op.getNss().ns());
+ uint32_t hash = static_cast<uint32_t>(hashedNs.hash());
+ if (op.isCrudOpType()) {
+ processCrudOp(opCtx, &op, &hash, &hashedNs);
+ }
+ addToWriterVector(&op, writerVectors, hash);
+ }
+}
+
} // namespace
class SyncTail::OpQueueBatcher {
@@ -627,20 +690,15 @@ private:
stdx::thread _thread; // Must be last so all other members are initialized before starting.
};
-void SyncTail::oplogApplication(OplogBuffer* oplogBuffer,
- OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn,
- ReplicationCoordinator* replCoord) {
+void SyncTail::runLoop(OplogBuffer* oplogBuffer,
+ OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn,
+ ReplicationCoordinator* replCoord) {
// We don't start data replication for arbiters at all and it's not allowed to reconfig
// arbiterOnly field for any member.
invariant(!replCoord->getMemberState().arbiter());
OpQueueBatcher batcher(this, _storageInterface, oplogBuffer, getNextApplierBatchFn);
- _oplogApplication(replCoord, &batcher);
-}
-
-void SyncTail::_oplogApplication(ReplicationCoordinator* replCoord,
- OpQueueBatcher* batcher) noexcept {
std::unique_ptr<ApplyBatchFinalizer> finalizer{
getGlobalServiceContext()->getStorageEngine()->isDurable()
? new ApplyBatchFinalizerForJournal(replCoord)
@@ -683,7 +741,7 @@ void SyncTail::_oplogApplication(ReplicationCoordinator* replCoord,
// Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become
// ready in time, we'll loop again so we can do the above checks periodically.
- OpQueue ops = batcher->getNextBatch(Seconds(1));
+ OpQueue ops = batcher.getNextBatch(Seconds(1));
if (ops.empty()) {
if (ops.mustShutdown()) {
// Shut down and exit oplog application loop.
@@ -780,6 +838,10 @@ bool SyncTail::inShutdown() const {
Status syncApply(OperationContext* opCtx,
const OplogEntryBatch& batch,
OplogApplication::Mode oplogApplicationMode) {
+ // Guarantees that syncApply's context matches that of its calling function, multiSyncApply.
+ invariant(!opCtx->writesAreReplicated());
+ invariant(documentValidationDisabled(opCtx));
+
auto op = batch.getOp();
// Count each log op application as a separate operation, for reporting purposes
CurOp individualOp(opCtx);
@@ -789,11 +851,6 @@ Status syncApply(OperationContext* opCtx,
auto incrementOpsAppliedStats = [] { opsAppliedStats.increment(1); };
auto applyOp = [&](Database* db) {
- // For non-initial-sync, we convert updates to upserts
- // to suppress errors when replaying oplog entries.
- UnreplicatedWritesBlock uwb(opCtx);
- DisableDocumentValidation validationDisabler(opCtx);
-
// We convert updates to upserts when not in initial sync because after rollback and during
// startup we may replay an update after a delete and crash since we do not ignore
// errors. In initial sync we simply ignore these update errors so there is no reason to
@@ -974,18 +1031,13 @@ Status multiSyncApply(OperationContext* opCtx,
* with transactions.
* sessionUpdateTracker - if provided, keeps track of session info from ops.
*/
-void SyncTail::_fillWriterVectors(OperationContext* opCtx,
- MultiApplier::Operations* ops,
- std::vector<MultiApplier::OperationPtrs>* writerVectors,
- std::vector<MultiApplier::Operations>* derivedOps,
- SessionUpdateTracker* sessionUpdateTracker) noexcept {
- const auto serviceContext = opCtx->getServiceContext();
- const auto storageEngine = serviceContext->getStorageEngine();
+void SyncTail::_deriveOpsAndFillWriterVectors(
+ OperationContext* opCtx,
+ MultiApplier::Operations* ops,
+ std::vector<MultiApplier::OperationPtrs>* writerVectors,
+ std::vector<MultiApplier::Operations>* derivedOps,
+ SessionUpdateTracker* sessionUpdateTracker) noexcept {
- const bool supportsDocLocking = storageEngine->supportsDocLocking();
- const uint32_t numWriters = writerVectors->size();
-
- CachedCollectionProperties collPropertiesCache;
LogicalSessionIdMap<std::vector<OplogEntry*>> partialTxnOps;
for (auto&& op : *ops) {
@@ -1006,10 +1058,11 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
if (sessionUpdateTracker) {
if (auto newOplogWrites = sessionUpdateTracker->updateSession(op)) {
derivedOps->emplace_back(std::move(*newOplogWrites));
- _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ addDerivedOps(opCtx, &derivedOps->back(), writerVectors);
}
}
+
// If this entry is part of a multi-oplog-entry transaction, ignore it until the commit.
// We must save it here because we are not guaranteed it has been written to the oplog
// yet.
@@ -1030,29 +1083,8 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
partialTxnList.clear();
}
- if (op.isCrudOpType()) {
- auto collProperties = collPropertiesCache.getCollectionProperties(opCtx, hashedNs);
-
- // For doc locking engines, include the _id of the document in the hash so we get
- // parallelism even if all writes are to a single collection.
- //
- // For capped collections, this is illegal, since capped collections must preserve
- // insertion order.
- if (supportsDocLocking && !collProperties.isCapped) {
- BSONElement id = op.getIdElement();
- BSONElementComparator elementHasher(BSONElementComparator::FieldNamesMode::kIgnore,
- collProperties.collator);
- const size_t idHash = elementHasher.hash(id);
- MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash);
- }
-
- if (op.getOpType() == OpTypeEnum::kInsert && collProperties.isCapped) {
- // Mark capped collection ops before storing them to ensure we do not attempt to
- // bulk insert them.
- op.isForCappedCollection = true;
- }
- }
-
+ if (op.isCrudOpType())
+ processCrudOp(opCtx, &op, &hash, &hashedNs);
// Extract applyOps operations and fill writers with extracted operations using this
// function.
if (op.isTerminalApplyOps()) {
@@ -1070,14 +1102,15 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
partialTxnList.clear();
// Transaction entries cannot have different session updates.
- _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ addDerivedOps(opCtx, &derivedOps->back(), writerVectors);
} else {
// The applyOps entry was not generated as part of a transaction.
invariant(!op.getPrevWriteOpTimeInTransaction());
+
derivedOps->emplace_back(ApplyOps::extractOperations(op));
// Nested entries cannot have different session updates.
- _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ addDerivedOps(opCtx, &derivedOps->back(), writerVectors);
}
continue;
}
@@ -1093,15 +1126,11 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
readTransactionOperationsFromOplogChain(opCtx, op, partialTxnList));
partialTxnList.clear();
- _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ addDerivedOps(opCtx, &derivedOps->back(), writerVectors);
continue;
}
- auto& writer = (*writerVectors)[hash % numWriters];
- if (writer.empty()) {
- writer.reserve(8); // Skip a few growth rounds
- }
- writer.push_back(&op);
+ addToWriterVector(&op, writerVectors, hash);
}
}
@@ -1109,41 +1138,15 @@ void SyncTail::fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
std::vector<MultiApplier::Operations>* derivedOps) noexcept {
+
SessionUpdateTracker sessionUpdateTracker;
- _fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker);
+ _deriveOpsAndFillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker);
auto newOplogWrites = sessionUpdateTracker.flushAll();
if (!newOplogWrites.empty()) {
derivedOps->emplace_back(std::move(newOplogWrites));
- _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
- }
-}
-
-void SyncTail::_applyOps(std::vector<MultiApplier::OperationPtrs>& writerVectors,
- std::vector<Status>* statusVector,
- std::vector<WorkerMultikeyPathInfo>* workerMultikeyPathInfo) {
- invariant(writerVectors.size() == statusVector->size());
- for (size_t i = 0; i < writerVectors.size(); i++) {
- if (writerVectors[i].empty())
- continue;
-
- _writerPool->schedule(
- [this,
- &writer = writerVectors.at(i),
- &status = statusVector->at(i),
- &workerMultikeyPathInfo = workerMultikeyPathInfo->at(i)](auto scheduleStatus) {
- invariant(scheduleStatus);
-
- auto opCtx = cc().makeOperationContext();
-
- // This code path is only executed on secondaries and initial syncing nodes, so it
- // is safe to exclude any writes from Flow Control.
- opCtx->setShouldParticipateInFlowControl(false);
-
- status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown([&] {
- return _applyFunc(opCtx.get(), &writer, this, &workerMultikeyPathInfo);
- });
- });
+ _deriveOpsAndFillWriterVectors(
+ opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
}
}
@@ -1212,7 +1215,33 @@ StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::O
{
std::vector<Status> statusVector(_writerPool->getStats().numThreads, Status::OK());
- _applyOps(writerVectors, &statusVector, &multikeyVector);
+
+ // Doles out all the work to the writer pool threads. writerVectors is not modified,
+ // but multiSyncApply will modify the vectors that it contains.
+ invariant(writerVectors.size() == statusVector.size());
+ for (size_t i = 0; i < writerVectors.size(); i++) {
+ if (writerVectors[i].empty())
+ continue;
+
+ _writerPool->schedule(
+ [this,
+ &writer = writerVectors.at(i),
+ &status = statusVector.at(i),
+ &multikeyVector = multikeyVector.at(i)](auto scheduleStatus) {
+ invariant(scheduleStatus);
+
+ auto opCtx = cc().makeOperationContext();
+
+ // This code path is only executed on secondaries and initial syncing nodes,
+ // so it is safe to exclude any writes from Flow Control.
+ opCtx->setShouldParticipateInFlowControl(false);
+
+ status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown([&] {
+ return _applyFunc(opCtx.get(), &writer, this, &multikeyVector);
+ });
+ });
+ }
+
_writerPool->waitForIdle();
// If any of the statuses is not ok, return error.