From 9fe9ddc29177f1929884f00cf9220fbcd5d932f4 Mon Sep 17 00:00:00 2001 From: Mathias Stearn Date: Tue, 14 Jun 2016 15:51:40 -0400 Subject: SERVER-24242 Stop copying OplogEntries so much With this change, we should no longer be copying any OplogEntries during steady-state replication. --- src/mongo/db/repl/sync_tail.cpp | 177 +++++++++++++++++++--------------------- 1 file changed, 86 insertions(+), 91 deletions(-) (limited to 'src/mongo/db/repl/sync_tail.cpp') diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 84871d24249..7a96a27b064 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -458,14 +458,16 @@ void prefetchOps(const MultiApplier::Operations& ops, OldThreadPool* prefetcherP prefetcherPool->join(); } -// Doles out all the work to the writer pool threads and waits for them to complete -void applyOps(const std::vector<MultiApplier::Operations>& writerVectors, +// Doles out all the work to the writer pool threads. +// Does not modify writerVectors, but passes non-const pointers to inner vectors into func. +void applyOps(std::vector<MultiApplier::OperationPtrs>* writerVectors, OldThreadPool* writerPool, - MultiApplier::ApplyOperationFn func) { + const MultiApplier::ApplyOperationFn& func) { TimerHolder timer(&applyBatchStats); - for (auto&& ops : writerVectors) { + for (auto&& ops : *writerVectors) { if (!ops.empty()) { - writerPool->schedule(func, stdx::cref(ops)); + auto opsPtr = &ops; + writerPool->schedule([&func, opsPtr] { func(opsPtr); }); } } } @@ -500,9 +502,11 @@ private: StringMap<bool> _cache; }; +// This only modifies the isForCappedCollection field on each op. It does not alter the ops vector +// in any other way. 
void fillWriterVectors(OperationContext* txn, - const MultiApplier::Operations& ops, - std::vector<MultiApplier::Operations>* writerVectors) { + MultiApplier::Operations* ops, + std::vector<MultiApplier::OperationPtrs>* writerVectors) { const bool supportsDocLocking = getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking(); const uint32_t numWriters = writerVectors->size(); @@ -511,7 +515,7 @@ void fillWriterVectors(OperationContext* txn, CachingCappedChecker isCapped; - for (auto&& op : ops) { + for (auto&& op : *ops) { StringMapTraits::HashedKey hashedNs(op.ns); uint32_t hash = hashedNs.hash(); @@ -528,12 +532,13 @@ void fillWriterVectors(OperationContext* txn, if (op.opType == "i" && isCapped(txn, hashedNs)) { // Mark capped collection ops before storing them to ensure we do not attempt to bulk // insert them. - OplogEntry modifiedOp = op; - modifiedOp.isForCappedCollection = true; - (*writerVectors)[hash % numWriters].push_back(modifiedOp); - } else { - (*writerVectors)[hash % numWriters].push_back(op); + op.isForCappedCollection = true; } + + auto& writer = (*writerVectors)[hash % numWriters]; + if (writer.empty()) + writer.reserve(8); // skip a few growth rounds. + writer.push_back(&op); } } @@ -541,14 +546,9 @@ void fillWriterVectors(OperationContext* txn, // Applies a batch of oplog entries, by using a set of threads to apply the operations and then // writes the oplog entries to the local oplog. 
-OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) { - auto convertToVector = ops.getDeque(); - auto applyOperation = [this](const MultiApplier::Operations& ops) { _applyFunc(ops, this); }; - auto status = - repl::multiApply(txn, - _writerPool.get(), - MultiApplier::Operations(convertToVector.begin(), convertToVector.end()), - applyOperation); +OpTime SyncTail::multiApply(OperationContext* txn, MultiApplier::Operations ops) { + auto applyOperation = [this](MultiApplier::OperationPtrs* ops) { _applyFunc(ops, this); }; + auto status = repl::multiApply(txn, _writerPool.get(), std::move(ops), applyOperation); if (!status.isOK()) { if (status == ErrorCodes::InterruptedAtShutdown) { return OpTime(); @@ -641,14 +641,14 @@ private: OpQueue ops; // tryPopAndWaitForMore returns true when we need to end a batch early while (!_syncTail->tryPopAndWaitForMore(&txn, &ops) && - (ops.getSize() < replBatchLimitBytes) && !_inShutdown.load()) { + (ops.getBytes() < replBatchLimitBytes) && !_inShutdown.load()) { int now = batchTimer.seconds(); // apply replication batch limits if (!ops.empty()) { if (now > replBatchLimitSeconds) break; - if (ops.getDeque().size() > replBatchLimitOperations) + if (ops.getCount() > replBatchLimitOperations) break; } @@ -716,32 +716,25 @@ void SyncTail::oplogApplication() { OpTime originalEndOpTime(minValidBoundaries.end); OpTime lastWriteOpTime{replCoord->getMyLastAppliedOpTime()}; while (!inShutdown()) { - OpQueue ops; - - do { - if (replCoord->getInitialSyncRequestedFlag()) { - // got a resync command - return; - } - - tryToGoLiveAsASecondary(&txn, replCoord, minValidBoundaries, lastWriteOpTime); - - // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become - // ready in time, we'll loop again so we can do the above checks periodically. 
- ops = batcher.getNextBatch(Seconds(1)); - } while (!inShutdown() && ops.empty()); - - if (inShutdown()) + if (replCoord->getInitialSyncRequestedFlag()) { + // got a resync command return; + } + + tryToGoLiveAsASecondary(&txn, replCoord, minValidBoundaries, lastWriteOpTime); - invariant(!ops.empty()); + // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become + // ready in time, we'll loop again so we can do the above checks periodically. + OpQueue ops = batcher.getNextBatch(Seconds(1)); + if (ops.empty()) + continue; // Try again. const BSONObj lastOp = ops.back().raw; if (lastOp.isEmpty()) { // This means that the network thread has coalesced and we have processed all of its // data. - invariant(ops.getDeque().size() == 1); + invariant(ops.getCount() == 1); if (replCoord->isWaitingForApplierToDrain()) { replCoord->signalDrainComplete(&txn); } @@ -793,11 +786,12 @@ void SyncTail::oplogApplication() { // This write will not journal/checkpoint. StorageInterface::get(&txn)->setMinValid(&txn, {start, end}); - lastWriteOpTime = multiApply(&txn, ops); + const size_t opsInBatch = ops.getCount(); + lastWriteOpTime = multiApply(&txn, ops.releaseBatch()); if (lastWriteOpTime.isNull()) { // fassert if oplog application failed for any reasons other than shutdown. - error() << "Failed to apply " << ops.getDeque().size() - << " operations - batch start:" << start << " end:" << end; + error() << "Failed to apply " << opsInBatch << " operations - batch start:" << start + << " end:" << end; fassert(34360, inShutdownStrict()); // Return without setting minvalid in the case of shutdown. return; @@ -819,37 +813,43 @@ void SyncTail::oplogApplication() { // queue. We can't block forever because there are maintenance things we need // to periodically check in the loop. 
bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* ops) { - BSONObj op; - // Check to see if there are ops waiting in the bgsync queue - bool peek_success = peek(txn, &op); - - if (!peek_success) { - // if we don't have anything in the queue, wait a bit for something to appear - if (ops->empty()) { - // block up to 1 second - _networkQueue->waitForMore(txn); - return false; - } + { + BSONObj op; + // Check to see if there are ops waiting in the bgsync queue + bool peek_success = peek(txn, &op); + if (!peek_success) { + // if we don't have anything in the queue, wait a bit for something to appear + if (ops->empty()) { + // block up to 1 second + _networkQueue->waitForMore(txn); + return false; + } - // otherwise, apply what we have - return true; + // otherwise, apply what we have + return true; + } + ops->emplace_back(std::move(op)); } - auto entry = OplogEntry(op); + auto& entry = ops->back(); // Check for ops that must be processed one at a time. if (entry.raw.isEmpty() || // sentinel that network queue is drained. (entry.opType[0] == 'c') || // commands. - // Index builds are acheived through the use of an insert op, not a command op. + // Index builds are achieved through the use of an insert op, not a command op. // The following line is the same as what the insert code uses to detect an index build. (!entry.ns.empty() && nsToCollectionSubstring(entry.ns) == "system.indexes")) { - if (ops->empty()) { + if (ops->getCount() == 1) { // apply commands one-at-a-time - ops->push_back(std::move(entry)); _networkQueue->consume(txn); + } else { + // This op must be processed alone, but we already had ops in the queue so we can't + // include it in this batch. Since we didn't call consume(), we'll see this again next + // time and process it alone. + ops->pop_back(); } - // otherwise, apply what we have so far and come back for the command + // Apply what we have so far. 
return true; } @@ -863,12 +863,11 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* op if (curVersion != OplogEntry::kOplogVersion) { severe() << "expected oplog version " << OplogEntry::kOplogVersion << " but found version " - << curVersion << " in oplog entry: " << op; + << curVersion << " in oplog entry: " << entry.raw; fassertFailedNoTrace(18820); } - // Copy the op to the deque and remove it from the bgsync queue. - ops->push_back(std::move(entry)); + // We are going to apply this Op. _networkQueue->consume(txn); // Go back for more ops @@ -1005,7 +1004,7 @@ static void initializeWriterThread() { } // This free function is used by the writer threads to apply each op -void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail*) { +void multiSyncApply(MultiApplier::OperationPtrs* ops, SyncTail*) { initializeWriterThread(); auto txn = cc().makeOperationContext(); auto syncApply = [](OperationContext* txn, const BSONObj& op, bool convertUpdateToUpsert) { @@ -1015,7 +1014,7 @@ void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail*) { } Status multiSyncApply_noAbort(OperationContext* txn, - const std::vector<OplogEntry>& oplogEntries, + MultiApplier::OperationPtrs* oplogEntryPointers, SyncApplyFn syncApply) { txn->setReplicatedWrites(false); DisableDocumentValidation validationDisabler(txn); @@ -1023,24 +1022,19 @@ Status multiSyncApply_noAbort(OperationContext* txn, // allow us to get through the magic barrier txn->lockState()->setIsBatchWriter(true); - std::vector<const OplogEntry*> oplogEntryPointers(oplogEntries.size()); - for (size_t i = 0; i < oplogEntries.size(); i++) { - oplogEntryPointers[i] = &oplogEntries[i]; - } - - if (oplogEntryPointers.size() > 1) { - std::stable_sort(oplogEntryPointers.begin(), - oplogEntryPointers.end(), + if (oplogEntryPointers->size() > 1) { + std::stable_sort(oplogEntryPointers->begin(), + oplogEntryPointers->end(), [](const OplogEntry* l, const OplogEntry* r) { return l->ns < r->ns; }); } bool convertUpdatesToUpserts = true; // 
doNotGroupBeforePoint is used to prevent retrying bad group inserts by marking the final op // of a failed group and not allowing further group inserts until that op has been processed. - auto doNotGroupBeforePoint = oplogEntryPointers.begin(); + auto doNotGroupBeforePoint = oplogEntryPointers->begin(); - for (auto oplogEntriesIterator = oplogEntryPointers.begin(); - oplogEntriesIterator != oplogEntryPointers.end(); + for (auto oplogEntriesIterator = oplogEntryPointers->begin(); + oplogEntriesIterator != oplogEntryPointers->end(); ++oplogEntriesIterator) { auto entry = *oplogEntriesIterator; if (entry->opType[0] == 'i' && !entry->isForCappedCollection && @@ -1051,7 +1045,7 @@ Status multiSyncApply_noAbort(OperationContext* txn, int batchCount = 0; auto endOfGroupableOpsIterator = std::find_if( oplogEntriesIterator + 1, - oplogEntryPointers.end(), + oplogEntryPointers->end(), [&](const OplogEntry* nextEntry) { return nextEntry->opType[0] != 'i' || // Must be an insert. nextEntry->ns != entry->ns || // Must be the same namespace. 
@@ -1133,14 +1127,14 @@ Status multiSyncApply_noAbort(OperationContext* txn, } // This free function is used by the initial sync writer threads to apply each op -void multiInitialSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) { +void multiInitialSyncApply(MultiApplier::OperationPtrs* ops, SyncTail* st) { initializeWriterThread(); auto txn = cc().makeOperationContext(); fassertNoTrace(15915, multiInitialSyncApply_noAbort(txn.get(), ops, st)); } Status multiInitialSyncApply_noAbort(OperationContext* txn, - const std::vector<OplogEntry>& ops, + MultiApplier::OperationPtrs* ops, SyncTail* st) { txn->setReplicatedWrites(false); DisableDocumentValidation validationDisabler(txn); @@ -1150,14 +1144,15 @@ Status multiInitialSyncApply_noAbort(OperationContext* txn, bool convertUpdatesToUpserts = false; - for (std::vector<OplogEntry>::const_iterator it = ops.begin(); it != ops.end(); ++it) { + for (auto it = ops->begin(); it != ops->end(); ++it) { + auto& entry = **it; try { - const Status s = SyncTail::syncApply(txn, it->raw, convertUpdatesToUpserts); + const Status s = SyncTail::syncApply(txn, entry.raw, convertUpdatesToUpserts); if (!s.isOK()) { - if (st->shouldRetry(txn, it->raw)) { - const Status s2 = SyncTail::syncApply(txn, it->raw, convertUpdatesToUpserts); + if (st->shouldRetry(txn, entry.raw)) { + const Status s2 = SyncTail::syncApply(txn, entry.raw, convertUpdatesToUpserts); if (!s2.isOK()) { - severe() << "Error applying operation (" << it->raw << "): " << s2; + severe() << "Error applying operation (" << entry.raw << "): " << s2; return s2; } } @@ -1167,7 +1162,7 @@ Status multiInitialSyncApply_noAbort(OperationContext* txn, // subsequently got deleted and no longer exists on the Sync Target at all } } catch (const DBException& e) { - severe() << "writer worker caught exception: " << causedBy(e) << " on: " << it->raw; + severe() << "writer worker caught exception: " << causedBy(e) << " on: " << entry.raw; if (inShutdown()) { return {ErrorCodes::InterruptedAtShutdown, e.toString()}; @@ 
-1182,7 +1177,7 @@ Status multiInitialSyncApply_noAbort(OperationContext* txn, StatusWith<OpTime> multiApply(OperationContext* txn, OldThreadPool* workerPool, - const MultiApplier::Operations& ops, + MultiApplier::Operations ops, MultiApplier::ApplyOperationFn applyOperation) { if (!txn) { return {ErrorCodes::BadValue, "invalid operation context"}; @@ -1205,9 +1200,9 @@ StatusWith<OpTime> multiApply(OperationContext* txn, prefetchOps(ops, workerPool); } - std::vector<std::vector<OplogEntry>> writerVectors(workerPool->getNumThreads()); + std::vector<MultiApplier::OperationPtrs> writerVectors(workerPool->getNumThreads()); - fillWriterVectors(txn, ops, &writerVectors); + fillWriterVectors(txn, &ops, &writerVectors); LOG(2) << "replication batch size is " << ops.size(); // We must grab this because we're going to grab write locks later. // We hold this mutex the entire time we're writing; it doesn't matter @@ -1224,7 +1219,7 @@ StatusWith<OpTime> multiApply(OperationContext* txn, "attempting to replicate ops while primary"}; } - applyOps(writerVectors, workerPool, applyOperation); + applyOps(&writerVectors, workerPool, applyOperation); { ON_BLOCK_EXIT([&] { workerPool->join(); }); -- cgit v1.2.1