summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/catalog/collection.cpp14
-rw-r--r--src/mongo/db/catalog/collection.h8
-rw-r--r--src/mongo/db/op_observer.cpp4
-rw-r--r--src/mongo/db/op_observer.h4
-rw-r--r--src/mongo/db/repl/bgsync.cpp10
-rw-r--r--src/mongo/db/repl/initial_sync.cpp6
-rw-r--r--src/mongo/db/repl/oplog.cpp11
-rw-r--r--src/mongo/db/repl/oplog.h6
-rw-r--r--src/mongo/db/repl/replication_coordinator.h6
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp6
-rw-r--r--src/mongo/db/repl/rs_sync.cpp8
-rw-r--r--src/mongo/db/repl/sync_tail.cpp275
-rw-r--r--src/mongo/db/repl/sync_tail.h40
-rw-r--r--src/mongo/util/queue.h33
14 files changed, 276 insertions, 155 deletions
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp
index 2c45a6057b7..bd319dc6654 100644
--- a/src/mongo/db/catalog/collection.cpp
+++ b/src/mongo/db/catalog/collection.cpp
@@ -331,14 +331,14 @@ Status Collection::insertDocument(OperationContext* txn, const DocWriter* doc, b
Status Collection::insertDocuments(OperationContext* txn,
- vector<BSONObj>::iterator begin,
- vector<BSONObj>::iterator end,
+ const vector<BSONObj>::const_iterator begin,
+ const vector<BSONObj>::const_iterator end,
bool enforceQuota,
bool fromMigrate) {
// Should really be done in the collection object at creation and updated on index create.
const bool hasIdIndex = _indexCatalog.findIdIndex(txn);
- for (vector<BSONObj>::iterator it = begin; it != end; it++) {
+ for (auto it = begin; it != end; it++) {
if (hasIdIndex && (*it)["_id"].eoo()) {
return Status(ErrorCodes::InternalError,
str::stream() << "Collection::insertDocument got "
@@ -411,8 +411,8 @@ Status Collection::insertDocument(OperationContext* txn,
}
Status Collection::_insertDocuments(OperationContext* txn,
- vector<BSONObj>::iterator begin,
- vector<BSONObj>::iterator end,
+ const vector<BSONObj>::const_iterator begin,
+ const vector<BSONObj>::const_iterator end,
bool enforceQuota) {
dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX));
@@ -421,7 +421,7 @@ Status Collection::_insertDocuments(OperationContext* txn,
// collection access method probably
std::vector<Record> records;
- for (vector<BSONObj>::iterator it = begin; it != end; it++) {
+ for (auto it = begin; it != end; it++) {
Record record = {RecordId(), RecordData(it->objdata(), it->objsize())};
records.push_back(record);
}
@@ -431,7 +431,7 @@ Status Collection::_insertDocuments(OperationContext* txn,
std::vector<BsonRecord> bsonRecords;
int recordIndex = 0;
- for (vector<BSONObj>::iterator it = begin; it != end; it++) {
+ for (auto it = begin; it != end; it++) {
RecordId loc = records[recordIndex++].id;
invariant(RecordId::min() < loc);
invariant(loc < RecordId::max());
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h
index d5956382f91..eabc8dfdfa9 100644
--- a/src/mongo/db/catalog/collection.h
+++ b/src/mongo/db/catalog/collection.h
@@ -255,8 +255,8 @@ public:
* If errors occor (including WCE), caller should retry documents individually.
*/
Status insertDocuments(OperationContext* txn,
- std::vector<BSONObj>::iterator begin,
- std::vector<BSONObj>::iterator end,
+ std::vector<BSONObj>::const_iterator begin,
+ std::vector<BSONObj>::const_iterator end,
bool enforceQuota,
bool fromMigrate = false);
@@ -440,8 +440,8 @@ private:
Status _insertDocument(OperationContext* txn, const BSONObj& doc, bool enforceQuota);
Status _insertDocuments(OperationContext* txn,
- std::vector<BSONObj>::iterator begin,
- std::vector<BSONObj>::iterator end,
+ std::vector<BSONObj>::const_iterator begin,
+ std::vector<BSONObj>::const_iterator end,
bool enforceQuota);
bool _enforceQuota(bool userEnforeQuota) const;
diff --git a/src/mongo/db/op_observer.cpp b/src/mongo/db/op_observer.cpp
index 720d26ca9f2..6e5d6161d96 100644
--- a/src/mongo/db/op_observer.cpp
+++ b/src/mongo/db/op_observer.cpp
@@ -58,8 +58,8 @@ void OpObserver::onCreateIndex(OperationContext* txn,
void OpObserver::onInserts(OperationContext* txn,
const NamespaceString& nss,
- vector<BSONObj>::iterator begin,
- vector<BSONObj>::iterator end,
+ vector<BSONObj>::const_iterator begin,
+ vector<BSONObj>::const_iterator end,
bool fromMigrate) {
repl::logOps(txn, "i", nss, begin, end, fromMigrate);
diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h
index 2ef3a191f0a..36dafda30c6 100644
--- a/src/mongo/db/op_observer.h
+++ b/src/mongo/db/op_observer.h
@@ -57,8 +57,8 @@ public:
bool fromMigrate = false);
void onInserts(OperationContext* txn,
const NamespaceString& ns,
- std::vector<BSONObj>::iterator begin,
- std::vector<BSONObj>::iterator end,
+ std::vector<BSONObj>::const_iterator begin,
+ std::vector<BSONObj>::const_iterator end,
bool fromMigrate = false);
void onUpdate(OperationContext* txn, oplogUpdateEntryArgs args);
void onDelete(OperationContext* txn,
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 118a9e02046..f41a1c78608 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -229,9 +229,13 @@ void BackgroundSync::_producerThread() {
if (!isPaused()) {
stop();
}
- if (_replCoord->isWaitingForApplierToDrain() && _buffer.empty()) {
- // This will wake up the applier if it is sitting in blockingPeek().
- _buffer.clear();
+ if (_replCoord->isWaitingForApplierToDrain()) {
+ // Signal to consumers that we have entered the paused state if the signal isn't already
+ // in the queue.
+ const boost::optional<BSONObj> lastObjectPushed = _buffer.lastObjectPushed();
+ if (!lastObjectPushed || !lastObjectPushed->isEmpty()) {
+ _buffer.pushEvenIfFull(BSONObj());
+ }
}
sleepsecs(1);
return;
diff --git a/src/mongo/db/repl/initial_sync.cpp b/src/mongo/db/repl/initial_sync.cpp
index d769d7a89fa..e0ca82a6ea0 100644
--- a/src/mongo/db/repl/initial_sync.cpp
+++ b/src/mongo/db/repl/initial_sync.cpp
@@ -73,13 +73,13 @@ void InitialSync::_applyOplogUntil(OperationContext* txn, const OpTime& endOpTim
OpQueue ops;
auto replCoord = repl::ReplicationCoordinator::get(txn);
- while (!tryPopAndWaitForMore(txn, &ops, replCoord)) {
+ while (!tryPopAndWaitForMore(txn, &ops)) {
// nothing came back last time, so go again
if (ops.empty())
continue;
// Check if we reached the end
- const BSONObj currentOp = ops.back();
+ const BSONObj currentOp = ops.back().raw;
const OpTime currentOpTime =
fassertStatusOK(28772, OpTime::parseFromOplogEntry(currentOp));
@@ -104,7 +104,7 @@ void InitialSync::_applyOplogUntil(OperationContext* txn, const OpTime& endOpTim
fassertFailedNoTrace(18692);
}
- const BSONObj lastOp = ops.back().getOwned();
+ const BSONObj lastOp = ops.back().raw.getOwned();
// Tally operation information
bytesApplied += ops.getSize();
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 87a748833e3..a8e14fc6470 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -423,8 +423,8 @@ void _logOp(OperationContext* txn,
void logOps(OperationContext* txn,
const char* opstr,
const NamespaceString& nss,
- std::vector<BSONObj>::iterator begin,
- std::vector<BSONObj>::iterator end,
+ std::vector<BSONObj>::const_iterator begin,
+ std::vector<BSONObj>::const_iterator end,
bool fromMigrate) {
ReplicationCoordinator::Mode replMode = ReplicationCoordinator::get(txn)->getReplicationMode();
@@ -459,7 +459,7 @@ void logOp(OperationContext* txn,
_logOp(txn, opstr, ns, obj, o2, fromMigrate, _oplogCollectionName, replMode, true);
}
-OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops) {
+OpTime writeOpsToOplog(OperationContext* txn, const std::vector<BSONObj>& ops) {
ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
OpTime lastOptime;
@@ -483,11 +483,10 @@ OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops) {
OldClientContext ctx(txn, rsOplogName, _localDB);
WriteUnitOfWork wunit(txn);
- std::vector<BSONObj> opsVect(ops.begin(), ops.end());
checkOplogInsert(
- _localOplogCollection->insertDocuments(txn, opsVect.begin(), opsVect.end(), false));
+ _localOplogCollection->insertDocuments(txn, ops.begin(), ops.end(), false));
lastOptime =
- fassertStatusOK(ErrorCodes::InvalidBSON, OpTime::parseFromOplogEntry(opsVect.back()));
+ fassertStatusOK(ErrorCodes::InvalidBSON, OpTime::parseFromOplogEntry(ops.back()));
wunit.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "writeOps", _localOplogCollection->ns().ns());
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index 613c02edbcf..b022aa0e545 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -74,7 +74,7 @@ void createOplog(OperationContext* txn);
// used internally by replication secondaries after they have applied ops. Updates the global
// optime.
// Returns the optime for the last op inserted.
-OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops);
+OpTime writeOpsToOplog(OperationContext* txn, const std::vector<BSONObj>& ops);
extern std::string rsOplogName;
extern std::string masterSlaveOplogName;
@@ -95,8 +95,8 @@ extern int OPLOG_VERSION;
void logOps(OperationContext* txn,
const char* opstr,
const NamespaceString& nss,
- std::vector<BSONObj>::iterator begin,
- std::vector<BSONObj>::iterator end,
+ std::vector<BSONObj>::const_iterator begin,
+ std::vector<BSONObj>::const_iterator end,
bool fromMigrate);
/* For 'u' records, 'obj' captures the mutation made to the object but not
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index ee58f04038b..97104b71297 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -174,10 +174,8 @@ public:
/**
- * Returns how slave delayed this node is configured to be.
- *
- * Raises a DBException if this node is not a member of the current replica set
- * configuration.
+ * Returns how slave delayed this node is configured to be, or 0 seconds if this node is not a
+ * member of the current replica set configuration.
*/
virtual Seconds getSlaveDelaySecs() const = 0;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 0c3521a0c98..55226375553 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -528,7 +528,11 @@ Status ReplicationCoordinatorImpl::waitForMemberState(MemberState expectedState,
Seconds ReplicationCoordinatorImpl::getSlaveDelaySecs() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(_rsConfig.isInitialized());
- uassert(28524, "Node not a member of the current set configuration", _selfIndex != -1);
+ if (_selfIndex == -1) {
+ // We aren't currently in the set. Return 0 seconds so we can clear out the applier's
+ // queue of work.
+ return Seconds(0);
+ }
return _rsConfig.getMemberAt(_selfIndex).getSlaveDelay();
}
diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp
index 1ae5f51b249..74246d54439 100644
--- a/src/mongo/db/repl/rs_sync.cpp
+++ b/src/mongo/db/repl/rs_sync.cpp
@@ -131,12 +131,8 @@ void runSyncThread() {
/* we have some data. continue tailing. */
SyncTail tail(BackgroundSync::get(), multiSyncApply);
tail.oplogApplication();
- } catch (const DBException& e) {
- log() << "Received exception while syncing: " << e.toString();
- sleepsecs(10);
- } catch (const std::exception& e) {
- log() << "Received exception while syncing: " << e.what();
- sleepsecs(10);
+ } catch (...) {
+ std::terminate();
}
}
}
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 244eba62c42..3bab6bd1063 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -66,6 +66,7 @@
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -386,10 +387,10 @@ void prefetchOp(const BSONObj& op) {
}
// Doles out all the work to the reader pool threads and waits for them to complete
-void prefetchOps(const std::deque<BSONObj>& ops, OldThreadPool* prefetcherPool) {
+void prefetchOps(const std::deque<SyncTail::OplogEntry>& ops, OldThreadPool* prefetcherPool) {
invariant(prefetcherPool);
- for (std::deque<BSONObj>::const_iterator it = ops.begin(); it != ops.end(); ++it) {
- prefetcherPool->schedule(&prefetchOp, *it);
+ for (auto&& op : ops) {
+ prefetcherPool->schedule(&prefetchOp, op.raw);
}
prefetcherPool->join();
}
@@ -409,28 +410,27 @@ void applyOps(const std::vector<std::vector<BSONObj>>& writerVectors,
}
}
-void fillWriterVectors(const std::deque<BSONObj>& ops,
+void fillWriterVectors(const std::deque<SyncTail::OplogEntry>& ops,
std::vector<std::vector<BSONObj>>* writerVectors) {
- for (std::deque<BSONObj>::const_iterator it = ops.begin(); it != ops.end(); ++it) {
- const BSONElement e = it->getField("ns");
- verify(e.type() == String);
- const char* ns = e.valuestr();
- int len = e.valuestrsize();
+ const bool supportsDocLocking =
+ getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking();
+ const uint32_t numWriters = writerVectors->size();
+
+ for (auto&& op : ops) {
uint32_t hash = 0;
- MurmurHash3_x86_32(ns, len, 0, &hash);
+ MurmurHash3_x86_32(op.ns.rawData(), op.ns.size(), 0, &hash);
- const char* opType = it->getField("op").valuestrsafe();
+ const char* opType = op.opType.rawData();
- if (getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking() &&
- isCrudOpType(opType)) {
+ if (supportsDocLocking && isCrudOpType(opType)) {
BSONElement id;
switch (opType[0]) {
case 'u':
- id = it->getField("o2").Obj()["_id"];
+ id = op.o2.Obj()["_id"];
break;
case 'd':
case 'i':
- id = it->getField("o").Obj()["_id"];
+ id = op.o.Obj()["_id"];
break;
}
@@ -438,7 +438,7 @@ void fillWriterVectors(const std::deque<BSONObj>& ops,
MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash);
}
- (*writerVectors)[hash % writerVectors->size()].push_back(*it);
+ (*writerVectors)[hash % numWriters].push_back(op.raw);
}
}
@@ -474,11 +474,19 @@ OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) {
applyOps(writerVectors, &_writerPool, _applyFunc, this);
- OpTime lastOpTime = writeOpsToOplog(txn, ops.getDeque());
- if (inShutdown()) {
- return OpTime();
+ OpTime lastOpTime;
+ {
+ ON_BLOCK_EXIT([&] { _writerPool.join(); });
+ std::vector<BSONObj> raws;
+ raws.reserve(ops.getDeque().size());
+ for (auto&& op : ops.getDeque()) {
+ raws.emplace_back(op.raw);
+ }
+ lastOpTime = writeOpsToOplog(txn, raws);
+ if (inShutdown()) {
+ return OpTime();
+ }
}
- _writerPool.join();
// We have now written all database writes and updated the oplog to match.
return lastOpTime;
}
@@ -516,83 +524,143 @@ void tryToGoLiveAsASecondary(OperationContext* txn, ReplicationCoordinator* repl
}
}
-/* tail an oplog. ok to return, will be re-called. */
-void SyncTail::oplogApplication() {
- ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
- ApplyBatchFinalizer finalizer(replCoord);
+class SyncTail::OpQueueBatcher {
+ MONGO_DISALLOW_COPYING(OpQueueBatcher);
- OperationContextImpl txn;
- OpTime originalEndOpTime(getMinValid(&txn).end);
+public:
+ explicit OpQueueBatcher(SyncTail* syncTail) : _syncTail(syncTail), _thread([&] { run(); }) {}
+ ~OpQueueBatcher() {
+ _inShutdown.store(true);
+ _cv.notify_all();
+ _thread.join();
+ }
- while (!inShutdown()) {
- OpQueue ops;
+ OpQueue getNextBatch(Seconds maxWaitTime) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (_ops.empty()) {
+ _cv.wait_for(lk, maxWaitTime);
+ }
- Timer batchTimer;
- int lastTimeChecked = 0;
+ OpQueue ops = std::move(_ops);
+ _ops = {};
+ _cv.notify_all();
- do {
- int now = batchTimer.seconds();
+ return ops;
+ }
- // apply replication batch limits
- if (!ops.empty()) {
- if (now > replBatchLimitSeconds)
- break;
- if (ops.getDeque().size() > replBatchLimitOperations)
- break;
- }
- // occasionally check some things
- // (always checked in the first iteration of this do-while loop, because
- // ops is empty)
- if (ops.empty() || now > lastTimeChecked) {
- BackgroundSync* bgsync = BackgroundSync::get();
- if (bgsync->getInitialSyncRequestedFlag()) {
- // got a resync command
- return;
+private:
+ void run() {
+ Client::initThread("ReplBatcher");
+ OperationContextImpl txn;
+ auto replCoord = ReplicationCoordinator::get(&txn);
+
+ while (!_inShutdown.load()) {
+ Timer batchTimer;
+
+ OpQueue ops;
+ // tryPopAndWaitForMore returns true when we need to end a batch early
+ while (!_syncTail->tryPopAndWaitForMore(&txn, &ops) &&
+ (ops.getSize() < replBatchLimitBytes) && !_inShutdown.load()) {
+ int now = batchTimer.seconds();
+
+ // apply replication batch limits
+ if (!ops.empty()) {
+ if (now > replBatchLimitSeconds)
+ break;
+ if (ops.getDeque().size() > replBatchLimitOperations)
+ break;
}
- lastTimeChecked = now;
- // can we become secondary?
- // we have to check this before calling mgr, as we must be a secondary to
- // become primary
- tryToGoLiveAsASecondary(&txn, replCoord);
- }
- const int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs());
- if (!ops.empty() && slaveDelaySecs > 0) {
- const BSONObj lastOp = ops.back();
- const unsigned int opTimestampSecs = lastOp["ts"].timestamp().getSecs();
+ const int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs());
+ if (!ops.empty() && slaveDelaySecs > 0) {
+ const BSONObj lastOp = ops.back().raw;
+ const unsigned int opTimestampSecs = lastOp["ts"].timestamp().getSecs();
- // Stop the batch as the lastOp is too new to be applied. If we continue
- // on, we can get ops that are way ahead of the delay and this will
- // make this thread sleep longer when handleSlaveDelay is called
- // and apply ops much sooner than we like.
- if (opTimestampSecs > static_cast<unsigned int>(time(0) - slaveDelaySecs)) {
+ // Stop the batch as the lastOp is too new to be applied. If we continue
+ // on, we can get ops that are way ahead of the delay and this will
+ // make this thread sleep longer when handleSlaveDelay is called
+ // and apply ops much sooner than we like.
+ if (opTimestampSecs > static_cast<unsigned int>(time(0) - slaveDelaySecs)) {
+ break;
+ }
+ }
+
+ if (MONGO_FAIL_POINT(rsSyncApplyStop)) {
break;
}
+
+ // keep fetching more ops as long as we haven't filled up a full batch yet
}
- if (MONGO_FAIL_POINT(rsSyncApplyStop)) {
- break;
+ // For pausing replication in tests
+ while (MONGO_FAIL_POINT(rsSyncApplyStop) && !_inShutdown.load()) {
+ sleepmillis(0);
}
- // keep fetching more ops as long as we haven't filled up a full batch yet
- } while (!tryPopAndWaitForMore(&txn, &ops, replCoord) && // tryPopAndWaitForMore returns
- // true when we need to end a
- // batch early
- (ops.getSize() < replBatchLimitBytes) &&
- !inShutdown());
-
- // For pausing replication in tests
- while (MONGO_FAIL_POINT(rsSyncApplyStop)) {
- sleepmillis(0);
- if (inShutdown())
- return;
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ while (!_ops.empty()) {
+ // Block until the previous batch has been taken.
+ if (_inShutdown.load())
+ return;
+ _cv.wait(lk);
+ }
+ _ops = std::move(ops);
+ _cv.notify_all();
}
+ }
+
+ AtomicWord<bool> _inShutdown;
+ SyncTail* const _syncTail;
+
+ stdx::mutex _mutex; // Guards _ops.
+ stdx::condition_variable _cv;
+ OpQueue _ops;
+
+ stdx::thread _thread; // Must be last so all other members are initialized before starting.
+};
- if (ops.empty()) {
- continue;
+/* tail an oplog. ok to return, will be re-called. */
+void SyncTail::oplogApplication() {
+ OpQueueBatcher batcher(this);
+
+ OperationContextImpl txn;
+ auto replCoord = ReplicationCoordinator::get(&txn);
+ ApplyBatchFinalizer finalizer(replCoord);
+
+ OpTime originalEndOpTime(getMinValid(&txn).end);
+ while (!inShutdown()) {
+ OpQueue ops;
+
+ do {
+ if (BackgroundSync::get()->getInitialSyncRequestedFlag()) {
+ // got a resync command
+ return;
+ }
+
+ tryToGoLiveAsASecondary(&txn, replCoord);
+
+ // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become
+ // ready in time, we'll loop again so we can do the above checks periodically.
+ ops = batcher.getNextBatch(Seconds(1));
+ } while (!inShutdown() && ops.empty());
+
+ if (inShutdown())
+ return;
+
+ invariant(!ops.empty());
+
+ const BSONObj lastOp = ops.back().raw;
+
+ if (lastOp.isEmpty()) {
+ // This means that the network thread has coalesced and we have processed all of its
+ // data.
+ invariant(ops.getDeque().size() == 1);
+ if (replCoord->isWaitingForApplierToDrain()) {
+ replCoord->signalDrainComplete(&txn);
+ }
+ continue; // This wasn't a real op. Don't try to apply it.
}
- const BSONObj lastOp = ops.back();
handleSlaveDelay(lastOp);
// Set minValid to the last OpTime that needs to be applied, in this batch or from the
@@ -628,6 +696,23 @@ void SyncTail::oplogApplication() {
}
}
+SyncTail::OplogEntry::OplogEntry(const BSONObj& rawInput) : raw(rawInput.getOwned()) {
+ for (auto elem : raw) {
+ const auto name = elem.fieldNameStringData();
+ if (name == "ns") {
+ ns = elem.valuestrsafe();
+ } else if (name == "op") {
+ opType = elem.valuestrsafe();
+ } else if (name == "o2") {
+ o2 = elem;
+ } else if (name == "v") {
+ version = elem;
+ } else if (name == "o") {
+ o = elem;
+ }
+ }
+}
+
// Copies ops out of the bgsync queue into the deque passed in as a parameter.
// Returns true if the batch should be ended early.
// Batch should end early if we encounter a command, or if
@@ -635,9 +720,7 @@ void SyncTail::oplogApplication() {
// This function also blocks 1 second waiting for new ops to appear in the bgsync
// queue. We can't block forever because there are maintenance things we need
// to periodically check in the loop.
-bool SyncTail::tryPopAndWaitForMore(OperationContext* txn,
- SyncTail::OpQueue* ops,
- ReplicationCoordinator* replCoord) {
+bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* ops) {
BSONObj op;
// Check to see if there are ops waiting in the bgsync queue
bool peek_success = peek(&op);
@@ -645,16 +728,6 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn,
if (!peek_success) {
// if we don't have anything in the queue, wait a bit for something to appear
if (ops->empty()) {
- if (replCoord->isWaitingForApplierToDrain()) {
- BackgroundSync::get()->waitUntilPaused();
- if (peek(&op)) {
- // The producer generated a last batch of ops before pausing so return
- // false so that we'll come back and apply them before signaling the drain
- // is complete.
- return false;
- }
- replCoord->signalDrainComplete(txn);
- }
// block up to 1 second
_networkQueue->waitForMore();
return false;
@@ -664,16 +737,17 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn,
return true;
}
- const char* ns = op["ns"].valuestrsafe();
+ auto entry = OplogEntry(op);
- // check for commands
- if ((op["op"].valuestrsafe()[0] == 'c') ||
+ // Check for ops that must be processed one at a time.
+ if (entry.raw.isEmpty() || // sentinel that network queue is drained.
+ (entry.opType[0] == 'c') || // commands.
// Index builds are acheived through the use of an insert op, not a command op.
// The following line is the same as what the insert code uses to detect an index build.
- (*ns != '\0' && nsToCollectionSubstring(ns) == "system.indexes")) {
+ (!entry.ns.empty() && nsToCollectionSubstring(entry.ns) == "system.indexes")) {
if (ops->empty()) {
// apply commands one-at-a-time
- ops->push_back(op);
+ ops->push_back(std::move(entry));
_networkQueue->consume();
}
@@ -682,13 +756,12 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn,
}
// check for oplog version change
- BSONElement elemVersion = op["v"];
int curVersion = 0;
- if (elemVersion.eoo())
+ if (entry.version.eoo())
// missing version means version 1
curVersion = 1;
else
- curVersion = elemVersion.Int();
+ curVersion = entry.version.Int();
if (curVersion != OPLOG_VERSION) {
severe() << "expected oplog version " << OPLOG_VERSION << " but found version "
@@ -697,7 +770,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn,
}
// Copy the op to the deque and remove it from the bgsync queue.
- ops->push_back(op);
+ ops->push_back(std::move(entry));
_networkQueue->consume();
// Go back for more ops
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 254f0182bd5..e3edacaed2a 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -95,38 +95,58 @@ public:
void oplogApplication();
bool peek(BSONObj* obj);
+ /**
+ * A parsed oplog entry.
+ *
+ * This only includes the fields used by the code using this object at the time this was
+ * written. As more code uses this, more fields should be added.
+ *
+ * All unowned members (such as StringDatas and BSONElements) point into the raw BSON.
+ * All StringData members are guaranteed to be NUL terminated.
+ */
+ struct OplogEntry {
+ explicit OplogEntry(const BSONObj& raw);
+
+ BSONObj raw; // Owned.
+
+ StringData ns = "";
+ StringData opType = "";
+
+ BSONElement version;
+ BSONElement o;
+ BSONElement o2;
+ };
+
class OpQueue {
public:
OpQueue() : _size(0) {}
size_t getSize() const {
return _size;
}
- const std::deque<BSONObj>& getDeque() const {
+ const std::deque<OplogEntry>& getDeque() const {
return _deque;
}
- void push_back(BSONObj& op) {
- _deque.push_back(op);
- _size += op.objsize();
+ void push_back(OplogEntry&& op) {
+ _size += op.raw.objsize();
+ _deque.push_back(std::move(op));
}
bool empty() const {
return _deque.empty();
}
- BSONObj back() const {
+ const OplogEntry& back() const {
invariant(!_deque.empty());
return _deque.back();
}
private:
- std::deque<BSONObj> _deque;
+ std::deque<OplogEntry> _deque;
size_t _size;
};
// returns true if we should continue waiting for BSONObjs, false if we should
// stop waiting and apply the queue we have. Only returns false if !ops.empty().
- bool tryPopAndWaitForMore(OperationContext* txn,
- OpQueue* ops,
- ReplicationCoordinator* replCoord);
+ bool tryPopAndWaitForMore(OperationContext* txn, OpQueue* ops);
/**
* Fetch a single document referenced in the operation from the sync source.
@@ -158,6 +178,8 @@ protected:
OpTime multiApply(OperationContext* txn, const OpQueue& ops);
private:
+ class OpQueueBatcher;
+
std::string _hostname;
BackgroundSyncInterface* _networkQueue;
diff --git a/src/mongo/util/queue.h b/src/mongo/util/queue.h
index a1b8caac1a0..fefcadd3d70 100644
--- a/src/mongo/util/queue.h
+++ b/src/mongo/util/queue.h
@@ -29,12 +29,14 @@
#pragma once
+#include <boost/optional.hpp>
#include <limits>
#include <queue>
+#include "mongo/base/disallow_copying.h"
#include "mongo/stdx/chrono.h"
#include "mongo/stdx/condition_variable.h"
-#include "mongo/base/disallow_copying.h"
+#include "mongo/stdx/mutex.h"
namespace mongo {
@@ -62,6 +64,11 @@ public:
BlockingQueue(size_t size) : _maxSize(size), _getSize(&_getSizeDefault) {}
BlockingQueue(size_t size, getSizeFunc f) : _maxSize(size), _getSize(f) {}
+ void pushEvenIfFull(T const& t) {
+ stdx::unique_lock<stdx::mutex> l(_lock);
+ pushImpl_inlock(t, _getSize(t));
+ }
+
void push(T const& t) {
stdx::unique_lock<stdx::mutex> l(_lock);
_clearing = false;
@@ -69,9 +76,7 @@ public:
while (_currentSize + tSize > _maxSize) {
_cvNoLongerFull.wait(l);
}
- _queue.push(t);
- _currentSize += tSize;
- _cvNoLongerEmpty.notify_one();
+ pushImpl_inlock(t, tSize);
}
bool empty() const {
@@ -197,7 +202,27 @@ public:
return true;
}
+ /**
+ * Returns the item most recently added to the queue or nothing if the queue is empty.
+ */
+ boost::optional<T> lastObjectPushed() const {
+ stdx::unique_lock<stdx::mutex> l(_lock);
+ if (_queue.empty()) {
+ return {};
+ }
+
+ return {_queue.back()};
+ }
+
private:
+ void pushImpl_inlock(const T& obj, size_t objSize) {
+ _clearing = false;
+ _queue.push(obj);
+ _currentSize += objSize;
+ if (_queue.size() == 1) // We were empty.
+ _cvNoLongerEmpty.notify_one();
+ }
+
mutable stdx::mutex _lock;
std::queue<T> _queue;
const size_t _maxSize;