diff options
author | Mathias Stearn <mathias@10gen.com> | 2015-10-29 17:02:55 -0400 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2015-11-04 13:53:03 -0500 |
commit | 2f70889bbfd4dea77cd26cec2dde28193b06905d (patch) | |
tree | 5737c404648c4aa55522d63bd2b686961b5bb4d2 | |
parent | f79d18871869e1ae1591506c27c9e56b86bc7706 (diff) | |
download | mongo-2f70889bbfd4dea77cd26cec2dde28193b06905d.tar.gz |
SERVER-21154 Batch and parse oplog entries in parallel with applying them
This includes the start of SERVER-21155.
-rw-r--r-- | src/mongo/db/catalog/collection.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection.h | 8 | ||||
-rw-r--r-- | src/mongo/db/op_observer.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/op_observer.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_sync.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_sync.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 275 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 40 | ||||
-rw-r--r-- | src/mongo/util/queue.h | 20 |
14 files changed, 261 insertions, 152 deletions
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp index 2c45a6057b7..bd319dc6654 100644 --- a/src/mongo/db/catalog/collection.cpp +++ b/src/mongo/db/catalog/collection.cpp @@ -331,14 +331,14 @@ Status Collection::insertDocument(OperationContext* txn, const DocWriter* doc, b Status Collection::insertDocuments(OperationContext* txn, - vector<BSONObj>::iterator begin, - vector<BSONObj>::iterator end, + const vector<BSONObj>::const_iterator begin, + const vector<BSONObj>::const_iterator end, bool enforceQuota, bool fromMigrate) { // Should really be done in the collection object at creation and updated on index create. const bool hasIdIndex = _indexCatalog.findIdIndex(txn); - for (vector<BSONObj>::iterator it = begin; it != end; it++) { + for (auto it = begin; it != end; it++) { if (hasIdIndex && (*it)["_id"].eoo()) { return Status(ErrorCodes::InternalError, str::stream() << "Collection::insertDocument got " @@ -411,8 +411,8 @@ Status Collection::insertDocument(OperationContext* txn, } Status Collection::_insertDocuments(OperationContext* txn, - vector<BSONObj>::iterator begin, - vector<BSONObj>::iterator end, + const vector<BSONObj>::const_iterator begin, + const vector<BSONObj>::const_iterator end, bool enforceQuota) { dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX)); @@ -421,7 +421,7 @@ Status Collection::_insertDocuments(OperationContext* txn, // collection access method probably std::vector<Record> records; - for (vector<BSONObj>::iterator it = begin; it != end; it++) { + for (auto it = begin; it != end; it++) { Record record = {RecordId(), RecordData(it->objdata(), it->objsize())}; records.push_back(record); } @@ -431,7 +431,7 @@ Status Collection::_insertDocuments(OperationContext* txn, std::vector<BsonRecord> bsonRecords; int recordIndex = 0; - for (vector<BSONObj>::iterator it = begin; it != end; it++) { + for (auto it = begin; it != end; it++) { RecordId loc = records[recordIndex++].id; invariant(RecordId::min() < loc); invariant(loc < RecordId::max()); diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index d5956382f91..eabc8dfdfa9 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -255,8 +255,8 @@ public: * If errors occor (including WCE), caller should retry documents individually. */ Status insertDocuments(OperationContext* txn, - std::vector<BSONObj>::iterator begin, - std::vector<BSONObj>::iterator end, + std::vector<BSONObj>::const_iterator begin, + std::vector<BSONObj>::const_iterator end, bool enforceQuota, bool fromMigrate = false); @@ -440,8 +440,8 @@ private: Status _insertDocument(OperationContext* txn, const BSONObj& doc, bool enforceQuota); Status _insertDocuments(OperationContext* txn, - std::vector<BSONObj>::iterator begin, - std::vector<BSONObj>::iterator end, + std::vector<BSONObj>::const_iterator begin, + std::vector<BSONObj>::const_iterator end, bool enforceQuota); bool _enforceQuota(bool userEnforeQuota) const; diff --git a/src/mongo/db/op_observer.cpp b/src/mongo/db/op_observer.cpp index 720d26ca9f2..6e5d6161d96 100644 --- a/src/mongo/db/op_observer.cpp +++ b/src/mongo/db/op_observer.cpp @@ -58,8 +58,8 @@ void OpObserver::onCreateIndex(OperationContext* txn, void OpObserver::onInserts(OperationContext* txn, const NamespaceString& nss, - vector<BSONObj>::iterator begin, - vector<BSONObj>::iterator end, + vector<BSONObj>::const_iterator begin, + vector<BSONObj>::const_iterator end, bool fromMigrate) { repl::logOps(txn, "i", nss, begin, end, fromMigrate); diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h index 2ef3a191f0a..36dafda30c6 100644 --- a/src/mongo/db/op_observer.h +++ b/src/mongo/db/op_observer.h @@ -57,8 +57,8 @@ public: bool fromMigrate = false); void onInserts(OperationContext* txn, const NamespaceString& ns, - std::vector<BSONObj>::iterator begin, - std::vector<BSONObj>::iterator end, + std::vector<BSONObj>::const_iterator begin, + std::vector<BSONObj>::const_iterator end, bool fromMigrate = false); void onUpdate(OperationContext* txn, oplogUpdateEntryArgs args); void onDelete(OperationContext* txn, diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 118a9e02046..8f68fc0aad5 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -636,6 +636,11 @@ void BackgroundSync::cancelFetcher() { void BackgroundSync::stop() { stdx::lock_guard<stdx::mutex> lock(_mutex); + if (_replCoord->isWaitingForApplierToDrain()) { + // Signal to consumers that we have entered the paused state. + _buffer.pushEvenIfFull(BSONObj()); + } + _pause = true; _syncSourceHost = HostAndPort(); _lastOpTimeFetched = OpTime(); diff --git a/src/mongo/db/repl/initial_sync.cpp b/src/mongo/db/repl/initial_sync.cpp index d769d7a89fa..e0ca82a6ea0 100644 --- a/src/mongo/db/repl/initial_sync.cpp +++ b/src/mongo/db/repl/initial_sync.cpp @@ -73,13 +73,13 @@ void InitialSync::_applyOplogUntil(OperationContext* txn, const OpTime& endOpTim OpQueue ops; auto replCoord = repl::ReplicationCoordinator::get(txn); - while (!tryPopAndWaitForMore(txn, &ops, replCoord)) { + while (!tryPopAndWaitForMore(txn, &ops)) { // nothing came back last time, so go again if (ops.empty()) continue; // Check if we reached the end - const BSONObj currentOp = ops.back(); + const BSONObj currentOp = ops.back().raw; const OpTime currentOpTime = fassertStatusOK(28772, OpTime::parseFromOplogEntry(currentOp)); @@ -104,7 +104,7 @@ void InitialSync::_applyOplogUntil(OperationContext* txn, const OpTime& endOpTim fassertFailedNoTrace(18692); } - const BSONObj lastOp = ops.back().getOwned(); + const BSONObj lastOp = ops.back().raw.getOwned(); // Tally operation information bytesApplied += ops.getSize(); diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 87a748833e3..a8e14fc6470 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -423,8 +423,8 @@ void _logOp(OperationContext* txn, void logOps(OperationContext* txn, const char* opstr, const NamespaceString& nss, - std::vector<BSONObj>::iterator begin, - std::vector<BSONObj>::iterator end, + std::vector<BSONObj>::const_iterator begin, + std::vector<BSONObj>::const_iterator end, bool fromMigrate) { ReplicationCoordinator::Mode replMode = ReplicationCoordinator::get(txn)->getReplicationMode(); @@ -459,7 +459,7 @@ void logOp(OperationContext* txn, _logOp(txn, opstr, ns, obj, o2, fromMigrate, _oplogCollectionName, replMode, true); } -OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops) { +OpTime writeOpsToOplog(OperationContext* txn, const std::vector<BSONObj>& ops) { ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); OpTime lastOptime; @@ -483,11 +483,10 @@ OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops) { OldClientContext ctx(txn, rsOplogName, _localDB); WriteUnitOfWork wunit(txn); - std::vector<BSONObj> opsVect(ops.begin(), ops.end()); checkOplogInsert( - _localOplogCollection->insertDocuments(txn, opsVect.begin(), opsVect.end(), false)); + _localOplogCollection->insertDocuments(txn, ops.begin(), ops.end(), false)); lastOptime = - fassertStatusOK(ErrorCodes::InvalidBSON, OpTime::parseFromOplogEntry(opsVect.back())); + fassertStatusOK(ErrorCodes::InvalidBSON, OpTime::parseFromOplogEntry(ops.back())); wunit.commit(); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "writeOps", _localOplogCollection->ns().ns()); diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index 613c02edbcf..b022aa0e545 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -74,7 +74,7 @@ void createOplog(OperationContext* txn); // used internally by replication secondaries after they have applied ops. Updates the global // optime. // Returns the optime for the last op inserted. -OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops); +OpTime writeOpsToOplog(OperationContext* txn, const std::vector<BSONObj>& ops); extern std::string rsOplogName; extern std::string masterSlaveOplogName; @@ -95,8 +95,8 @@ extern int OPLOG_VERSION; void logOps(OperationContext* txn, const char* opstr, const NamespaceString& nss, - std::vector<BSONObj>::iterator begin, - std::vector<BSONObj>::iterator end, + std::vector<BSONObj>::const_iterator begin, + std::vector<BSONObj>::const_iterator end, bool fromMigrate); /* For 'u' records, 'obj' captures the mutation made to the object but not diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index ee58f04038b..97104b71297 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -174,10 +174,8 @@ public: /** - * Returns how slave delayed this node is configured to be. - * - * Raises a DBException if this node is not a member of the current replica set - * configuration. + * Returns how slave delayed this node is configured to be, or 0 seconds if this node is not a + * member of the current replica set configuration. */ virtual Seconds getSlaveDelaySecs() const = 0; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 5634f1db32a..2044b43439f 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -529,7 +529,11 @@ Status ReplicationCoordinatorImpl::waitForMemberState(MemberState expectedState, Seconds ReplicationCoordinatorImpl::getSlaveDelaySecs() const { stdx::lock_guard<stdx::mutex> lk(_mutex); invariant(_rsConfig.isInitialized()); - uassert(28524, "Node not a member of the current set configuration", _selfIndex != -1); + if (_selfIndex == -1) { + // We aren't currently in the set. Return 0 seconds so we can clear out the applier's + // queue of work. + return Seconds(0); + } return _rsConfig.getMemberAt(_selfIndex).getSlaveDelay(); } diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp index 1ae5f51b249..74246d54439 100644 --- a/src/mongo/db/repl/rs_sync.cpp +++ b/src/mongo/db/repl/rs_sync.cpp @@ -131,12 +131,8 @@ void runSyncThread() { /* we have some data. continue tailing. */ SyncTail tail(BackgroundSync::get(), multiSyncApply); tail.oplogApplication(); - } catch (const DBException& e) { - log() << "Received exception while syncing: " << e.toString(); - sleepsecs(10); - } catch (const std::exception& e) { - log() << "Received exception while syncing: " << e.what(); - sleepsecs(10); + } catch (...) { + std::terminate(); } } } diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 244eba62c42..ff25c72abea 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -66,6 +66,7 @@ #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" +#include "mongo/util/scopeguard.h" namespace mongo { @@ -386,10 +387,10 @@ void prefetchOp(const BSONObj& op) { } // Doles out all the work to the reader pool threads and waits for them to complete -void prefetchOps(const std::deque<BSONObj>& ops, OldThreadPool* prefetcherPool) { +void prefetchOps(const std::deque<SyncTail::OplogEntry>& ops, OldThreadPool* prefetcherPool) { invariant(prefetcherPool); - for (std::deque<BSONObj>::const_iterator it = ops.begin(); it != ops.end(); ++it) { - prefetcherPool->schedule(&prefetchOp, *it); + for (auto&& op : ops) { + prefetcherPool->schedule(&prefetchOp, op.raw); } prefetcherPool->join(); } @@ -409,28 +410,27 @@ void applyOps(const std::vector<std::vector<BSONObj>>& writerVectors, } } -void fillWriterVectors(const std::deque<BSONObj>& ops, +void fillWriterVectors(const std::deque<SyncTail::OplogEntry>& ops, std::vector<std::vector<BSONObj>>* writerVectors) { - for (std::deque<BSONObj>::const_iterator it = ops.begin(); it != ops.end(); ++it) { - const BSONElement e = it->getField("ns"); - verify(e.type() == String); - const char* ns = e.valuestr(); - int len = e.valuestrsize(); + const bool supportsDocLocking = + getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking(); + const uint32_t numWriters = writerVectors->size(); + + for (auto&& op : ops) { uint32_t hash = 0; - MurmurHash3_x86_32(ns, len, 0, &hash); + MurmurHash3_x86_32(op.ns.rawData(), op.ns.size(), 0, &hash); - const char* opType = it->getField("op").valuestrsafe(); + const char* opType = op.opType.rawData(); - if (getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking() && - isCrudOpType(opType)) { + if (supportsDocLocking && isCrudOpType(opType)) { BSONElement id; switch (opType[0]) { case 'u': - id = it->getField("o2").Obj()["_id"]; + id = op.o2.Obj()["_id"]; break; case 'd': case 'i': - id = it->getField("o").Obj()["_id"]; + id = op.o.Obj()["_id"]; break; } @@ -438,7 +438,7 @@ void fillWriterVectors(const std::deque<BSONObj>& ops, MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash); } - (*writerVectors)[hash % writerVectors->size()].push_back(*it); + (*writerVectors)[hash % numWriters].push_back(op.raw); } } @@ -474,11 +474,19 @@ OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) { applyOps(writerVectors, &_writerPool, _applyFunc, this); - OpTime lastOpTime = writeOpsToOplog(txn, ops.getDeque()); - if (inShutdown()) { - return OpTime(); + OpTime lastOpTime; + { + ON_BLOCK_EXIT([&] { _writerPool.join(); }); + std::vector<BSONObj> raws; + raws.reserve(ops.getDeque().size()); + for (auto&& op : ops.getDeque()) { + raws.emplace_back(op.raw); + } + lastOpTime = writeOpsToOplog(txn, raws); + if (inShutdown()) { + return OpTime(); + } } - _writerPool.join(); // We have now written all database writes and updated the oplog to match. return lastOpTime; } @@ -516,83 +524,143 @@ void tryToGoLiveAsASecondary(OperationContext* txn, ReplicationCoordinator* repl } } -/* tail an oplog. ok to return, will be re-called. */ -void SyncTail::oplogApplication() { - ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - ApplyBatchFinalizer finalizer(replCoord); +class SyncTail::OpQueueBatcher { + MONGO_DISALLOW_COPYING(OpQueueBatcher); - OperationContextImpl txn; - OpTime originalEndOpTime(getMinValid(&txn).end); +public: + explicit OpQueueBatcher(SyncTail* syncTail) : _syncTail(syncTail), _thread([&] { run(); }) {} + ~OpQueueBatcher() { + _inShutdown.store(true); + _cv.notify_all(); + _thread.join(); + } - while (!inShutdown()) { - OpQueue ops; + OpQueue getNextBatch(Seconds maxWaitTime) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + if (_ops.empty()) { + _cv.wait_for(lk, maxWaitTime); + } - Timer batchTimer; - int lastTimeChecked = 0; + OpQueue ops = std::move(_ops); + _ops = {}; + _cv.notify_all(); - do { - int now = batchTimer.seconds(); + return ops; + } - // apply replication batch limits - if (!ops.empty()) { - if (now > replBatchLimitSeconds) - break; - if (ops.getDeque().size() > replBatchLimitOperations) - break; - } - // occasionally check some things - // (always checked in the first iteration of this do-while loop, because - // ops is empty) - if (ops.empty() || now > lastTimeChecked) { - BackgroundSync* bgsync = BackgroundSync::get(); - if (bgsync->getInitialSyncRequestedFlag()) { - // got a resync command - return; +private: + void run() { + Client::initThread("ReplBatcher"); + OperationContextImpl txn; + auto replCoord = ReplicationCoordinator::get(&txn); + + while (!_inShutdown.load()) { + Timer batchTimer; + + OpQueue ops; + // tryPopAndWaitForMore returns true when we need to end a batch early + while (!_syncTail->tryPopAndWaitForMore(&txn, &ops) && + (ops.getSize() < replBatchLimitBytes) && !_inShutdown.load()) { + int now = batchTimer.seconds(); + + // apply replication batch limits + if (!ops.empty()) { + if (now > replBatchLimitSeconds) + break; + if (ops.getDeque().size() > replBatchLimitOperations) + break; } - lastTimeChecked = now; - // can we become secondary? - // we have to check this before calling mgr, as we must be a secondary to - // become primary - tryToGoLiveAsASecondary(&txn, replCoord); - } - const int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs()); - if (!ops.empty() && slaveDelaySecs > 0) { - const BSONObj lastOp = ops.back(); - const unsigned int opTimestampSecs = lastOp["ts"].timestamp().getSecs(); + const int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs()); + if (!ops.empty() && slaveDelaySecs > 0) { + const BSONObj lastOp = ops.back().raw; + const unsigned int opTimestampSecs = lastOp["ts"].timestamp().getSecs(); + + // Stop the batch as the lastOp is too new to be applied. If we continue + // on, we can get ops that are way ahead of the delay and this will + // make this thread sleep longer when handleSlaveDelay is called + // and apply ops much sooner than we like. + if (opTimestampSecs > static_cast<unsigned int>(time(0) - slaveDelaySecs)) { + break; + } + } - // Stop the batch as the lastOp is too new to be applied. If we continue - // on, we can get ops that are way ahead of the delay and this will - // make this thread sleep longer when handleSlaveDelay is called - // and apply ops much sooner than we like. - if (opTimestampSecs > static_cast<unsigned int>(time(0) - slaveDelaySecs)) { + if (MONGO_FAIL_POINT(rsSyncApplyStop)) { break; } + + // keep fetching more ops as long as we haven't filled up a full batch yet } - if (MONGO_FAIL_POINT(rsSyncApplyStop)) { - break; + // For pausing replication in tests + while (MONGO_FAIL_POINT(rsSyncApplyStop) && !_inShutdown.load()) { + sleepmillis(0); } - // keep fetching more ops as long as we haven't filled up a full batch yet - } while (!tryPopAndWaitForMore(&txn, &ops, replCoord) && // tryPopAndWaitForMore returns - // true when we need to end a - // batch early - (ops.getSize() < replBatchLimitBytes) && - !inShutdown()); - - // For pausing replication in tests - while (MONGO_FAIL_POINT(rsSyncApplyStop)) { - sleepmillis(0); - if (inShutdown()) - return; + stdx::unique_lock<stdx::mutex> lk(_mutex); + while (!_ops.empty()) { + // Block until the previous batch has been taken. + if (_inShutdown.load()) + return; + _cv.wait(lk); + } + _ops = std::move(ops); + _cv.notify_all(); } + } + + AtomicWord<bool> _inShutdown; + SyncTail* const _syncTail; + + stdx::mutex _mutex; // Guards _ops. + stdx::condition_variable _cv; + OpQueue _ops; + + stdx::thread _thread; // Must be last so all other members are initialized before starting. +}; + +/* tail an oplog. ok to return, will be re-called. */ +void SyncTail::oplogApplication() { + OpQueueBatcher batcher(this); + + OperationContextImpl txn; + auto replCoord = ReplicationCoordinator::get(&txn); + ApplyBatchFinalizer finalizer(replCoord); + + OpTime originalEndOpTime(getMinValid(&txn).end); + while (!inShutdown()) { + OpQueue ops; - if (ops.empty()) { - continue; + do { + if (BackgroundSync::get()->getInitialSyncRequestedFlag()) { + // got a resync command + return; + } + + tryToGoLiveAsASecondary(&txn, replCoord); + + // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become + // ready in time, we'll loop again so we can do the above checks periodically. + ops = batcher.getNextBatch(Seconds(1)); + } while (!inShutdown() && ops.empty()); + + if (inShutdown()) + return; + + invariant(!ops.empty()); + + const BSONObj lastOp = ops.back().raw; + + if (lastOp.isEmpty()) { + // This means we are currently stepping up as primary, and waiting for this thread to + // drain the ops we've queued up. + invariant(ops.getDeque().size() == 1); + invariant(replCoord->isWaitingForApplierToDrain()); + + replCoord->signalDrainComplete(&txn); + continue; // This wasn't a real op. Don't try to apply it. } - const BSONObj lastOp = ops.back(); handleSlaveDelay(lastOp); // Set minValid to the last OpTime that needs to be applied, in this batch or from the @@ -628,6 +696,23 @@ void SyncTail::oplogApplication() { } } +SyncTail::OplogEntry::OplogEntry(const BSONObj& rawInput) : raw(rawInput.getOwned()) { + for (auto elem : raw) { + const auto name = elem.fieldNameStringData(); + if (name == "ns") { + ns = elem.valuestrsafe(); + } else if (name == "op") { + opType = elem.valuestrsafe(); + } else if (name == "o2") { + o2 = elem; + } else if (name == "v") { + version = elem; + } else if (name == "o") { + o = elem; + } + } +} + // Copies ops out of the bgsync queue into the deque passed in as a parameter. // Returns true if the batch should be ended early. // Batch should end early if we encounter a command, or if @@ -635,9 +720,7 @@ void SyncTail::oplogApplication() { // This function also blocks 1 second waiting for new ops to appear in the bgsync // queue. We can't block forever because there are maintenance things we need // to periodically check in the loop. -bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, - SyncTail::OpQueue* ops, - ReplicationCoordinator* replCoord) { +bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* ops) { BSONObj op; // Check to see if there are ops waiting in the bgsync queue bool peek_success = peek(&op); @@ -645,16 +728,6 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, if (!peek_success) { // if we don't have anything in the queue, wait a bit for something to appear if (ops->empty()) { - if (replCoord->isWaitingForApplierToDrain()) { - BackgroundSync::get()->waitUntilPaused(); - if (peek(&op)) { - // The producer generated a last batch of ops before pausing so return - // false so that we'll come back and apply them before signaling the drain - // is complete. - return false; - } - replCoord->signalDrainComplete(txn); - } // block up to 1 second _networkQueue->waitForMore(); return false; @@ -664,16 +737,17 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, return true; } - const char* ns = op["ns"].valuestrsafe(); + auto entry = OplogEntry(op); - // check for commands - if ((op["op"].valuestrsafe()[0] == 'c') || + // Check for ops that must be processed one at a time. + if (entry.raw.isEmpty() || // sentinel that network queue is drained. + (entry.opType[0] == 'c') || // commands. // Index builds are acheived through the use of an insert op, not a command op. // The following line is the same as what the insert code uses to detect an index build. - (*ns != '\0' && nsToCollectionSubstring(ns) == "system.indexes")) { + (!entry.ns.empty() && nsToCollectionSubstring(entry.ns) == "system.indexes")) { if (ops->empty()) { // apply commands one-at-a-time - ops->push_back(op); + ops->push_back(std::move(entry)); _networkQueue->consume(); } @@ -682,13 +756,12 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, } // check for oplog version change - BSONElement elemVersion = op["v"]; int curVersion = 0; - if (elemVersion.eoo()) + if (entry.version.eoo()) // missing version means version 1 curVersion = 1; else - curVersion = elemVersion.Int(); + curVersion = entry.version.Int(); if (curVersion != OPLOG_VERSION) { severe() << "expected oplog version " << OPLOG_VERSION << " but found version " @@ -697,7 +770,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, } // Copy the op to the deque and remove it from the bgsync queue. - ops->push_back(op); + ops->push_back(std::move(entry)); _networkQueue->consume(); // Go back for more ops diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 254f0182bd5..e3edacaed2a 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -95,38 +95,58 @@ public: void oplogApplication(); bool peek(BSONObj* obj); + /** + * A parsed oplog entry. + * + * This only includes the fields used by the code using this object at the time this was + * written. As more code uses this, more fields should be added. + * + * All unowned members (such as StringDatas and BSONElements) point into the raw BSON. + * All StringData members are guaranteed to be NUL terminated. + */ + struct OplogEntry { + explicit OplogEntry(const BSONObj& raw); + + BSONObj raw; // Owned. + + StringData ns = ""; + StringData opType = ""; + + BSONElement version; + BSONElement o; + BSONElement o2; + }; + class OpQueue { public: OpQueue() : _size(0) {} size_t getSize() const { return _size; } - const std::deque<BSONObj>& getDeque() const { + const std::deque<OplogEntry>& getDeque() const { return _deque; } - void push_back(BSONObj& op) { - _deque.push_back(op); - _size += op.objsize(); + void push_back(OplogEntry&& op) { + _size += op.raw.objsize(); + _deque.push_back(std::move(op)); } bool empty() const { return _deque.empty(); } - BSONObj back() const { + const OplogEntry& back() const { invariant(!_deque.empty()); return _deque.back(); } private: - std::deque<BSONObj> _deque; + std::deque<OplogEntry> _deque; size_t _size; }; // returns true if we should continue waiting for BSONObjs, false if we should // stop waiting and apply the queue we have. Only returns false if !ops.empty(). - bool tryPopAndWaitForMore(OperationContext* txn, - OpQueue* ops, - ReplicationCoordinator* replCoord); + bool tryPopAndWaitForMore(OperationContext* txn, OpQueue* ops); /** * Fetch a single document referenced in the operation from the sync source. @@ -158,6 +178,8 @@ protected: OpTime multiApply(OperationContext* txn, const OpQueue& ops); private: + class OpQueueBatcher; + std::string _hostname; BackgroundSyncInterface* _networkQueue; diff --git a/src/mongo/util/queue.h b/src/mongo/util/queue.h index a1b8caac1a0..6cdb62538ef 100644 --- a/src/mongo/util/queue.h +++ b/src/mongo/util/queue.h @@ -32,9 +32,10 @@ #include <limits> #include <queue> +#include "mongo/base/disallow_copying.h" #include "mongo/stdx/chrono.h" #include "mongo/stdx/condition_variable.h" -#include "mongo/base/disallow_copying.h" +#include "mongo/stdx/mutex.h" namespace mongo { @@ -62,6 +63,11 @@ public: BlockingQueue(size_t size) : _maxSize(size), _getSize(&_getSizeDefault) {} BlockingQueue(size_t size, getSizeFunc f) : _maxSize(size), _getSize(f) {} + void pushEvenIfFull(T const& t) { + stdx::unique_lock<stdx::mutex> l(_lock); + pushImpl_inlock(t, _getSize(t)); + } + void push(T const& t) { stdx::unique_lock<stdx::mutex> l(_lock); _clearing = false; @@ -69,9 +75,7 @@ public: while (_currentSize + tSize > _maxSize) { _cvNoLongerFull.wait(l); } - _queue.push(t); - _currentSize += tSize; - _cvNoLongerEmpty.notify_one(); + pushImpl_inlock(t, tSize); } bool empty() const { @@ -198,6 +202,14 @@ public: } private: + void pushImpl_inlock(const T& obj, size_t objSize) { + _clearing = false; + _queue.push(obj); + _currentSize += objSize; + if (_queue.size() == 1) // We were empty. + _cvNoLongerEmpty.notify_one(); + } + mutable stdx::mutex _lock; std::queue<T> _queue; const size_t _maxSize; |