author     Mathias Stearn <mathias@10gen.com>   2016-06-14 15:51:40 -0400
committer  Mathias Stearn <mathias@10gen.com>   2016-06-22 16:04:36 -0400
commit     9fe9ddc29177f1929884f00cf9220fbcd5d932f4
tree       1fe42a12d5daed8f920d50b087589f0205337788
parent     e508ddcb51eec941ae50d9c2efb06b601811dc19
SERVER-24242 Stop copying OplogEntries so much
With this change, we should no longer be copying any OplogEntries during steady-state replication.
Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp | 177
1 file changed, 86 insertions(+), 91 deletions(-)
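
The heart of the change: functions that took a const MultiApplier::Operations& and copied OplogEntry objects into per-writer vectors now either take the batch by value and move it, or work through MultiApplier::OperationPtrs, a vector of non-owning pointers into the batch. A minimal sketch of that idiom (Entry, Batch, and BatchPtrs are illustrative stand-ins, not the real MongoDB classes):

    // Minimal sketch of the move-plus-pointers idiom this patch adopts.
    #include <utility>
    #include <vector>

    struct Entry { char payload[512]; };     // large, expensive to copy
    using Batch = std::vector<Entry>;        // stand-in for MultiApplier::Operations
    using BatchPtrs = std::vector<Entry*>;   // stand-in for MultiApplier::OperationPtrs

    void process(Batch batch) {              // take ownership by value
        BatchPtrs ptrs;
        ptrs.reserve(batch.size());
        for (auto& e : batch)
            ptrs.push_back(&e);              // fan out non-owning pointers
        // workers receive &ptrs; batch stays alive until they are joined
    }

    int main() {
        Batch b(1000);
        process(std::move(b));               // moved in once, no per-entry copies
    }

The owner keeps the entries alive while the writer threads run, which is why multiApply below joins the worker pool before returning.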
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 84871d24249..7a96a27b064 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -458,14 +458,16 @@ void prefetchOps(const MultiApplier::Operations& ops, OldThreadPool* prefetcherP
prefetcherPool->join();
}
-// Doles out all the work to the writer pool threads and waits for them to complete
-void applyOps(const std::vector<MultiApplier::Operations>& writerVectors,
+// Doles out all the work to the writer pool threads.
+// Does not modify writerVectors, but passes non-const pointers to inner vectors into func.
+void applyOps(std::vector<MultiApplier::OperationPtrs>* writerVectors,
OldThreadPool* writerPool,
- MultiApplier::ApplyOperationFn func) {
+ const MultiApplier::ApplyOperationFn& func) {
TimerHolder timer(&applyBatchStats);
- for (auto&& ops : writerVectors) {
+ for (auto&& ops : *writerVectors) {
if (!ops.empty()) {
- writerPool->schedule(func, stdx::cref(ops));
+ auto opsPtr = &ops;
+ writerPool->schedule([&func, opsPtr] { func(opsPtr); });
}
}
}
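
applyOps previously went through the pool's variadic schedule overload with stdx::cref; since the apply function now needs a mutable OperationPtrs*, it schedules a plain closure that captures a raw pointer into writerVectors. Capturing func by reference is only safe because multiApply joins the pool before applyOperation goes out of scope. A sketch under those assumptions, with a hypothetical minimal pool API:

    #include <functional>
    #include <vector>

    struct Op {};
    using OpPtrs = std::vector<Op*>;
    using ApplyFn = std::function<void(OpPtrs*)>;

    struct ThreadPool {                                         // hypothetical pool
        void schedule(std::function<void()> task) { task(); }   // inline for the sketch
    };

    void applyAll(std::vector<OpPtrs>* writerVectors, ThreadPool* pool, const ApplyFn& func) {
        for (auto&& ops : *writerVectors) {
            if (ops.empty())
                continue;
            auto opsPtr = &ops;  // stable address: writerVectors outlives the tasks
            pool->schedule([&func, opsPtr] { func(opsPtr); });
        }
    }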
@@ -500,9 +502,11 @@ private:
StringMap<bool> _cache;
};
+// This only modifies the isForCappedCollection field on each op. It does not alter the ops vector
+// in any other way.
void fillWriterVectors(OperationContext* txn,
- const MultiApplier::Operations& ops,
- std::vector<MultiApplier::Operations>* writerVectors) {
+ MultiApplier::Operations* ops,
+ std::vector<MultiApplier::OperationPtrs>* writerVectors) {
const bool supportsDocLocking =
getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking();
const uint32_t numWriters = writerVectors->size();
@@ -511,7 +515,7 @@ void fillWriterVectors(OperationContext* txn,
CachingCappedChecker isCapped;
- for (auto&& op : ops) {
+ for (auto&& op : *ops) {
StringMapTraits::HashedKey hashedNs(op.ns);
uint32_t hash = hashedNs.hash();
@@ -528,12 +532,13 @@ void fillWriterVectors(OperationContext* txn,
if (op.opType == "i" && isCapped(txn, hashedNs)) {
// Mark capped collection ops before storing them to ensure we do not attempt to bulk
// insert them.
- OplogEntry modifiedOp = op;
- modifiedOp.isForCappedCollection = true;
- (*writerVectors)[hash % numWriters].push_back(modifiedOp);
- } else {
- (*writerVectors)[hash % numWriters].push_back(op);
+ op.isForCappedCollection = true;
}
+
+ auto& writer = (*writerVectors)[hash % numWriters];
+ if (writer.empty())
+ writer.reserve(8); // skip a few growth rounds.
+ writer.push_back(&op);
}
}
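
fillWriterVectors no longer clones an op just to set isForCappedCollection; it flips the flag on the op in place and pushes a pointer into the per-writer bucket picked by a namespace hash. In outline (stand-in types; the real code hashes through StringMapTraits::HashedKey and tags only inserts into capped collections):

    #include <cstddef>
    #include <functional>
    #include <string>
    #include <vector>

    struct Op {
        std::string ns;
        bool isForCappedCollection = false;
    };

    void fillWriterVectors(std::vector<Op>* ops, std::vector<std::vector<Op*>>* writers) {
        const std::size_t numWriters = writers->size();
        for (auto& op : *ops) {
            const std::size_t hash = std::hash<std::string>{}(op.ns);
            // The real code sets op.isForCappedCollection = true here for
            // capped-collection inserts: tag in place, never copy.
            auto& writer = (*writers)[hash % numWriters];
            if (writer.empty())
                writer.reserve(8);   // skip a few growth rounds for small batches
            writer.push_back(&op);   // non-owning pointer into the shared batch
        }
    }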
@@ -541,14 +546,9 @@ void fillWriterVectors(OperationContext* txn,
// Applies a batch of oplog entries, by using a set of threads to apply the operations and then
// writes the oplog entries to the local oplog.
-OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) {
- auto convertToVector = ops.getDeque();
- auto applyOperation = [this](const MultiApplier::Operations& ops) { _applyFunc(ops, this); };
- auto status =
- repl::multiApply(txn,
- _writerPool.get(),
- MultiApplier::Operations(convertToVector.begin(), convertToVector.end()),
- applyOperation);
+OpTime SyncTail::multiApply(OperationContext* txn, MultiApplier::Operations ops) {
+ auto applyOperation = [this](MultiApplier::OperationPtrs* ops) { _applyFunc(ops, this); };
+ auto status = repl::multiApply(txn, _writerPool.get(), std::move(ops), applyOperation);
if (!status.isOK()) {
if (status == ErrorCodes::InterruptedAtShutdown) {
return OpTime();
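
SyncTail::multiApply used to rebuild a fresh Operations vector from the batcher's deque on every batch; it now accepts the batch by value and forwards it with std::move, so ownership travels down the call chain with one move per layer instead of a copy. The shape of that, sketched:

    #include <utility>
    #include <vector>

    struct Op {};
    using Ops = std::vector<Op>;

    void lowerLevel(Ops ops) { (void)ops; }   // final owner of the batch

    void multiApply(Ops ops) {                // by value: caller picks copy or move
        lowerLevel(std::move(ops));           // forwarded without another copy
    }

    int main() {
        Ops batch(3);
        multiApply(std::move(batch));         // one move from caller to worker
    }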
@@ -641,14 +641,14 @@ private:
OpQueue ops;
// tryPopAndWaitForMore returns true when we need to end a batch early
while (!_syncTail->tryPopAndWaitForMore(&txn, &ops) &&
- (ops.getSize() < replBatchLimitBytes) && !_inShutdown.load()) {
+ (ops.getBytes() < replBatchLimitBytes) && !_inShutdown.load()) {
int now = batchTimer.seconds();
// apply replication batch limits
if (!ops.empty()) {
if (now > replBatchLimitSeconds)
break;
- if (ops.getDeque().size() > replBatchLimitOperations)
+ if (ops.getCount() > replBatchLimitOperations)
break;
}
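
The batch limits also read more honestly now: getBytes() against the byte budget and getCount() against the operation budget, where the old code compared getSize() to a byte limit and reached through getDeque() for the count. A queue tracking both, sketched with hypothetical internals (the real OpQueue wraps a deque of OplogEntry):

    #include <cstddef>
    #include <utility>
    #include <vector>

    struct Entry {
        std::size_t byteSize() const { return 64; }   // illustrative fixed size
    };

    class OpQueue {
    public:
        void push(Entry e) {
            _bytes += e.byteSize();
            _entries.push_back(std::move(e));
        }
        std::size_t getCount() const { return _entries.size(); }  // ops in batch
        std::size_t getBytes() const { return _bytes; }           // batch size
    private:
        std::vector<Entry> _entries;
        std::size_t _bytes = 0;
    };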
@@ -716,32 +716,25 @@ void SyncTail::oplogApplication() {
OpTime originalEndOpTime(minValidBoundaries.end);
OpTime lastWriteOpTime{replCoord->getMyLastAppliedOpTime()};
while (!inShutdown()) {
- OpQueue ops;
-
- do {
- if (replCoord->getInitialSyncRequestedFlag()) {
- // got a resync command
- return;
- }
-
- tryToGoLiveAsASecondary(&txn, replCoord, minValidBoundaries, lastWriteOpTime);
-
- // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become
- // ready in time, we'll loop again so we can do the above checks periodically.
- ops = batcher.getNextBatch(Seconds(1));
- } while (!inShutdown() && ops.empty());
-
- if (inShutdown())
+ if (replCoord->getInitialSyncRequestedFlag()) {
+ // got a resync command
return;
+ }
+
+ tryToGoLiveAsASecondary(&txn, replCoord, minValidBoundaries, lastWriteOpTime);
- invariant(!ops.empty());
+ // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become
+ // ready in time, we'll loop again so we can do the above checks periodically.
+ OpQueue ops = batcher.getNextBatch(Seconds(1));
+ if (ops.empty())
+ continue; // Try again.
const BSONObj lastOp = ops.back().raw;
if (lastOp.isEmpty()) {
// This means that the network thread has coalesced and we have processed all of its
// data.
- invariant(ops.getDeque().size() == 1);
+ invariant(ops.getCount() == 1);
if (replCoord->isWaitingForApplierToDrain()) {
replCoord->signalDrainComplete(&txn);
}
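
The batching loop is flattened: instead of an inner do/while that duplicated the shutdown and resync checks, a single pass of the outer loop runs the periodic checks, waits up to a second for a batch, and continues when nothing is ready. Roughly, with placeholder helpers standing in for the replication-coordinator calls:

    #include <chrono>

    struct Batch { bool empty() const { return true; } };

    bool inShutdown();
    bool resyncRequested();     // placeholder for getInitialSyncRequestedFlag()
    void tryToGoLive();         // placeholder for tryToGoLiveAsASecondary()
    Batch getNextBatch(std::chrono::seconds wait);
    void apply(Batch&);

    void oplogApplication() {
        while (!inShutdown()) {
            if (resyncRequested())
                return;                                   // got a resync command
            tryToGoLive();
            // Block up to a second for a batch; on timeout, loop again so the
            // checks above keep running periodically.
            Batch batch = getNextBatch(std::chrono::seconds(1));
            if (batch.empty())
                continue;                                 // try again
            apply(batch);
        }
    }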
@@ -793,11 +786,12 @@ void SyncTail::oplogApplication() {
// This write will not journal/checkpoint.
StorageInterface::get(&txn)->setMinValid(&txn, {start, end});
- lastWriteOpTime = multiApply(&txn, ops);
+ const size_t opsInBatch = ops.getCount();
+ lastWriteOpTime = multiApply(&txn, ops.releaseBatch());
if (lastWriteOpTime.isNull()) {
// fassert if oplog application failed for any reasons other than shutdown.
- error() << "Failed to apply " << ops.getDeque().size()
- << " operations - batch start:" << start << " end:" << end;
+ error() << "Failed to apply " << opsInBatch << " operations - batch start:" << start
+ << " end:" << end;
fassert(34360, inShutdownStrict());
// Return without setting minvalid in the case of shutdown.
return;
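
Since the batch is now moved into multiApply via releaseBatch(), anything the error path needs later (here, the op count for the log line) has to be captured before the move; reading ops afterwards would be use-after-move. The pattern in miniature:

    #include <cstddef>
    #include <cstdio>
    #include <utility>
    #include <vector>

    int consume(std::vector<int> v) { return v.empty() ? 0 : v.back(); }

    int main() {
        std::vector<int> batch{1, 2, 3};
        const std::size_t n = batch.size();       // read what you need first...
        int last = consume(std::move(batch));     // ...then give ownership away
        std::printf("applied %zu ops, last=%d\n", n, last);
        // batch is now valid-but-unspecified; don't touch it again
    }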
@@ -819,37 +813,43 @@ void SyncTail::oplogApplication() {
// queue. We can't block forever because there are maintenance things we need
// to periodically check in the loop.
bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* ops) {
- BSONObj op;
- // Check to see if there are ops waiting in the bgsync queue
- bool peek_success = peek(txn, &op);
-
- if (!peek_success) {
- // if we don't have anything in the queue, wait a bit for something to appear
- if (ops->empty()) {
- // block up to 1 second
- _networkQueue->waitForMore(txn);
- return false;
- }
+ {
+ BSONObj op;
+ // Check to see if there are ops waiting in the bgsync queue
+ bool peek_success = peek(txn, &op);
+ if (!peek_success) {
+ // if we don't have anything in the queue, wait a bit for something to appear
+ if (ops->empty()) {
+ // block up to 1 second
+ _networkQueue->waitForMore(txn);
+ return false;
+ }
- // otherwise, apply what we have
- return true;
+ // otherwise, apply what we have
+ return true;
+ }
+ ops->emplace_back(std::move(op));
}
- auto entry = OplogEntry(op);
+ auto& entry = ops->back();
// Check for ops that must be processed one at a time.
if (entry.raw.isEmpty() || // sentinel that network queue is drained.
(entry.opType[0] == 'c') || // commands.
- // Index builds are acheived through the use of an insert op, not a command op.
+ // Index builds are achieved through the use of an insert op, not a command op.
// The following line is the same as what the insert code uses to detect an index build.
(!entry.ns.empty() && nsToCollectionSubstring(entry.ns) == "system.indexes")) {
- if (ops->empty()) {
+ if (ops->getCount() == 1) {
// apply commands one-at-a-time
- ops->push_back(std::move(entry));
_networkQueue->consume(txn);
+ } else {
+ // This op must be processed alone, but we already had ops in the queue so we can't
+ // include it in this batch. Since we didn't call consume(), we'll see this again next
+ // time and process it alone.
+ ops->pop_back();
}
- // otherwise, apply what we have so far and come back for the command
+ // Apply what we have so far.
return true;
}
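
tryPopAndWaitForMore now moves the peeked op straight into the batch and backs it out with pop_back() when the op must run alone but other ops are already queued; because consume() was never called in that case, the same op is peeked again on the next call and processed by itself. A sketch of the emplace-then-maybe-pop pattern:

    #include <string>
    #include <utility>
    #include <vector>

    struct Op { std::string raw; };

    // Returns true when the caller should end the batch and apply it now.
    bool addToBatch(std::vector<Op>* batch, Op peeked, bool mustRunAlone) {
        batch->emplace_back(std::move(peeked));   // optimistically join the batch
        if (!mustRunAlone)
            return false;                         // keep accumulating
        if (batch->size() > 1) {
            // Ops were already queued, so this one can't join them. Back it
            // out; the source queue was not consumed, so it will be peeked
            // again on the next call and then processed alone.
            batch->pop_back();
        }
        return true;                              // apply what we have
    }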
@@ -863,12 +863,11 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* op
if (curVersion != OplogEntry::kOplogVersion) {
severe() << "expected oplog version " << OplogEntry::kOplogVersion << " but found version "
- << curVersion << " in oplog entry: " << op;
+ << curVersion << " in oplog entry: " << entry.raw;
fassertFailedNoTrace(18820);
}
- // Copy the op to the deque and remove it from the bgsync queue.
- ops->push_back(std::move(entry));
+ // We are going to apply this Op.
_networkQueue->consume(txn);
// Go back for more ops
@@ -1005,7 +1004,7 @@ static void initializeWriterThread() {
}
// This free function is used by the writer threads to apply each op
-void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail*) {
+void multiSyncApply(MultiApplier::OperationPtrs* ops, SyncTail*) {
initializeWriterThread();
auto txn = cc().makeOperationContext();
auto syncApply = [](OperationContext* txn, const BSONObj& op, bool convertUpdateToUpsert) {
@@ -1015,7 +1014,7 @@ void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail*) {
}
Status multiSyncApply_noAbort(OperationContext* txn,
- const std::vector<OplogEntry>& oplogEntries,
+ MultiApplier::OperationPtrs* oplogEntryPointers,
SyncApplyFn syncApply) {
txn->setReplicatedWrites(false);
DisableDocumentValidation validationDisabler(txn);
@@ -1023,24 +1022,19 @@ Status multiSyncApply_noAbort(OperationContext* txn,
// allow us to get through the magic barrier
txn->lockState()->setIsBatchWriter(true);
- std::vector<const OplogEntry*> oplogEntryPointers(oplogEntries.size());
- for (size_t i = 0; i < oplogEntries.size(); i++) {
- oplogEntryPointers[i] = &oplogEntries[i];
- }
-
- if (oplogEntryPointers.size() > 1) {
- std::stable_sort(oplogEntryPointers.begin(),
- oplogEntryPointers.end(),
+ if (oplogEntryPointers->size() > 1) {
+ std::stable_sort(oplogEntryPointers->begin(),
+ oplogEntryPointers->end(),
[](const OplogEntry* l, const OplogEntry* r) { return l->ns < r->ns; });
}
bool convertUpdatesToUpserts = true;
// doNotGroupBeforePoint is used to prevent retrying bad group inserts by marking the final op
// of a failed group and not allowing further group inserts until that op has been processed.
- auto doNotGroupBeforePoint = oplogEntryPointers.begin();
+ auto doNotGroupBeforePoint = oplogEntryPointers->begin();
- for (auto oplogEntriesIterator = oplogEntryPointers.begin();
- oplogEntriesIterator != oplogEntryPointers.end();
+ for (auto oplogEntriesIterator = oplogEntryPointers->begin();
+ oplogEntriesIterator != oplogEntryPointers->end();
++oplogEntriesIterator) {
auto entry = *oplogEntriesIterator;
if (entry->opType[0] == 'i' && !entry->isForCappedCollection &&
@@ -1051,7 +1045,7 @@ Status multiSyncApply_noAbort(OperationContext* txn,
int batchCount = 0;
auto endOfGroupableOpsIterator = std::find_if(
oplogEntriesIterator + 1,
- oplogEntryPointers.end(),
+ oplogEntryPointers->end(),
[&](const OplogEntry* nextEntry) {
return nextEntry->opType[0] != 'i' || // Must be an insert.
nextEntry->ns != entry->ns || // Must be the same namespace.
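
With the writer threads already handed a vector of pointers, the temporary pointer vector that multiSyncApply_noAbort used to build is gone; it stable-sorts the pointers by namespace, then delimits runs of consecutive same-namespace inserts with std::find_if for bulk application. The sort-then-group scan, sketched:

    #include <algorithm>
    #include <string>
    #include <vector>

    struct Op {
        std::string ns;
        char opType;   // 'i' for insert
    };

    void groupInserts(std::vector<Op*>* ops) {
        std::stable_sort(ops->begin(), ops->end(),
                         [](const Op* l, const Op* r) { return l->ns < r->ns; });
        for (auto it = ops->begin(); it != ops->end(); ++it) {
            Op* entry = *it;
            if (entry->opType != 'i')
                continue;   // only inserts are grouped
            auto end = std::find_if(it + 1, ops->end(), [&](const Op* next) {
                return next->opType != 'i' || next->ns != entry->ns;
            });
            // [it, end) is a run of inserts into one namespace: bulk-apply it.
            it = end - 1;   // resume the scan after the group
        }
    }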
@@ -1133,14 +1127,14 @@ Status multiSyncApply_noAbort(OperationContext* txn,
}
// This free function is used by the initial sync writer threads to apply each op
-void multiInitialSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) {
+void multiInitialSyncApply(MultiApplier::OperationPtrs* ops, SyncTail* st) {
initializeWriterThread();
auto txn = cc().makeOperationContext();
fassertNoTrace(15915, multiInitialSyncApply_noAbort(txn.get(), ops, st));
}
Status multiInitialSyncApply_noAbort(OperationContext* txn,
- const std::vector<OplogEntry>& ops,
+ MultiApplier::OperationPtrs* ops,
SyncTail* st) {
txn->setReplicatedWrites(false);
DisableDocumentValidation validationDisabler(txn);
@@ -1150,14 +1144,15 @@ Status multiInitialSyncApply_noAbort(OperationContext* txn,
bool convertUpdatesToUpserts = false;
- for (std::vector<OplogEntry>::const_iterator it = ops.begin(); it != ops.end(); ++it) {
+ for (auto it = ops->begin(); it != ops->end(); ++it) {
+ auto& entry = **it;
try {
- const Status s = SyncTail::syncApply(txn, it->raw, convertUpdatesToUpserts);
+ const Status s = SyncTail::syncApply(txn, entry.raw, convertUpdatesToUpserts);
if (!s.isOK()) {
- if (st->shouldRetry(txn, it->raw)) {
- const Status s2 = SyncTail::syncApply(txn, it->raw, convertUpdatesToUpserts);
+ if (st->shouldRetry(txn, entry.raw)) {
+ const Status s2 = SyncTail::syncApply(txn, entry.raw, convertUpdatesToUpserts);
if (!s2.isOK()) {
- severe() << "Error applying operation (" << it->raw << "): " << s2;
+ severe() << "Error applying operation (" << entry.raw << "): " << s2;
return s2;
}
}
@@ -1167,7 +1162,7 @@ Status multiInitialSyncApply_noAbort(OperationContext* txn,
// subsequently got deleted and no longer exists on the Sync Target at all
}
} catch (const DBException& e) {
- severe() << "writer worker caught exception: " << causedBy(e) << " on: " << it->raw;
+ severe() << "writer worker caught exception: " << causedBy(e) << " on: " << entry.raw;
if (inShutdown()) {
return {ErrorCodes::InterruptedAtShutdown, e.toString()};
@@ -1182,7 +1177,7 @@ Status multiInitialSyncApply_noAbort(OperationContext* txn,
StatusWith<OpTime> multiApply(OperationContext* txn,
OldThreadPool* workerPool,
- const MultiApplier::Operations& ops,
+ MultiApplier::Operations ops,
MultiApplier::ApplyOperationFn applyOperation) {
if (!txn) {
return {ErrorCodes::BadValue, "invalid operation context"};
@@ -1205,9 +1200,9 @@ StatusWith<OpTime> multiApply(OperationContext* txn,
prefetchOps(ops, workerPool);
}
- std::vector<std::vector<OplogEntry>> writerVectors(workerPool->getNumThreads());
+ std::vector<MultiApplier::OperationPtrs> writerVectors(workerPool->getNumThreads());
- fillWriterVectors(txn, ops, &writerVectors);
+ fillWriterVectors(txn, &ops, &writerVectors);
LOG(2) << "replication batch size is " << ops.size();
// We must grab this because we're going to grab write locks later.
// We hold this mutex the entire time we're writing; it doesn't matter
@@ -1224,7 +1219,7 @@ StatusWith<OpTime> multiApply(OperationContext* txn,
"attempting to replicate ops while primary"};
}
- applyOps(writerVectors, workerPool, applyOperation);
+ applyOps(&writerVectors, workerPool, applyOperation);
{
ON_BLOCK_EXIT([&] { workerPool->join(); });