diff options
author | Mathias Stearn <mathias@10gen.com> | 2015-10-29 17:02:55 -0400 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2015-11-04 13:53:03 -0500 |
commit | 2f70889bbfd4dea77cd26cec2dde28193b06905d (patch) | |
tree | 5737c404648c4aa55522d63bd2b686961b5bb4d2 /src/mongo/db/repl/sync_tail.cpp | |
parent | f79d18871869e1ae1591506c27c9e56b86bc7706 (diff) | |
download | mongo-2f70889bbfd4dea77cd26cec2dde28193b06905d.tar.gz |
SERVER-21154 Batch and parse oplog entries in parallel with applying them
This includes the start of SERVER-21155.
Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 275 |
1 files changed, 174 insertions, 101 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 244eba62c42..ff25c72abea 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -66,6 +66,7 @@ #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" +#include "mongo/util/scopeguard.h" namespace mongo { @@ -386,10 +387,10 @@ void prefetchOp(const BSONObj& op) { } // Doles out all the work to the reader pool threads and waits for them to complete -void prefetchOps(const std::deque<BSONObj>& ops, OldThreadPool* prefetcherPool) { +void prefetchOps(const std::deque<SyncTail::OplogEntry>& ops, OldThreadPool* prefetcherPool) { invariant(prefetcherPool); - for (std::deque<BSONObj>::const_iterator it = ops.begin(); it != ops.end(); ++it) { - prefetcherPool->schedule(&prefetchOp, *it); + for (auto&& op : ops) { + prefetcherPool->schedule(&prefetchOp, op.raw); } prefetcherPool->join(); } @@ -409,28 +410,27 @@ void applyOps(const std::vector<std::vector<BSONObj>>& writerVectors, } } -void fillWriterVectors(const std::deque<BSONObj>& ops, +void fillWriterVectors(const std::deque<SyncTail::OplogEntry>& ops, std::vector<std::vector<BSONObj>>* writerVectors) { - for (std::deque<BSONObj>::const_iterator it = ops.begin(); it != ops.end(); ++it) { - const BSONElement e = it->getField("ns"); - verify(e.type() == String); - const char* ns = e.valuestr(); - int len = e.valuestrsize(); + const bool supportsDocLocking = + getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking(); + const uint32_t numWriters = writerVectors->size(); + + for (auto&& op : ops) { uint32_t hash = 0; - MurmurHash3_x86_32(ns, len, 0, &hash); + MurmurHash3_x86_32(op.ns.rawData(), op.ns.size(), 0, &hash); - const char* opType = it->getField("op").valuestrsafe(); + const char* opType = op.opType.rawData(); - if (getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking() && - isCrudOpType(opType)) { + if (supportsDocLocking && isCrudOpType(opType)) { BSONElement id; switch (opType[0]) { case 'u': - id = it->getField("o2").Obj()["_id"]; + id = op.o2.Obj()["_id"]; break; case 'd': case 'i': - id = it->getField("o").Obj()["_id"]; + id = op.o.Obj()["_id"]; break; } @@ -438,7 +438,7 @@ void fillWriterVectors(const std::deque<BSONObj>& ops, MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash); } - (*writerVectors)[hash % writerVectors->size()].push_back(*it); + (*writerVectors)[hash % numWriters].push_back(op.raw); } } @@ -474,11 +474,19 @@ OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) { applyOps(writerVectors, &_writerPool, _applyFunc, this); - OpTime lastOpTime = writeOpsToOplog(txn, ops.getDeque()); - if (inShutdown()) { - return OpTime(); + OpTime lastOpTime; + { + ON_BLOCK_EXIT([&] { _writerPool.join(); }); + std::vector<BSONObj> raws; + raws.reserve(ops.getDeque().size()); + for (auto&& op : ops.getDeque()) { + raws.emplace_back(op.raw); + } + lastOpTime = writeOpsToOplog(txn, raws); + if (inShutdown()) { + return OpTime(); + } } - _writerPool.join(); // We have now written all database writes and updated the oplog to match. return lastOpTime; } @@ -516,83 +524,143 @@ void tryToGoLiveAsASecondary(OperationContext* txn, ReplicationCoordinator* repl } } -/* tail an oplog. ok to return, will be re-called. */ -void SyncTail::oplogApplication() { - ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - ApplyBatchFinalizer finalizer(replCoord); +class SyncTail::OpQueueBatcher { + MONGO_DISALLOW_COPYING(OpQueueBatcher); - OperationContextImpl txn; - OpTime originalEndOpTime(getMinValid(&txn).end); +public: + explicit OpQueueBatcher(SyncTail* syncTail) : _syncTail(syncTail), _thread([&] { run(); }) {} + ~OpQueueBatcher() { + _inShutdown.store(true); + _cv.notify_all(); + _thread.join(); + } - while (!inShutdown()) { - OpQueue ops; + OpQueue getNextBatch(Seconds maxWaitTime) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + if (_ops.empty()) { + _cv.wait_for(lk, maxWaitTime); + } - Timer batchTimer; - int lastTimeChecked = 0; + OpQueue ops = std::move(_ops); + _ops = {}; + _cv.notify_all(); - do { - int now = batchTimer.seconds(); + return ops; + } - // apply replication batch limits - if (!ops.empty()) { - if (now > replBatchLimitSeconds) - break; - if (ops.getDeque().size() > replBatchLimitOperations) - break; - } - // occasionally check some things - // (always checked in the first iteration of this do-while loop, because - // ops is empty) - if (ops.empty() || now > lastTimeChecked) { - BackgroundSync* bgsync = BackgroundSync::get(); - if (bgsync->getInitialSyncRequestedFlag()) { - // got a resync command - return; +private: + void run() { + Client::initThread("ReplBatcher"); + OperationContextImpl txn; + auto replCoord = ReplicationCoordinator::get(&txn); + + while (!_inShutdown.load()) { + Timer batchTimer; + + OpQueue ops; + // tryPopAndWaitForMore returns true when we need to end a batch early + while (!_syncTail->tryPopAndWaitForMore(&txn, &ops) && + (ops.getSize() < replBatchLimitBytes) && !_inShutdown.load()) { + int now = batchTimer.seconds(); + + // apply replication batch limits + if (!ops.empty()) { + if (now > replBatchLimitSeconds) + break; + if (ops.getDeque().size() > replBatchLimitOperations) + break; } - lastTimeChecked = now; - // can we become secondary? - // we have to check this before calling mgr, as we must be a secondary to - // become primary - tryToGoLiveAsASecondary(&txn, replCoord); - } - const int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs()); - if (!ops.empty() && slaveDelaySecs > 0) { - const BSONObj lastOp = ops.back(); - const unsigned int opTimestampSecs = lastOp["ts"].timestamp().getSecs(); + const int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs()); + if (!ops.empty() && slaveDelaySecs > 0) { + const BSONObj lastOp = ops.back().raw; + const unsigned int opTimestampSecs = lastOp["ts"].timestamp().getSecs(); + + // Stop the batch as the lastOp is too new to be applied. If we continue + // on, we can get ops that are way ahead of the delay and this will + // make this thread sleep longer when handleSlaveDelay is called + // and apply ops much sooner than we like. + if (opTimestampSecs > static_cast<unsigned int>(time(0) - slaveDelaySecs)) { + break; + } + } - // Stop the batch as the lastOp is too new to be applied. If we continue - // on, we can get ops that are way ahead of the delay and this will - // make this thread sleep longer when handleSlaveDelay is called - // and apply ops much sooner than we like. - if (opTimestampSecs > static_cast<unsigned int>(time(0) - slaveDelaySecs)) { + if (MONGO_FAIL_POINT(rsSyncApplyStop)) { break; } + + // keep fetching more ops as long as we haven't filled up a full batch yet } - if (MONGO_FAIL_POINT(rsSyncApplyStop)) { - break; + // For pausing replication in tests + while (MONGO_FAIL_POINT(rsSyncApplyStop) && !_inShutdown.load()) { + sleepmillis(0); } - // keep fetching more ops as long as we haven't filled up a full batch yet - } while (!tryPopAndWaitForMore(&txn, &ops, replCoord) && // tryPopAndWaitForMore returns - // true when we need to end a - // batch early - (ops.getSize() < replBatchLimitBytes) && - !inShutdown()); - - // For pausing replication in tests - while (MONGO_FAIL_POINT(rsSyncApplyStop)) { - sleepmillis(0); - if (inShutdown()) - return; + stdx::unique_lock<stdx::mutex> lk(_mutex); + while (!_ops.empty()) { + // Block until the previous batch has been taken. + if (_inShutdown.load()) + return; + _cv.wait(lk); + } + _ops = std::move(ops); + _cv.notify_all(); } + } + + AtomicWord<bool> _inShutdown; + SyncTail* const _syncTail; + + stdx::mutex _mutex; // Guards _ops. + stdx::condition_variable _cv; + OpQueue _ops; + + stdx::thread _thread; // Must be last so all other members are initialized before starting. +}; + +/* tail an oplog. ok to return, will be re-called. */ +void SyncTail::oplogApplication() { + OpQueueBatcher batcher(this); + + OperationContextImpl txn; + auto replCoord = ReplicationCoordinator::get(&txn); + ApplyBatchFinalizer finalizer(replCoord); + + OpTime originalEndOpTime(getMinValid(&txn).end); + while (!inShutdown()) { + OpQueue ops; - if (ops.empty()) { - continue; + do { + if (BackgroundSync::get()->getInitialSyncRequestedFlag()) { + // got a resync command + return; + } + + tryToGoLiveAsASecondary(&txn, replCoord); + + // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become + // ready in time, we'll loop again so we can do the above checks periodically. + ops = batcher.getNextBatch(Seconds(1)); + } while (!inShutdown() && ops.empty()); + + if (inShutdown()) + return; + + invariant(!ops.empty()); + + const BSONObj lastOp = ops.back().raw; + + if (lastOp.isEmpty()) { + // This means we are currently stepping up as primary, and waiting for this thread to + // drain the ops we've queued up. + invariant(ops.getDeque().size() == 1); + invariant(replCoord->isWaitingForApplierToDrain()); + + replCoord->signalDrainComplete(&txn); + continue; // This wasn't a real op. Don't try to apply it. } - const BSONObj lastOp = ops.back(); handleSlaveDelay(lastOp); // Set minValid to the last OpTime that needs to be applied, in this batch or from the @@ -628,6 +696,23 @@ void SyncTail::oplogApplication() { } } +SyncTail::OplogEntry::OplogEntry(const BSONObj& rawInput) : raw(rawInput.getOwned()) { + for (auto elem : raw) { + const auto name = elem.fieldNameStringData(); + if (name == "ns") { + ns = elem.valuestrsafe(); + } else if (name == "op") { + opType = elem.valuestrsafe(); + } else if (name == "o2") { + o2 = elem; + } else if (name == "v") { + version = elem; + } else if (name == "o") { + o = elem; + } + } +} + // Copies ops out of the bgsync queue into the deque passed in as a parameter. // Returns true if the batch should be ended early. // Batch should end early if we encounter a command, or if @@ -635,9 +720,7 @@ void SyncTail::oplogApplication() { // This function also blocks 1 second waiting for new ops to appear in the bgsync // queue. We can't block forever because there are maintenance things we need // to periodically check in the loop. -bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, - SyncTail::OpQueue* ops, - ReplicationCoordinator* replCoord) { +bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* ops) { BSONObj op; // Check to see if there are ops waiting in the bgsync queue bool peek_success = peek(&op); @@ -645,16 +728,6 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, if (!peek_success) { // if we don't have anything in the queue, wait a bit for something to appear if (ops->empty()) { - if (replCoord->isWaitingForApplierToDrain()) { - BackgroundSync::get()->waitUntilPaused(); - if (peek(&op)) { - // The producer generated a last batch of ops before pausing so return - // false so that we'll come back and apply them before signaling the drain - // is complete. - return false; - } - replCoord->signalDrainComplete(txn); - } // block up to 1 second _networkQueue->waitForMore(); return false; @@ -664,16 +737,17 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, return true; } - const char* ns = op["ns"].valuestrsafe(); + auto entry = OplogEntry(op); - // check for commands - if ((op["op"].valuestrsafe()[0] == 'c') || + // Check for ops that must be processed one at a time. + if (entry.raw.isEmpty() || // sentinel that network queue is drained. + (entry.opType[0] == 'c') || // commands. // Index builds are acheived through the use of an insert op, not a command op. // The following line is the same as what the insert code uses to detect an index build. - (*ns != '\0' && nsToCollectionSubstring(ns) == "system.indexes")) { + (!entry.ns.empty() && nsToCollectionSubstring(entry.ns) == "system.indexes")) { if (ops->empty()) { // apply commands one-at-a-time - ops->push_back(op); + ops->push_back(std::move(entry)); _networkQueue->consume(); } @@ -682,13 +756,12 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, } // check for oplog version change - BSONElement elemVersion = op["v"]; int curVersion = 0; - if (elemVersion.eoo()) + if (entry.version.eoo()) // missing version means version 1 curVersion = 1; else - curVersion = elemVersion.Int(); + curVersion = entry.version.Int(); if (curVersion != OPLOG_VERSION) { severe() << "expected oplog version " << OPLOG_VERSION << " but found version " @@ -697,7 +770,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, } // Copy the op to the deque and remove it from the bgsync queue. - ops->push_back(op); + ops->push_back(std::move(entry)); _networkQueue->consume(); // Go back for more ops |