diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/catalog/collection.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection.h | 8 | ||||
-rw-r--r-- | src/mongo/db/op_observer.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/op_observer.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_sync.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_sync.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 275 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 40 | ||||
-rw-r--r-- | src/mongo/util/queue.h | 20 |
14 files changed, 152 insertions, 261 deletions
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp index bd319dc6654..2c45a6057b7 100644 --- a/src/mongo/db/catalog/collection.cpp +++ b/src/mongo/db/catalog/collection.cpp @@ -331,14 +331,14 @@ Status Collection::insertDocument(OperationContext* txn, const DocWriter* doc, b Status Collection::insertDocuments(OperationContext* txn, - const vector<BSONObj>::const_iterator begin, - const vector<BSONObj>::const_iterator end, + vector<BSONObj>::iterator begin, + vector<BSONObj>::iterator end, bool enforceQuota, bool fromMigrate) { // Should really be done in the collection object at creation and updated on index create. const bool hasIdIndex = _indexCatalog.findIdIndex(txn); - for (auto it = begin; it != end; it++) { + for (vector<BSONObj>::iterator it = begin; it != end; it++) { if (hasIdIndex && (*it)["_id"].eoo()) { return Status(ErrorCodes::InternalError, str::stream() << "Collection::insertDocument got " @@ -411,8 +411,8 @@ Status Collection::insertDocument(OperationContext* txn, } Status Collection::_insertDocuments(OperationContext* txn, - const vector<BSONObj>::const_iterator begin, - const vector<BSONObj>::const_iterator end, + vector<BSONObj>::iterator begin, + vector<BSONObj>::iterator end, bool enforceQuota) { dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX)); @@ -421,7 +421,7 @@ Status Collection::_insertDocuments(OperationContext* txn, // collection access method probably std::vector<Record> records; - for (auto it = begin; it != end; it++) { + for (vector<BSONObj>::iterator it = begin; it != end; it++) { Record record = {RecordId(), RecordData(it->objdata(), it->objsize())}; records.push_back(record); } @@ -431,7 +431,7 @@ Status Collection::_insertDocuments(OperationContext* txn, std::vector<BsonRecord> bsonRecords; int recordIndex = 0; - for (auto it = begin; it != end; it++) { + for (vector<BSONObj>::iterator it = begin; it != end; it++) { RecordId loc = records[recordIndex++].id; invariant(RecordId::min() < loc); invariant(loc < RecordId::max()); diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index eabc8dfdfa9..d5956382f91 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -255,8 +255,8 @@ public: * If errors occor (including WCE), caller should retry documents individually. */ Status insertDocuments(OperationContext* txn, - std::vector<BSONObj>::const_iterator begin, - std::vector<BSONObj>::const_iterator end, + std::vector<BSONObj>::iterator begin, + std::vector<BSONObj>::iterator end, bool enforceQuota, bool fromMigrate = false); @@ -440,8 +440,8 @@ private: Status _insertDocument(OperationContext* txn, const BSONObj& doc, bool enforceQuota); Status _insertDocuments(OperationContext* txn, - std::vector<BSONObj>::const_iterator begin, - std::vector<BSONObj>::const_iterator end, + std::vector<BSONObj>::iterator begin, + std::vector<BSONObj>::iterator end, bool enforceQuota); bool _enforceQuota(bool userEnforeQuota) const; diff --git a/src/mongo/db/op_observer.cpp b/src/mongo/db/op_observer.cpp index 6e5d6161d96..720d26ca9f2 100644 --- a/src/mongo/db/op_observer.cpp +++ b/src/mongo/db/op_observer.cpp @@ -58,8 +58,8 @@ void OpObserver::onCreateIndex(OperationContext* txn, void OpObserver::onInserts(OperationContext* txn, const NamespaceString& nss, - vector<BSONObj>::const_iterator begin, - vector<BSONObj>::const_iterator end, + vector<BSONObj>::iterator begin, + vector<BSONObj>::iterator end, bool fromMigrate) { repl::logOps(txn, "i", nss, begin, end, fromMigrate); diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h index 36dafda30c6..2ef3a191f0a 100644 --- a/src/mongo/db/op_observer.h +++ b/src/mongo/db/op_observer.h @@ -57,8 +57,8 @@ public: bool fromMigrate = false); void onInserts(OperationContext* txn, const NamespaceString& ns, - std::vector<BSONObj>::const_iterator begin, - std::vector<BSONObj>::const_iterator end, + std::vector<BSONObj>::iterator begin, + std::vector<BSONObj>::iterator end, bool fromMigrate = false); void onUpdate(OperationContext* txn, oplogUpdateEntryArgs args); void onDelete(OperationContext* txn, diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 8f68fc0aad5..118a9e02046 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -636,11 +636,6 @@ void BackgroundSync::cancelFetcher() { void BackgroundSync::stop() { stdx::lock_guard<stdx::mutex> lock(_mutex); - if (_replCoord->isWaitingForApplierToDrain()) { - // Signal to consumers that we have entered the paused state. - _buffer.pushEvenIfFull(BSONObj()); - } - _pause = true; _syncSourceHost = HostAndPort(); _lastOpTimeFetched = OpTime(); diff --git a/src/mongo/db/repl/initial_sync.cpp b/src/mongo/db/repl/initial_sync.cpp index e0ca82a6ea0..d769d7a89fa 100644 --- a/src/mongo/db/repl/initial_sync.cpp +++ b/src/mongo/db/repl/initial_sync.cpp @@ -73,13 +73,13 @@ void InitialSync::_applyOplogUntil(OperationContext* txn, const OpTime& endOpTim OpQueue ops; auto replCoord = repl::ReplicationCoordinator::get(txn); - while (!tryPopAndWaitForMore(txn, &ops)) { + while (!tryPopAndWaitForMore(txn, &ops, replCoord)) { // nothing came back last time, so go again if (ops.empty()) continue; // Check if we reached the end - const BSONObj currentOp = ops.back().raw; + const BSONObj currentOp = ops.back(); const OpTime currentOpTime = fassertStatusOK(28772, OpTime::parseFromOplogEntry(currentOp)); @@ -104,7 +104,7 @@ void InitialSync::_applyOplogUntil(OperationContext* txn, const OpTime& endOpTim fassertFailedNoTrace(18692); } - const BSONObj lastOp = ops.back().raw.getOwned(); + const BSONObj lastOp = ops.back().getOwned(); // Tally operation information bytesApplied += ops.getSize(); diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index a8e14fc6470..87a748833e3 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -423,8 +423,8 @@ void _logOp(OperationContext* txn, void logOps(OperationContext* txn, const char* opstr, const NamespaceString& nss, - std::vector<BSONObj>::const_iterator begin, - std::vector<BSONObj>::const_iterator end, + std::vector<BSONObj>::iterator begin, + std::vector<BSONObj>::iterator end, bool fromMigrate) { ReplicationCoordinator::Mode replMode = ReplicationCoordinator::get(txn)->getReplicationMode(); @@ -459,7 +459,7 @@ void logOp(OperationContext* txn, _logOp(txn, opstr, ns, obj, o2, fromMigrate, _oplogCollectionName, replMode, true); } -OpTime writeOpsToOplog(OperationContext* txn, const std::vector<BSONObj>& ops) { +OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops) { ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); OpTime lastOptime; @@ -483,10 +483,11 @@ OpTime writeOpsToOplog(OperationContext* txn, const std::vector<BSONObj>& ops) { OldClientContext ctx(txn, rsOplogName, _localDB); WriteUnitOfWork wunit(txn); + std::vector<BSONObj> opsVect(ops.begin(), ops.end()); checkOplogInsert( - _localOplogCollection->insertDocuments(txn, ops.begin(), ops.end(), false)); + _localOplogCollection->insertDocuments(txn, opsVect.begin(), opsVect.end(), false)); lastOptime = - fassertStatusOK(ErrorCodes::InvalidBSON, OpTime::parseFromOplogEntry(ops.back())); + fassertStatusOK(ErrorCodes::InvalidBSON, OpTime::parseFromOplogEntry(opsVect.back())); wunit.commit(); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "writeOps", _localOplogCollection->ns().ns()); diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index b022aa0e545..613c02edbcf 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -74,7 +74,7 @@ void createOplog(OperationContext* txn); // used internally by replication secondaries after they have applied ops. Updates the global // optime. // Returns the optime for the last op inserted. -OpTime writeOpsToOplog(OperationContext* txn, const std::vector<BSONObj>& ops); +OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops); extern std::string rsOplogName; extern std::string masterSlaveOplogName; @@ -95,8 +95,8 @@ extern int OPLOG_VERSION; void logOps(OperationContext* txn, const char* opstr, const NamespaceString& nss, - std::vector<BSONObj>::const_iterator begin, - std::vector<BSONObj>::const_iterator end, + std::vector<BSONObj>::iterator begin, + std::vector<BSONObj>::iterator end, bool fromMigrate); /* For 'u' records, 'obj' captures the mutation made to the object but not diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 97104b71297..ee58f04038b 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -174,8 +174,10 @@ public: /** - * Returns how slave delayed this node is configured to be, or 0 seconds if this node is not a - * member of the current replica set configuration. + * Returns how slave delayed this node is configured to be. + * + * Raises a DBException if this node is not a member of the current replica set + * configuration. */ virtual Seconds getSlaveDelaySecs() const = 0; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 2044b43439f..5634f1db32a 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -529,11 +529,7 @@ Status ReplicationCoordinatorImpl::waitForMemberState(MemberState expectedState, Seconds ReplicationCoordinatorImpl::getSlaveDelaySecs() const { stdx::lock_guard<stdx::mutex> lk(_mutex); invariant(_rsConfig.isInitialized()); - if (_selfIndex == -1) { - // We aren't currently in the set. Return 0 seconds so we can clear out the applier's - // queue of work. - return Seconds(0); - } + uassert(28524, "Node not a member of the current set configuration", _selfIndex != -1); return _rsConfig.getMemberAt(_selfIndex).getSlaveDelay(); } diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp index 74246d54439..1ae5f51b249 100644 --- a/src/mongo/db/repl/rs_sync.cpp +++ b/src/mongo/db/repl/rs_sync.cpp @@ -131,8 +131,12 @@ void runSyncThread() { /* we have some data. continue tailing. */ SyncTail tail(BackgroundSync::get(), multiSyncApply); tail.oplogApplication(); - } catch (...) { - std::terminate(); + } catch (const DBException& e) { + log() << "Received exception while syncing: " << e.toString(); + sleepsecs(10); + } catch (const std::exception& e) { + log() << "Received exception while syncing: " << e.what(); + sleepsecs(10); } } } diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index ff25c72abea..244eba62c42 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -66,7 +66,6 @@ #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" -#include "mongo/util/scopeguard.h" namespace mongo { @@ -387,10 +386,10 @@ void prefetchOp(const BSONObj& op) { } // Doles out all the work to the reader pool threads and waits for them to complete -void prefetchOps(const std::deque<SyncTail::OplogEntry>& ops, OldThreadPool* prefetcherPool) { +void prefetchOps(const std::deque<BSONObj>& ops, OldThreadPool* prefetcherPool) { invariant(prefetcherPool); - for (auto&& op : ops) { - prefetcherPool->schedule(&prefetchOp, op.raw); + for (std::deque<BSONObj>::const_iterator it = ops.begin(); it != ops.end(); ++it) { + prefetcherPool->schedule(&prefetchOp, *it); } prefetcherPool->join(); } @@ -410,27 +409,28 @@ void applyOps(const std::vector<std::vector<BSONObj>>& writerVectors, } } -void fillWriterVectors(const std::deque<SyncTail::OplogEntry>& ops, +void fillWriterVectors(const std::deque<BSONObj>& ops, std::vector<std::vector<BSONObj>>* writerVectors) { - const bool supportsDocLocking = - getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking(); - const uint32_t numWriters = writerVectors->size(); - - for (auto&& op : ops) { + for (std::deque<BSONObj>::const_iterator it = ops.begin(); it != ops.end(); ++it) { + const BSONElement e = it->getField("ns"); + verify(e.type() == String); + const char* ns = e.valuestr(); + int len = e.valuestrsize(); uint32_t hash = 0; - MurmurHash3_x86_32(op.ns.rawData(), op.ns.size(), 0, &hash); + MurmurHash3_x86_32(ns, len, 0, &hash); - const char* opType = op.opType.rawData(); + const char* opType = it->getField("op").valuestrsafe(); - if (supportsDocLocking && isCrudOpType(opType)) { + if (getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking() && + isCrudOpType(opType)) { BSONElement id; switch (opType[0]) { case 'u': - id = op.o2.Obj()["_id"]; + id = it->getField("o2").Obj()["_id"]; break; case 'd': case 'i': - id = op.o.Obj()["_id"]; + id = it->getField("o").Obj()["_id"]; break; } @@ -438,7 +438,7 @@ void fillWriterVectors(const std::deque<SyncTail::OplogEntry>& ops, MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash); } - (*writerVectors)[hash % numWriters].push_back(op.raw); + (*writerVectors)[hash % writerVectors->size()].push_back(*it); } } @@ -474,19 +474,11 @@ OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) { applyOps(writerVectors, &_writerPool, _applyFunc, this); - OpTime lastOpTime; - { - ON_BLOCK_EXIT([&] { _writerPool.join(); }); - std::vector<BSONObj> raws; - raws.reserve(ops.getDeque().size()); - for (auto&& op : ops.getDeque()) { - raws.emplace_back(op.raw); - } - lastOpTime = writeOpsToOplog(txn, raws); - if (inShutdown()) { - return OpTime(); - } + OpTime lastOpTime = writeOpsToOplog(txn, ops.getDeque()); + if (inShutdown()) { + return OpTime(); } + _writerPool.join(); // We have now written all database writes and updated the oplog to match. return lastOpTime; } @@ -524,143 +516,83 @@ void tryToGoLiveAsASecondary(OperationContext* txn, ReplicationCoordinator* repl } } -class SyncTail::OpQueueBatcher { - MONGO_DISALLOW_COPYING(OpQueueBatcher); +/* tail an oplog. ok to return, will be re-called. */ +void SyncTail::oplogApplication() { + ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); + ApplyBatchFinalizer finalizer(replCoord); -public: - explicit OpQueueBatcher(SyncTail* syncTail) : _syncTail(syncTail), _thread([&] { run(); }) {} - ~OpQueueBatcher() { - _inShutdown.store(true); - _cv.notify_all(); - _thread.join(); - } + OperationContextImpl txn; + OpTime originalEndOpTime(getMinValid(&txn).end); - OpQueue getNextBatch(Seconds maxWaitTime) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - if (_ops.empty()) { - _cv.wait_for(lk, maxWaitTime); - } + while (!inShutdown()) { + OpQueue ops; - OpQueue ops = std::move(_ops); - _ops = {}; - _cv.notify_all(); + Timer batchTimer; + int lastTimeChecked = 0; - return ops; - } + do { + int now = batchTimer.seconds(); -private: - void run() { - Client::initThread("ReplBatcher"); - OperationContextImpl txn; - auto replCoord = ReplicationCoordinator::get(&txn); - - while (!_inShutdown.load()) { - Timer batchTimer; - - OpQueue ops; - // tryPopAndWaitForMore returns true when we need to end a batch early - while (!_syncTail->tryPopAndWaitForMore(&txn, &ops) && - (ops.getSize() < replBatchLimitBytes) && !_inShutdown.load()) { - int now = batchTimer.seconds(); - - // apply replication batch limits - if (!ops.empty()) { - if (now > replBatchLimitSeconds) - break; - if (ops.getDeque().size() > replBatchLimitOperations) - break; + // apply replication batch limits + if (!ops.empty()) { + if (now > replBatchLimitSeconds) + break; + if (ops.getDeque().size() > replBatchLimitOperations) + break; + } + // occasionally check some things + // (always checked in the first iteration of this do-while loop, because + // ops is empty) + if (ops.empty() || now > lastTimeChecked) { + BackgroundSync* bgsync = BackgroundSync::get(); + if (bgsync->getInitialSyncRequestedFlag()) { + // got a resync command + return; } + lastTimeChecked = now; + // can we become secondary? + // we have to check this before calling mgr, as we must be a secondary to + // become primary + tryToGoLiveAsASecondary(&txn, replCoord); + } - const int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs()); - if (!ops.empty() && slaveDelaySecs > 0) { - const BSONObj lastOp = ops.back().raw; - const unsigned int opTimestampSecs = lastOp["ts"].timestamp().getSecs(); - - // Stop the batch as the lastOp is too new to be applied. If we continue - // on, we can get ops that are way ahead of the delay and this will - // make this thread sleep longer when handleSlaveDelay is called - // and apply ops much sooner than we like. - if (opTimestampSecs > static_cast<unsigned int>(time(0) - slaveDelaySecs)) { - break; - } - } + const int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs()); + if (!ops.empty() && slaveDelaySecs > 0) { + const BSONObj lastOp = ops.back(); + const unsigned int opTimestampSecs = lastOp["ts"].timestamp().getSecs(); - if (MONGO_FAIL_POINT(rsSyncApplyStop)) { + // Stop the batch as the lastOp is too new to be applied. If we continue + // on, we can get ops that are way ahead of the delay and this will + // make this thread sleep longer when handleSlaveDelay is called + // and apply ops much sooner than we like. + if (opTimestampSecs > static_cast<unsigned int>(time(0) - slaveDelaySecs)) { break; } - - // keep fetching more ops as long as we haven't filled up a full batch yet - } - - // For pausing replication in tests - while (MONGO_FAIL_POINT(rsSyncApplyStop) && !_inShutdown.load()) { - sleepmillis(0); } - stdx::unique_lock<stdx::mutex> lk(_mutex); - while (!_ops.empty()) { - // Block until the previous batch has been taken. - if (_inShutdown.load()) - return; - _cv.wait(lk); + if (MONGO_FAIL_POINT(rsSyncApplyStop)) { + break; } - _ops = std::move(ops); - _cv.notify_all(); - } - } - - AtomicWord<bool> _inShutdown; - SyncTail* const _syncTail; - - stdx::mutex _mutex; // Guards _ops. - stdx::condition_variable _cv; - OpQueue _ops; - - stdx::thread _thread; // Must be last so all other members are initialized before starting. -}; - -/* tail an oplog. ok to return, will be re-called. */ -void SyncTail::oplogApplication() { - OpQueueBatcher batcher(this); - - OperationContextImpl txn; - auto replCoord = ReplicationCoordinator::get(&txn); - ApplyBatchFinalizer finalizer(replCoord); - - OpTime originalEndOpTime(getMinValid(&txn).end); - while (!inShutdown()) { - OpQueue ops; - do { - if (BackgroundSync::get()->getInitialSyncRequestedFlag()) { - // got a resync command + // keep fetching more ops as long as we haven't filled up a full batch yet + } while (!tryPopAndWaitForMore(&txn, &ops, replCoord) && // tryPopAndWaitForMore returns + // true when we need to end a + // batch early + (ops.getSize() < replBatchLimitBytes) && + !inShutdown()); + + // For pausing replication in tests + while (MONGO_FAIL_POINT(rsSyncApplyStop)) { + sleepmillis(0); + if (inShutdown()) return; - } - - tryToGoLiveAsASecondary(&txn, replCoord); - - // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become - // ready in time, we'll loop again so we can do the above checks periodically. - ops = batcher.getNextBatch(Seconds(1)); - } while (!inShutdown() && ops.empty()); - - if (inShutdown()) - return; - - invariant(!ops.empty()); - - const BSONObj lastOp = ops.back().raw; - - if (lastOp.isEmpty()) { - // This means we are currently stepping up as primary, and waiting for this thread to - // drain the ops we've queued up. - invariant(ops.getDeque().size() == 1); - invariant(replCoord->isWaitingForApplierToDrain()); + } - replCoord->signalDrainComplete(&txn); - continue; // This wasn't a real op. Don't try to apply it. + if (ops.empty()) { + continue; } + const BSONObj lastOp = ops.back(); handleSlaveDelay(lastOp); // Set minValid to the last OpTime that needs to be applied, in this batch or from the @@ -696,23 +628,6 @@ void SyncTail::oplogApplication() { } } -SyncTail::OplogEntry::OplogEntry(const BSONObj& rawInput) : raw(rawInput.getOwned()) { - for (auto elem : raw) { - const auto name = elem.fieldNameStringData(); - if (name == "ns") { - ns = elem.valuestrsafe(); - } else if (name == "op") { - opType = elem.valuestrsafe(); - } else if (name == "o2") { - o2 = elem; - } else if (name == "v") { - version = elem; - } else if (name == "o") { - o = elem; - } - } -} - // Copies ops out of the bgsync queue into the deque passed in as a parameter. // Returns true if the batch should be ended early. // Batch should end early if we encounter a command, or if @@ -720,7 +635,9 @@ SyncTail::OplogEntry::OplogEntry(const BSONObj& rawInput) : raw(rawInput.getOwne // This function also blocks 1 second waiting for new ops to appear in the bgsync // queue. We can't block forever because there are maintenance things we need // to periodically check in the loop. -bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* ops) { +bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, + SyncTail::OpQueue* ops, + ReplicationCoordinator* replCoord) { BSONObj op; // Check to see if there are ops waiting in the bgsync queue bool peek_success = peek(&op); @@ -728,6 +645,16 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* op if (!peek_success) { // if we don't have anything in the queue, wait a bit for something to appear if (ops->empty()) { + if (replCoord->isWaitingForApplierToDrain()) { + BackgroundSync::get()->waitUntilPaused(); + if (peek(&op)) { + // The producer generated a last batch of ops before pausing so return + // false so that we'll come back and apply them before signaling the drain + // is complete. + return false; + } + replCoord->signalDrainComplete(txn); + } // block up to 1 second _networkQueue->waitForMore(); return false; @@ -737,17 +664,16 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* op return true; } - auto entry = OplogEntry(op); + const char* ns = op["ns"].valuestrsafe(); - // Check for ops that must be processed one at a time. - if (entry.raw.isEmpty() || // sentinel that network queue is drained. - (entry.opType[0] == 'c') || // commands. + // check for commands + if ((op["op"].valuestrsafe()[0] == 'c') || // Index builds are acheived through the use of an insert op, not a command op. // The following line is the same as what the insert code uses to detect an index build. - (!entry.ns.empty() && nsToCollectionSubstring(entry.ns) == "system.indexes")) { + (*ns != '\0' && nsToCollectionSubstring(ns) == "system.indexes")) { if (ops->empty()) { // apply commands one-at-a-time - ops->push_back(std::move(entry)); + ops->push_back(op); _networkQueue->consume(); } @@ -756,12 +682,13 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* op } // check for oplog version change + BSONElement elemVersion = op["v"]; int curVersion = 0; - if (entry.version.eoo()) + if (elemVersion.eoo()) // missing version means version 1 curVersion = 1; else - curVersion = entry.version.Int(); + curVersion = elemVersion.Int(); if (curVersion != OPLOG_VERSION) { severe() << "expected oplog version " << OPLOG_VERSION << " but found version " @@ -770,7 +697,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* op } // Copy the op to the deque and remove it from the bgsync queue. - ops->push_back(std::move(entry)); + ops->push_back(op); _networkQueue->consume(); // Go back for more ops diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index e3edacaed2a..254f0182bd5 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -95,58 +95,38 @@ public: void oplogApplication(); bool peek(BSONObj* obj); - /** - * A parsed oplog entry. - * - * This only includes the fields used by the code using this object at the time this was - * written. As more code uses this, more fields should be added. - * - * All unowned members (such as StringDatas and BSONElements) point into the raw BSON. - * All StringData members are guaranteed to be NUL terminated. - */ - struct OplogEntry { - explicit OplogEntry(const BSONObj& raw); - - BSONObj raw; // Owned. - - StringData ns = ""; - StringData opType = ""; - - BSONElement version; - BSONElement o; - BSONElement o2; - }; - class OpQueue { public: OpQueue() : _size(0) {} size_t getSize() const { return _size; } - const std::deque<OplogEntry>& getDeque() const { + const std::deque<BSONObj>& getDeque() const { return _deque; } - void push_back(OplogEntry&& op) { - _size += op.raw.objsize(); - _deque.push_back(std::move(op)); + void push_back(BSONObj& op) { + _deque.push_back(op); + _size += op.objsize(); } bool empty() const { return _deque.empty(); } - const OplogEntry& back() const { + BSONObj back() const { invariant(!_deque.empty()); return _deque.back(); } private: - std::deque<OplogEntry> _deque; + std::deque<BSONObj> _deque; size_t _size; }; // returns true if we should continue waiting for BSONObjs, false if we should // stop waiting and apply the queue we have. Only returns false if !ops.empty(). - bool tryPopAndWaitForMore(OperationContext* txn, OpQueue* ops); + bool tryPopAndWaitForMore(OperationContext* txn, + OpQueue* ops, + ReplicationCoordinator* replCoord); /** * Fetch a single document referenced in the operation from the sync source. @@ -178,8 +158,6 @@ protected: OpTime multiApply(OperationContext* txn, const OpQueue& ops); private: - class OpQueueBatcher; - std::string _hostname; BackgroundSyncInterface* _networkQueue; diff --git a/src/mongo/util/queue.h b/src/mongo/util/queue.h index 6cdb62538ef..a1b8caac1a0 100644 --- a/src/mongo/util/queue.h +++ b/src/mongo/util/queue.h @@ -32,10 +32,9 @@ #include <limits> #include <queue> -#include "mongo/base/disallow_copying.h" #include "mongo/stdx/chrono.h" #include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" +#include "mongo/base/disallow_copying.h" namespace mongo { @@ -63,11 +62,6 @@ public: BlockingQueue(size_t size) : _maxSize(size), _getSize(&_getSizeDefault) {} BlockingQueue(size_t size, getSizeFunc f) : _maxSize(size), _getSize(f) {} - void pushEvenIfFull(T const& t) { - stdx::unique_lock<stdx::mutex> l(_lock); - pushImpl_inlock(t, _getSize(t)); - } - void push(T const& t) { stdx::unique_lock<stdx::mutex> l(_lock); _clearing = false; @@ -75,7 +69,9 @@ public: while (_currentSize + tSize > _maxSize) { _cvNoLongerFull.wait(l); } - pushImpl_inlock(t, tSize); + _queue.push(t); + _currentSize += tSize; + _cvNoLongerEmpty.notify_one(); } bool empty() const { @@ -202,14 +198,6 @@ public: } private: - void pushImpl_inlock(const T& obj, size_t objSize) { - _clearing = false; - _queue.push(obj); - _currentSize += objSize; - if (_queue.size() == 1) // We were empty. - _cvNoLongerEmpty.notify_one(); - } - mutable stdx::mutex _lock; std::queue<T> _queue; const size_t _maxSize; |