author    Mathias Stearn <mathias@10gen.com>  2015-11-04 18:45:16 -0500
committer Mathias Stearn <mathias@10gen.com>  2015-11-04 18:45:16 -0500
commit    d70512ab5f655868b409bcbf92d1e68af80e68d8 (patch)
tree      8a0463264acf349dbbd8eb3e5ca1246a8b50378a /src
parent    ed6c7908a51b92311efad30df78a390fa96e990f (diff)
download  mongo-d70512ab5f655868b409bcbf92d1e68af80e68d8.tar.gz
Revert "SERVER-21154 Batch and parse oplog entries in parallel with applying them"
This reverts commit 2f70889bbfd4dea77cd26cec2dde28193b06905d.
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/catalog/collection.cpp                 |  14
-rw-r--r--  src/mongo/db/catalog/collection.h                   |   8
-rw-r--r--  src/mongo/db/op_observer.cpp                        |   4
-rw-r--r--  src/mongo/db/op_observer.h                          |   4
-rw-r--r--  src/mongo/db/repl/bgsync.cpp                        |   5
-rw-r--r--  src/mongo/db/repl/initial_sync.cpp                  |   6
-rw-r--r--  src/mongo/db/repl/oplog.cpp                         |  11
-rw-r--r--  src/mongo/db/repl/oplog.h                           |   6
-rw-r--r--  src/mongo/db/repl/replication_coordinator.h         |   6
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp  |   6
-rw-r--r--  src/mongo/db/repl/rs_sync.cpp                       |   8
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp                     | 275
-rw-r--r--  src/mongo/db/repl/sync_tail.h                       |  40
-rw-r--r--  src/mongo/util/queue.h                              |  20
14 files changed, 152 insertions(+), 261 deletions(-)
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp
index bd319dc6654..2c45a6057b7 100644
--- a/src/mongo/db/catalog/collection.cpp
+++ b/src/mongo/db/catalog/collection.cpp
@@ -331,14 +331,14 @@ Status Collection::insertDocument(OperationContext* txn, const DocWriter* doc, b
Status Collection::insertDocuments(OperationContext* txn,
- const vector<BSONObj>::const_iterator begin,
- const vector<BSONObj>::const_iterator end,
+ vector<BSONObj>::iterator begin,
+ vector<BSONObj>::iterator end,
bool enforceQuota,
bool fromMigrate) {
// Should really be done in the collection object at creation and updated on index create.
const bool hasIdIndex = _indexCatalog.findIdIndex(txn);
- for (auto it = begin; it != end; it++) {
+ for (vector<BSONObj>::iterator it = begin; it != end; it++) {
if (hasIdIndex && (*it)["_id"].eoo()) {
return Status(ErrorCodes::InternalError,
str::stream() << "Collection::insertDocument got "
@@ -411,8 +411,8 @@ Status Collection::insertDocument(OperationContext* txn,
}
Status Collection::_insertDocuments(OperationContext* txn,
- const vector<BSONObj>::const_iterator begin,
- const vector<BSONObj>::const_iterator end,
+ vector<BSONObj>::iterator begin,
+ vector<BSONObj>::iterator end,
bool enforceQuota) {
dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX));
@@ -421,7 +421,7 @@ Status Collection::_insertDocuments(OperationContext* txn,
// collection access method probably
std::vector<Record> records;
- for (auto it = begin; it != end; it++) {
+ for (vector<BSONObj>::iterator it = begin; it != end; it++) {
Record record = {RecordId(), RecordData(it->objdata(), it->objsize())};
records.push_back(record);
}
@@ -431,7 +431,7 @@ Status Collection::_insertDocuments(OperationContext* txn,
std::vector<BsonRecord> bsonRecords;
int recordIndex = 0;
- for (auto it = begin; it != end; it++) {
+ for (vector<BSONObj>::iterator it = begin; it != end; it++) {
RecordId loc = records[recordIndex++].id;
invariant(RecordId::min() < loc);
invariant(loc < RecordId::max());
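
For orientation, both sides of this hunk implement the same iterator-range batch API; only the constness of the iterators changes. A standalone sketch of its shape (Doc and hasId are stand-ins for BSONObj and the _id-index check, not MongoDB types): validate the whole range before writing anything, so one bad document rejects the entire batch.

#include <vector>

struct Doc {
    bool hasId;  // stand-in for "document carries an _id field"
};

// Shape of Collection::insertDocuments: check every document in
// [begin, end) before any storage write, so failure leaves nothing behind.
bool insertDocuments(std::vector<Doc>::iterator begin,
                     std::vector<Doc>::iterator end) {
    for (auto it = begin; it != end; ++it) {
        if (!it->hasId)
            return false;  // reject the whole batch, as the real code does
    }
    // ... hand the validated range to the record store ...
    return true;
}
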
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h
index eabc8dfdfa9..d5956382f91 100644
--- a/src/mongo/db/catalog/collection.h
+++ b/src/mongo/db/catalog/collection.h
@@ -255,8 +255,8 @@ public:
* If errors occur (including WCE), caller should retry documents individually.
*/
Status insertDocuments(OperationContext* txn,
- std::vector<BSONObj>::const_iterator begin,
- std::vector<BSONObj>::const_iterator end,
+ std::vector<BSONObj>::iterator begin,
+ std::vector<BSONObj>::iterator end,
bool enforceQuota,
bool fromMigrate = false);
@@ -440,8 +440,8 @@ private:
Status _insertDocument(OperationContext* txn, const BSONObj& doc, bool enforceQuota);
Status _insertDocuments(OperationContext* txn,
- std::vector<BSONObj>::const_iterator begin,
- std::vector<BSONObj>::const_iterator end,
+ std::vector<BSONObj>::iterator begin,
+ std::vector<BSONObj>::iterator end,
bool enforceQuota);
bool _enforceQuota(bool userEnforeQuota) const;
diff --git a/src/mongo/db/op_observer.cpp b/src/mongo/db/op_observer.cpp
index 6e5d6161d96..720d26ca9f2 100644
--- a/src/mongo/db/op_observer.cpp
+++ b/src/mongo/db/op_observer.cpp
@@ -58,8 +58,8 @@ void OpObserver::onCreateIndex(OperationContext* txn,
void OpObserver::onInserts(OperationContext* txn,
const NamespaceString& nss,
- vector<BSONObj>::const_iterator begin,
- vector<BSONObj>::const_iterator end,
+ vector<BSONObj>::iterator begin,
+ vector<BSONObj>::iterator end,
bool fromMigrate) {
repl::logOps(txn, "i", nss, begin, end, fromMigrate);
diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h
index 36dafda30c6..2ef3a191f0a 100644
--- a/src/mongo/db/op_observer.h
+++ b/src/mongo/db/op_observer.h
@@ -57,8 +57,8 @@ public:
bool fromMigrate = false);
void onInserts(OperationContext* txn,
const NamespaceString& ns,
- std::vector<BSONObj>::const_iterator begin,
- std::vector<BSONObj>::const_iterator end,
+ std::vector<BSONObj>::iterator begin,
+ std::vector<BSONObj>::iterator end,
bool fromMigrate = false);
void onUpdate(OperationContext* txn, oplogUpdateEntryArgs args);
void onDelete(OperationContext* txn,
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 8f68fc0aad5..118a9e02046 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -636,11 +636,6 @@ void BackgroundSync::cancelFetcher() {
void BackgroundSync::stop() {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- if (_replCoord->isWaitingForApplierToDrain()) {
- // Signal to consumers that we have entered the paused state.
- _buffer.pushEvenIfFull(BSONObj());
- }
-
_pause = true;
_syncSourceHost = HostAndPort();
_lastOpTimeFetched = OpTime();
diff --git a/src/mongo/db/repl/initial_sync.cpp b/src/mongo/db/repl/initial_sync.cpp
index e0ca82a6ea0..d769d7a89fa 100644
--- a/src/mongo/db/repl/initial_sync.cpp
+++ b/src/mongo/db/repl/initial_sync.cpp
@@ -73,13 +73,13 @@ void InitialSync::_applyOplogUntil(OperationContext* txn, const OpTime& endOpTim
OpQueue ops;
auto replCoord = repl::ReplicationCoordinator::get(txn);
- while (!tryPopAndWaitForMore(txn, &ops)) {
+ while (!tryPopAndWaitForMore(txn, &ops, replCoord)) {
// nothing came back last time, so go again
if (ops.empty())
continue;
// Check if we reached the end
- const BSONObj currentOp = ops.back().raw;
+ const BSONObj currentOp = ops.back();
const OpTime currentOpTime =
fassertStatusOK(28772, OpTime::parseFromOplogEntry(currentOp));
@@ -104,7 +104,7 @@ void InitialSync::_applyOplogUntil(OperationContext* txn, const OpTime& endOpTim
fassertFailedNoTrace(18692);
}
- const BSONObj lastOp = ops.back().raw.getOwned();
+ const BSONObj lastOp = ops.back().getOwned();
// Tally operation information
bytesApplied += ops.getSize();
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index a8e14fc6470..87a748833e3 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -423,8 +423,8 @@ void _logOp(OperationContext* txn,
void logOps(OperationContext* txn,
const char* opstr,
const NamespaceString& nss,
- std::vector<BSONObj>::const_iterator begin,
- std::vector<BSONObj>::const_iterator end,
+ std::vector<BSONObj>::iterator begin,
+ std::vector<BSONObj>::iterator end,
bool fromMigrate) {
ReplicationCoordinator::Mode replMode = ReplicationCoordinator::get(txn)->getReplicationMode();
@@ -459,7 +459,7 @@ void logOp(OperationContext* txn,
_logOp(txn, opstr, ns, obj, o2, fromMigrate, _oplogCollectionName, replMode, true);
}
-OpTime writeOpsToOplog(OperationContext* txn, const std::vector<BSONObj>& ops) {
+OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops) {
ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
OpTime lastOptime;
@@ -483,10 +483,11 @@ OpTime writeOpsToOplog(OperationContext* txn, const std::vector<BSONObj>& ops) {
OldClientContext ctx(txn, rsOplogName, _localDB);
WriteUnitOfWork wunit(txn);
+ std::vector<BSONObj> opsVect(ops.begin(), ops.end());
checkOplogInsert(
- _localOplogCollection->insertDocuments(txn, ops.begin(), ops.end(), false));
+ _localOplogCollection->insertDocuments(txn, opsVect.begin(), opsVect.end(), false));
lastOptime =
- fassertStatusOK(ErrorCodes::InvalidBSON, OpTime::parseFromOplogEntry(ops.back()));
+ fassertStatusOK(ErrorCodes::InvalidBSON, OpTime::parseFromOplogEntry(opsVect.back()));
wunit.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "writeOps", _localOplogCollection->ns().ns());
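
The restored writeOpsToOplog accepts a std::deque, but insertDocuments still wants std::vector iterators, hence the opsVect copy introduced above. A minimal sketch of that shim, with int standing in for BSONObj; the cost is one extra O(n) copy per batch:

#include <deque>
#include <vector>

std::vector<int> toVector(const std::deque<int>& ops) {
    return std::vector<int>(ops.begin(), ops.end());
}
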
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index b022aa0e545..613c02edbcf 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -74,7 +74,7 @@ void createOplog(OperationContext* txn);
// used internally by replication secondaries after they have applied ops. Updates the global
// optime.
// Returns the optime for the last op inserted.
-OpTime writeOpsToOplog(OperationContext* txn, const std::vector<BSONObj>& ops);
+OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops);
extern std::string rsOplogName;
extern std::string masterSlaveOplogName;
@@ -95,8 +95,8 @@ extern int OPLOG_VERSION;
void logOps(OperationContext* txn,
const char* opstr,
const NamespaceString& nss,
- std::vector<BSONObj>::const_iterator begin,
- std::vector<BSONObj>::const_iterator end,
+ std::vector<BSONObj>::iterator begin,
+ std::vector<BSONObj>::iterator end,
bool fromMigrate);
/* For 'u' records, 'obj' captures the mutation made to the object but not
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index 97104b71297..ee58f04038b 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -174,8 +174,10 @@ public:
/**
- * Returns how slave delayed this node is configured to be, or 0 seconds if this node is not a
- * member of the current replica set configuration.
+ * Returns how slave delayed this node is configured to be.
+ *
+ * Raises a DBException if this node is not a member of the current replica set
+ * configuration.
*/
virtual Seconds getSlaveDelaySecs() const = 0;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 2044b43439f..5634f1db32a 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -529,11 +529,7 @@ Status ReplicationCoordinatorImpl::waitForMemberState(MemberState expectedState,
Seconds ReplicationCoordinatorImpl::getSlaveDelaySecs() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(_rsConfig.isInitialized());
- if (_selfIndex == -1) {
- // We aren't currently in the set. Return 0 seconds so we can clear out the applier's
- // queue of work.
- return Seconds(0);
- }
+ uassert(28524, "Node not a member of the current set configuration", _selfIndex != -1);
return _rsConfig.getMemberAt(_selfIndex).getSlaveDelay();
}
diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp
index 74246d54439..1ae5f51b249 100644
--- a/src/mongo/db/repl/rs_sync.cpp
+++ b/src/mongo/db/repl/rs_sync.cpp
@@ -131,8 +131,12 @@ void runSyncThread() {
/* we have some data. continue tailing. */
SyncTail tail(BackgroundSync::get(), multiSyncApply);
tail.oplogApplication();
- } catch (...) {
- std::terminate();
+ } catch (const DBException& e) {
+ log() << "Received exception while syncing: " << e.toString();
+ sleepsecs(10);
+ } catch (const std::exception& e) {
+ log() << "Received exception while syncing: " << e.what();
+ sleepsecs(10);
}
}
}
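
The restored handlers trade std::terminate for a log-and-back-off policy: the thread logs the exception, sleeps, and lets the outer loop re-enter sync. A self-contained sketch of that loop (runSyncLoopOnce is a hypothetical stand-in for one pass of the tailing logic):

#include <chrono>
#include <exception>
#include <iostream>
#include <thread>

void runSyncLoopOnce() {}  // stand-in for one pass of the tailing logic

void runSyncThread() {
    while (true) {
        try {
            runSyncLoopOnce();
        } catch (const std::exception& e) {
            // Log, back off, and let the loop re-enter sync rather than die.
            std::cerr << "Received exception while syncing: " << e.what() << '\n';
            std::this_thread::sleep_for(std::chrono::seconds(10));
        }
    }
}
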
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index ff25c72abea..244eba62c42 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -66,7 +66,6 @@
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
-#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -387,10 +386,10 @@ void prefetchOp(const BSONObj& op) {
}
// Doles out all the work to the reader pool threads and waits for them to complete
-void prefetchOps(const std::deque<SyncTail::OplogEntry>& ops, OldThreadPool* prefetcherPool) {
+void prefetchOps(const std::deque<BSONObj>& ops, OldThreadPool* prefetcherPool) {
invariant(prefetcherPool);
- for (auto&& op : ops) {
- prefetcherPool->schedule(&prefetchOp, op.raw);
+ for (std::deque<BSONObj>::const_iterator it = ops.begin(); it != ops.end(); ++it) {
+ prefetcherPool->schedule(&prefetchOp, *it);
}
prefetcherPool->join();
}
@@ -410,27 +409,28 @@ void applyOps(const std::vector<std::vector<BSONObj>>& writerVectors,
}
}
-void fillWriterVectors(const std::deque<SyncTail::OplogEntry>& ops,
+void fillWriterVectors(const std::deque<BSONObj>& ops,
std::vector<std::vector<BSONObj>>* writerVectors) {
- const bool supportsDocLocking =
- getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking();
- const uint32_t numWriters = writerVectors->size();
-
- for (auto&& op : ops) {
+ for (std::deque<BSONObj>::const_iterator it = ops.begin(); it != ops.end(); ++it) {
+ const BSONElement e = it->getField("ns");
+ verify(e.type() == String);
+ const char* ns = e.valuestr();
+ int len = e.valuestrsize();
uint32_t hash = 0;
- MurmurHash3_x86_32(op.ns.rawData(), op.ns.size(), 0, &hash);
+ MurmurHash3_x86_32(ns, len, 0, &hash);
- const char* opType = op.opType.rawData();
+ const char* opType = it->getField("op").valuestrsafe();
- if (supportsDocLocking && isCrudOpType(opType)) {
+ if (getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking() &&
+ isCrudOpType(opType)) {
BSONElement id;
switch (opType[0]) {
case 'u':
- id = op.o2.Obj()["_id"];
+ id = it->getField("o2").Obj()["_id"];
break;
case 'd':
case 'i':
- id = op.o.Obj()["_id"];
+ id = it->getField("o").Obj()["_id"];
break;
}
@@ -438,7 +438,7 @@ void fillWriterVectors(const std::deque<SyncTail::OplogEntry>& ops,
MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash);
}
- (*writerVectors)[hash % numWriters].push_back(op.raw);
+ (*writerVectors)[hash % writerVectors->size()].push_back(*it);
}
}
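
The partitioning rule these hunks restore: hash the namespace and, on engines that support document-level locking, fold in the _id hash, so every op touching a given document lands in the same writer vector and its relative order is preserved. A standalone approximation, using std::hash where the real code uses seeded MurmurHash3 (Op is a stand-in for a raw oplog entry):

#include <cstddef>
#include <functional>
#include <string>
#include <vector>

struct Op {            // stand-in for a raw oplog entry
    std::string ns;    // the "ns" field
    std::string id;    // the "_id" from "o"/"o2"; empty for non-CRUD ops
};

void fillWriterVectors(const std::vector<Op>& ops,
                       std::vector<std::vector<Op>>& writers,
                       bool supportsDocLocking) {
    for (const Op& op : ops) {
        // MurmurHash3 in the real code; std::hash keeps this sketch standalone.
        std::size_t hash = std::hash<std::string>{}(op.ns);
        if (supportsDocLocking && !op.id.empty()) {
            // Fold in the _id hash, mirroring the seeded MurmurHash3 chaining.
            hash ^= std::hash<std::string>{}(op.id) + 0x9e3779b9 + (hash << 6) + (hash >> 2);
        }
        writers[hash % writers.size()].push_back(op);
    }
}
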
@@ -474,19 +474,11 @@ OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) {
applyOps(writerVectors, &_writerPool, _applyFunc, this);
- OpTime lastOpTime;
- {
- ON_BLOCK_EXIT([&] { _writerPool.join(); });
- std::vector<BSONObj> raws;
- raws.reserve(ops.getDeque().size());
- for (auto&& op : ops.getDeque()) {
- raws.emplace_back(op.raw);
- }
- lastOpTime = writeOpsToOplog(txn, raws);
- if (inShutdown()) {
- return OpTime();
- }
+ OpTime lastOpTime = writeOpsToOplog(txn, ops.getDeque());
+ if (inShutdown()) {
+ return OpTime();
}
+ _writerPool.join();
// We have now written all database writes and updated the oplog to match.
return lastOpTime;
}
@@ -524,143 +516,83 @@ void tryToGoLiveAsASecondary(OperationContext* txn, ReplicationCoordinator* repl
}
}
-class SyncTail::OpQueueBatcher {
- MONGO_DISALLOW_COPYING(OpQueueBatcher);
+/* tail an oplog. ok to return, will be re-called. */
+void SyncTail::oplogApplication() {
+ ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
+ ApplyBatchFinalizer finalizer(replCoord);
-public:
- explicit OpQueueBatcher(SyncTail* syncTail) : _syncTail(syncTail), _thread([&] { run(); }) {}
- ~OpQueueBatcher() {
- _inShutdown.store(true);
- _cv.notify_all();
- _thread.join();
- }
+ OperationContextImpl txn;
+ OpTime originalEndOpTime(getMinValid(&txn).end);
- OpQueue getNextBatch(Seconds maxWaitTime) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (_ops.empty()) {
- _cv.wait_for(lk, maxWaitTime);
- }
+ while (!inShutdown()) {
+ OpQueue ops;
- OpQueue ops = std::move(_ops);
- _ops = {};
- _cv.notify_all();
+ Timer batchTimer;
+ int lastTimeChecked = 0;
- return ops;
- }
+ do {
+ int now = batchTimer.seconds();
-private:
- void run() {
- Client::initThread("ReplBatcher");
- OperationContextImpl txn;
- auto replCoord = ReplicationCoordinator::get(&txn);
-
- while (!_inShutdown.load()) {
- Timer batchTimer;
-
- OpQueue ops;
- // tryPopAndWaitForMore returns true when we need to end a batch early
- while (!_syncTail->tryPopAndWaitForMore(&txn, &ops) &&
- (ops.getSize() < replBatchLimitBytes) && !_inShutdown.load()) {
- int now = batchTimer.seconds();
-
- // apply replication batch limits
- if (!ops.empty()) {
- if (now > replBatchLimitSeconds)
- break;
- if (ops.getDeque().size() > replBatchLimitOperations)
- break;
+ // apply replication batch limits
+ if (!ops.empty()) {
+ if (now > replBatchLimitSeconds)
+ break;
+ if (ops.getDeque().size() > replBatchLimitOperations)
+ break;
+ }
+ // occasionally check some things
+ // (always checked in the first iteration of this do-while loop, because
+ // ops is empty)
+ if (ops.empty() || now > lastTimeChecked) {
+ BackgroundSync* bgsync = BackgroundSync::get();
+ if (bgsync->getInitialSyncRequestedFlag()) {
+ // got a resync command
+ return;
}
+ lastTimeChecked = now;
+ // can we become secondary?
+ // we have to check this before calling mgr, as we must be a secondary to
+ // become primary
+ tryToGoLiveAsASecondary(&txn, replCoord);
+ }
- const int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs());
- if (!ops.empty() && slaveDelaySecs > 0) {
- const BSONObj lastOp = ops.back().raw;
- const unsigned int opTimestampSecs = lastOp["ts"].timestamp().getSecs();
-
- // Stop the batch as the lastOp is too new to be applied. If we continue
- // on, we can get ops that are way ahead of the delay and this will
- // make this thread sleep longer when handleSlaveDelay is called
- // and apply ops much sooner than we like.
- if (opTimestampSecs > static_cast<unsigned int>(time(0) - slaveDelaySecs)) {
- break;
- }
- }
+ const int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs());
+ if (!ops.empty() && slaveDelaySecs > 0) {
+ const BSONObj lastOp = ops.back();
+ const unsigned int opTimestampSecs = lastOp["ts"].timestamp().getSecs();
- if (MONGO_FAIL_POINT(rsSyncApplyStop)) {
+ // Stop the batch as the lastOp is too new to be applied. If we continue
+ // on, we can get ops that are way ahead of the delay and this will
+ // make this thread sleep longer when handleSlaveDelay is called
+ // and apply ops much sooner than we like.
+ if (opTimestampSecs > static_cast<unsigned int>(time(0) - slaveDelaySecs)) {
break;
}
-
- // keep fetching more ops as long as we haven't filled up a full batch yet
- }
-
- // For pausing replication in tests
- while (MONGO_FAIL_POINT(rsSyncApplyStop) && !_inShutdown.load()) {
- sleepmillis(0);
}
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- while (!_ops.empty()) {
- // Block until the previous batch has been taken.
- if (_inShutdown.load())
- return;
- _cv.wait(lk);
+ if (MONGO_FAIL_POINT(rsSyncApplyStop)) {
+ break;
}
- _ops = std::move(ops);
- _cv.notify_all();
- }
- }
-
- AtomicWord<bool> _inShutdown;
- SyncTail* const _syncTail;
-
- stdx::mutex _mutex; // Guards _ops.
- stdx::condition_variable _cv;
- OpQueue _ops;
-
- stdx::thread _thread; // Must be last so all other members are initialized before starting.
-};
-
-/* tail an oplog. ok to return, will be re-called. */
-void SyncTail::oplogApplication() {
- OpQueueBatcher batcher(this);
-
- OperationContextImpl txn;
- auto replCoord = ReplicationCoordinator::get(&txn);
- ApplyBatchFinalizer finalizer(replCoord);
-
- OpTime originalEndOpTime(getMinValid(&txn).end);
- while (!inShutdown()) {
- OpQueue ops;
- do {
- if (BackgroundSync::get()->getInitialSyncRequestedFlag()) {
- // got a resync command
+ // keep fetching more ops as long as we haven't filled up a full batch yet
+ } while (!tryPopAndWaitForMore(&txn, &ops, replCoord) && // tryPopAndWaitForMore returns
+ // true when we need to end a
+ // batch early
+ (ops.getSize() < replBatchLimitBytes) &&
+ !inShutdown());
+
+ // For pausing replication in tests
+ while (MONGO_FAIL_POINT(rsSyncApplyStop)) {
+ sleepmillis(0);
+ if (inShutdown())
return;
- }
-
- tryToGoLiveAsASecondary(&txn, replCoord);
-
- // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become
- // ready in time, we'll loop again so we can do the above checks periodically.
- ops = batcher.getNextBatch(Seconds(1));
- } while (!inShutdown() && ops.empty());
-
- if (inShutdown())
- return;
-
- invariant(!ops.empty());
-
- const BSONObj lastOp = ops.back().raw;
-
- if (lastOp.isEmpty()) {
- // This means we are currently stepping up as primary, and waiting for this thread to
- // drain the ops we've queued up.
- invariant(ops.getDeque().size() == 1);
- invariant(replCoord->isWaitingForApplierToDrain());
+ }
- replCoord->signalDrainComplete(&txn);
- continue; // This wasn't a real op. Don't try to apply it.
+ if (ops.empty()) {
+ continue;
}
+ const BSONObj lastOp = ops.back();
handleSlaveDelay(lastOp);
// Set minValid to the last OpTime that needs to be applied, in this batch or from the
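
The class removed above is the heart of SERVER-21154: a dedicated ReplBatcher thread assembles the next batch while the applier applies the current one, handing batches over through a single-slot mailbox guarded by a mutex and condition variable. A hedged, self-contained sketch of that pattern, not the original code (Batch and buildBatch are placeholders):

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

using Batch = std::vector<int>;  // stand-in for SyncTail::OpQueue

class Batcher {
public:
    Batcher() : _thread([this] { run(); }) {}
    ~Batcher() {
        _inShutdown.store(true);
        _cv.notify_all();
        _thread.join();
    }

    // Applier side: take the ready batch, or an empty one after maxWait.
    Batch getNextBatch(std::chrono::seconds maxWait) {
        std::unique_lock<std::mutex> lk(_mutex);
        if (_batch.empty())
            _cv.wait_for(lk, maxWait);
        Batch out = std::move(_batch);
        _batch.clear();
        _cv.notify_all();  // wake the producer blocked on a full mailbox
        return out;
    }

private:
    static Batch buildBatch() { return Batch{1, 2, 3}; }  // placeholder: real code tails the oplog

    void run() {
        while (!_inShutdown.load()) {
            Batch next = buildBatch();  // overlaps with the applier's work
            std::unique_lock<std::mutex> lk(_mutex);
            // Single-slot handoff: wait until the previous batch was taken.
            _cv.wait(lk, [this] { return _batch.empty() || _inShutdown.load(); });
            if (_inShutdown.load())
                return;
            _batch = std::move(next);
            _cv.notify_all();  // wake the applier waiting in getNextBatch
        }
    }

    std::atomic<bool> _inShutdown{false};
    std::mutex _mutex;
    std::condition_variable _cv;
    Batch _batch;
    std::thread _thread;  // last member: started only after the rest exist
};
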
@@ -696,23 +628,6 @@ void SyncTail::oplogApplication() {
}
}
-SyncTail::OplogEntry::OplogEntry(const BSONObj& rawInput) : raw(rawInput.getOwned()) {
- for (auto elem : raw) {
- const auto name = elem.fieldNameStringData();
- if (name == "ns") {
- ns = elem.valuestrsafe();
- } else if (name == "op") {
- opType = elem.valuestrsafe();
- } else if (name == "o2") {
- o2 = elem;
- } else if (name == "v") {
- version = elem;
- } else if (name == "o") {
- o = elem;
- }
- }
-}
-
// Copies ops out of the bgsync queue into the deque passed in as a parameter.
// Returns true if the batch should be ended early.
// Batch should end early if we encounter a command, or if
@@ -720,7 +635,9 @@ SyncTail::OplogEntry::OplogEntry(const BSONObj& rawInput) : raw(rawInput.getOwne
// This function also blocks 1 second waiting for new ops to appear in the bgsync
// queue. We can't block forever because there are maintenance things we need
// to periodically check in the loop.
-bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* ops) {
+bool SyncTail::tryPopAndWaitForMore(OperationContext* txn,
+ SyncTail::OpQueue* ops,
+ ReplicationCoordinator* replCoord) {
BSONObj op;
// Check to see if there are ops waiting in the bgsync queue
bool peek_success = peek(&op);
@@ -728,6 +645,16 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* op
if (!peek_success) {
// if we don't have anything in the queue, wait a bit for something to appear
if (ops->empty()) {
+ if (replCoord->isWaitingForApplierToDrain()) {
+ BackgroundSync::get()->waitUntilPaused();
+ if (peek(&op)) {
+ // The producer generated a last batch of ops before pausing so return
+ // false so that we'll come back and apply them before signaling the drain
+ // is complete.
+ return false;
+ }
+ replCoord->signalDrainComplete(txn);
+ }
// block up to 1 second
_networkQueue->waitForMore();
return false;
@@ -737,17 +664,16 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* op
return true;
}
- auto entry = OplogEntry(op);
+ const char* ns = op["ns"].valuestrsafe();
- // Check for ops that must be processed one at a time.
- if (entry.raw.isEmpty() || // sentinel that network queue is drained.
- (entry.opType[0] == 'c') || // commands.
+ // check for commands
+ if ((op["op"].valuestrsafe()[0] == 'c') ||
// Index builds are achieved through the use of an insert op, not a command op.
// The following line is the same as what the insert code uses to detect an index build.
- (!entry.ns.empty() && nsToCollectionSubstring(entry.ns) == "system.indexes")) {
+ (*ns != '\0' && nsToCollectionSubstring(ns) == "system.indexes")) {
if (ops->empty()) {
// apply commands one-at-a-time
- ops->push_back(std::move(entry));
+ ops->push_back(op);
_networkQueue->consume();
}
@@ -756,12 +682,13 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* op
}
// check for oplog version change
+ BSONElement elemVersion = op["v"];
int curVersion = 0;
- if (entry.version.eoo())
+ if (elemVersion.eoo())
// missing version means version 1
curVersion = 1;
else
- curVersion = entry.version.Int();
+ curVersion = elemVersion.Int();
if (curVersion != OPLOG_VERSION) {
severe() << "expected oplog version " << OPLOG_VERSION << " but found version "
@@ -770,7 +697,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* op
}
// Copy the op to the deque and remove it from the bgsync queue.
- ops->push_back(std::move(entry));
+ ops->push_back(op);
_networkQueue->consume();
// Go back for more ops
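
The drain handshake added back to tryPopAndWaitForMore is order-sensitive: the applier must not signal drain-complete while the producer could still hand it one more batch, so it first waits for the producer to pause and only then re-checks the queue. A toy sketch of that ordering (Producer and Coordinator stand in for BackgroundSync and ReplicationCoordinator; waitUntilPaused here merely flips a flag, where the real call blocks):

#include <deque>
#include <optional>

struct Producer {
    std::deque<int> queue;
    bool paused = false;
    void waitUntilPaused() { paused = true; }  // real call blocks until the fetcher stops
    std::optional<int> peek() const {
        return queue.empty() ? std::nullopt : std::optional<int>(queue.front());
    }
};

struct Coordinator {
    bool waitingForDrain = false;
    bool drainComplete = false;
    void signalDrainComplete() { drainComplete = true; }
};

// Returns false when the caller must apply one more batch before signaling.
bool maybeSignalDrainComplete(Producer& p, Coordinator& c) {
    if (!c.waitingForDrain)
        return true;
    p.waitUntilPaused();   // after this, the producer can no longer enqueue
    if (p.peek())
        return false;      // a final batch slipped in before the pause: apply it first
    c.signalDrainComplete();
    return true;
}
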
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index e3edacaed2a..254f0182bd5 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -95,58 +95,38 @@ public:
void oplogApplication();
bool peek(BSONObj* obj);
- /**
- * A parsed oplog entry.
- *
- * This only includes the fields used by the code using this object at the time this was
- * written. As more code uses this, more fields should be added.
- *
- * All unowned members (such as StringDatas and BSONElements) point into the raw BSON.
- * All StringData members are guaranteed to be NUL terminated.
- */
- struct OplogEntry {
- explicit OplogEntry(const BSONObj& raw);
-
- BSONObj raw; // Owned.
-
- StringData ns = "";
- StringData opType = "";
-
- BSONElement version;
- BSONElement o;
- BSONElement o2;
- };
-
class OpQueue {
public:
OpQueue() : _size(0) {}
size_t getSize() const {
return _size;
}
- const std::deque<OplogEntry>& getDeque() const {
+ const std::deque<BSONObj>& getDeque() const {
return _deque;
}
- void push_back(OplogEntry&& op) {
- _size += op.raw.objsize();
- _deque.push_back(std::move(op));
+ void push_back(BSONObj& op) {
+ _deque.push_back(op);
+ _size += op.objsize();
}
bool empty() const {
return _deque.empty();
}
- const OplogEntry& back() const {
+ BSONObj back() const {
invariant(!_deque.empty());
return _deque.back();
}
private:
- std::deque<OplogEntry> _deque;
+ std::deque<BSONObj> _deque;
size_t _size;
};
// returns true if we should continue waiting for BSONObjs, false if we should
// stop waiting and apply the queue we have. Only returns false if !ops.empty().
- bool tryPopAndWaitForMore(OperationContext* txn, OpQueue* ops);
+ bool tryPopAndWaitForMore(OperationContext* txn,
+ OpQueue* ops,
+ ReplicationCoordinator* replCoord);
/**
* Fetch a single document referenced in the operation from the sync source.
@@ -178,8 +158,6 @@ protected:
OpTime multiApply(OperationContext* txn, const OpQueue& ops);
private:
- class OpQueueBatcher;
-
std::string _hostname;
BackgroundSyncInterface* _networkQueue;
diff --git a/src/mongo/util/queue.h b/src/mongo/util/queue.h
index 6cdb62538ef..a1b8caac1a0 100644
--- a/src/mongo/util/queue.h
+++ b/src/mongo/util/queue.h
@@ -32,10 +32,9 @@
#include <limits>
#include <queue>
-#include "mongo/base/disallow_copying.h"
#include "mongo/stdx/chrono.h"
#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/mutex.h"
+#include "mongo/base/disallow_copying.h"
namespace mongo {
@@ -63,11 +62,6 @@ public:
BlockingQueue(size_t size) : _maxSize(size), _getSize(&_getSizeDefault) {}
BlockingQueue(size_t size, getSizeFunc f) : _maxSize(size), _getSize(f) {}
- void pushEvenIfFull(T const& t) {
- stdx::unique_lock<stdx::mutex> l(_lock);
- pushImpl_inlock(t, _getSize(t));
- }
-
void push(T const& t) {
stdx::unique_lock<stdx::mutex> l(_lock);
_clearing = false;
@@ -75,7 +69,9 @@ public:
while (_currentSize + tSize > _maxSize) {
_cvNoLongerFull.wait(l);
}
- pushImpl_inlock(t, tSize);
+ _queue.push(t);
+ _currentSize += tSize;
+ _cvNoLongerEmpty.notify_one();
}
bool empty() const {
@@ -202,14 +198,6 @@ public:
}
private:
- void pushImpl_inlock(const T& obj, size_t objSize) {
- _clearing = false;
- _queue.push(obj);
- _currentSize += objSize;
- if (_queue.size() == 1) // We were empty.
- _cvNoLongerEmpty.notify_one();
- }
-
mutable stdx::mutex _lock;
std::queue<T> _queue;
const size_t _maxSize;
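
For reference, the escape hatch deleted from this queue paired the capacity-bounded push with an unconditional one, so a producer could always wake a consumer with a sentinel even when the queue was full (the bgsync.cpp hunk above used it to push an empty BSONObj). A minimal standalone sketch of such a queue:

#include <condition_variable>
#include <mutex>
#include <queue>

template <typename T>
class BlockingQueue {
public:
    explicit BlockingQueue(size_t maxSize) : _maxSize(maxSize) {}

    // Normal path: block while the queue is at capacity.
    void push(const T& t) {
        std::unique_lock<std::mutex> l(_lock);
        _cvNoLongerFull.wait(l, [&] { return _queue.size() < _maxSize; });
        _queue.push(t);
        _cvNoLongerEmpty.notify_one();
    }

    // The removed escape hatch: bypass the capacity check so a producer
    // can always wake a consumer with a sentinel value.
    void pushEvenIfFull(const T& t) {
        std::lock_guard<std::mutex> l(_lock);
        _queue.push(t);
        _cvNoLongerEmpty.notify_one();
    }

    T pop() {
        std::unique_lock<std::mutex> l(_lock);
        _cvNoLongerEmpty.wait(l, [&] { return !_queue.empty(); });
        T t = _queue.front();
        _queue.pop();
        _cvNoLongerFull.notify_one();
        return t;
    }

private:
    mutable std::mutex _lock;
    std::queue<T> _queue;
    const size_t _maxSize;
    std::condition_variable _cvNoLongerFull;
    std::condition_variable _cvNoLongerEmpty;
};
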