summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/sync_tail.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r--src/mongo/db/repl/sync_tail.cpp275
1 files changed, 174 insertions, 101 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 244eba62c42..ff25c72abea 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -66,6 +66,7 @@
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -386,10 +387,10 @@ void prefetchOp(const BSONObj& op) {
}
// Doles out all the work to the reader pool threads and waits for them to complete
-void prefetchOps(const std::deque<BSONObj>& ops, OldThreadPool* prefetcherPool) {
+void prefetchOps(const std::deque<SyncTail::OplogEntry>& ops, OldThreadPool* prefetcherPool) {
invariant(prefetcherPool);
- for (std::deque<BSONObj>::const_iterator it = ops.begin(); it != ops.end(); ++it) {
- prefetcherPool->schedule(&prefetchOp, *it);
+ for (auto&& op : ops) {
+ prefetcherPool->schedule(&prefetchOp, op.raw);
}
prefetcherPool->join();
}
@@ -409,28 +410,27 @@ void applyOps(const std::vector<std::vector<BSONObj>>& writerVectors,
}
}
-void fillWriterVectors(const std::deque<BSONObj>& ops,
+void fillWriterVectors(const std::deque<SyncTail::OplogEntry>& ops,
std::vector<std::vector<BSONObj>>* writerVectors) {
- for (std::deque<BSONObj>::const_iterator it = ops.begin(); it != ops.end(); ++it) {
- const BSONElement e = it->getField("ns");
- verify(e.type() == String);
- const char* ns = e.valuestr();
- int len = e.valuestrsize();
+ const bool supportsDocLocking =
+ getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking();
+ const uint32_t numWriters = writerVectors->size();
+
+ for (auto&& op : ops) {
uint32_t hash = 0;
- MurmurHash3_x86_32(ns, len, 0, &hash);
+ MurmurHash3_x86_32(op.ns.rawData(), op.ns.size(), 0, &hash);
- const char* opType = it->getField("op").valuestrsafe();
+ const char* opType = op.opType.rawData();
- if (getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking() &&
- isCrudOpType(opType)) {
+ if (supportsDocLocking && isCrudOpType(opType)) {
BSONElement id;
switch (opType[0]) {
case 'u':
- id = it->getField("o2").Obj()["_id"];
+ id = op.o2.Obj()["_id"];
break;
case 'd':
case 'i':
- id = it->getField("o").Obj()["_id"];
+ id = op.o.Obj()["_id"];
break;
}
@@ -438,7 +438,7 @@ void fillWriterVectors(const std::deque<BSONObj>& ops,
MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash);
}
- (*writerVectors)[hash % writerVectors->size()].push_back(*it);
+ (*writerVectors)[hash % numWriters].push_back(op.raw);
}
}
@@ -474,11 +474,19 @@ OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) {
applyOps(writerVectors, &_writerPool, _applyFunc, this);
- OpTime lastOpTime = writeOpsToOplog(txn, ops.getDeque());
- if (inShutdown()) {
- return OpTime();
+ OpTime lastOpTime;
+ {
+ ON_BLOCK_EXIT([&] { _writerPool.join(); });
+ std::vector<BSONObj> raws;
+ raws.reserve(ops.getDeque().size());
+ for (auto&& op : ops.getDeque()) {
+ raws.emplace_back(op.raw);
+ }
+ lastOpTime = writeOpsToOplog(txn, raws);
+ if (inShutdown()) {
+ return OpTime();
+ }
}
- _writerPool.join();
// We have now written all database writes and updated the oplog to match.
return lastOpTime;
}
@@ -516,83 +524,143 @@ void tryToGoLiveAsASecondary(OperationContext* txn, ReplicationCoordinator* repl
}
}
-/* tail an oplog. ok to return, will be re-called. */
-void SyncTail::oplogApplication() {
- ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
- ApplyBatchFinalizer finalizer(replCoord);
+class SyncTail::OpQueueBatcher {
+ MONGO_DISALLOW_COPYING(OpQueueBatcher);
- OperationContextImpl txn;
- OpTime originalEndOpTime(getMinValid(&txn).end);
+public:
+ explicit OpQueueBatcher(SyncTail* syncTail) : _syncTail(syncTail), _thread([&] { run(); }) {}
+ ~OpQueueBatcher() {
+ _inShutdown.store(true);
+ _cv.notify_all();
+ _thread.join();
+ }
- while (!inShutdown()) {
- OpQueue ops;
+ OpQueue getNextBatch(Seconds maxWaitTime) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (_ops.empty()) {
+ _cv.wait_for(lk, maxWaitTime);
+ }
- Timer batchTimer;
- int lastTimeChecked = 0;
+ OpQueue ops = std::move(_ops);
+ _ops = {};
+ _cv.notify_all();
- do {
- int now = batchTimer.seconds();
+ return ops;
+ }
- // apply replication batch limits
- if (!ops.empty()) {
- if (now > replBatchLimitSeconds)
- break;
- if (ops.getDeque().size() > replBatchLimitOperations)
- break;
- }
- // occasionally check some things
- // (always checked in the first iteration of this do-while loop, because
- // ops is empty)
- if (ops.empty() || now > lastTimeChecked) {
- BackgroundSync* bgsync = BackgroundSync::get();
- if (bgsync->getInitialSyncRequestedFlag()) {
- // got a resync command
- return;
+private:
+ void run() {
+ Client::initThread("ReplBatcher");
+ OperationContextImpl txn;
+ auto replCoord = ReplicationCoordinator::get(&txn);
+
+ while (!_inShutdown.load()) {
+ Timer batchTimer;
+
+ OpQueue ops;
+ // tryPopAndWaitForMore returns true when we need to end a batch early
+ while (!_syncTail->tryPopAndWaitForMore(&txn, &ops) &&
+ (ops.getSize() < replBatchLimitBytes) && !_inShutdown.load()) {
+ int now = batchTimer.seconds();
+
+ // apply replication batch limits
+ if (!ops.empty()) {
+ if (now > replBatchLimitSeconds)
+ break;
+ if (ops.getDeque().size() > replBatchLimitOperations)
+ break;
}
- lastTimeChecked = now;
- // can we become secondary?
- // we have to check this before calling mgr, as we must be a secondary to
- // become primary
- tryToGoLiveAsASecondary(&txn, replCoord);
- }
- const int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs());
- if (!ops.empty() && slaveDelaySecs > 0) {
- const BSONObj lastOp = ops.back();
- const unsigned int opTimestampSecs = lastOp["ts"].timestamp().getSecs();
+ const int slaveDelaySecs = durationCount<Seconds>(replCoord->getSlaveDelaySecs());
+ if (!ops.empty() && slaveDelaySecs > 0) {
+ const BSONObj lastOp = ops.back().raw;
+ const unsigned int opTimestampSecs = lastOp["ts"].timestamp().getSecs();
+
+ // Stop the batch as the lastOp is too new to be applied. If we continue
+ // on, we can get ops that are way ahead of the delay and this will
+ // make this thread sleep longer when handleSlaveDelay is called
+ // and apply ops much sooner than we like.
+ if (opTimestampSecs > static_cast<unsigned int>(time(0) - slaveDelaySecs)) {
+ break;
+ }
+ }
- // Stop the batch as the lastOp is too new to be applied. If we continue
- // on, we can get ops that are way ahead of the delay and this will
- // make this thread sleep longer when handleSlaveDelay is called
- // and apply ops much sooner than we like.
- if (opTimestampSecs > static_cast<unsigned int>(time(0) - slaveDelaySecs)) {
+ if (MONGO_FAIL_POINT(rsSyncApplyStop)) {
break;
}
+
+ // keep fetching more ops as long as we haven't filled up a full batch yet
}
- if (MONGO_FAIL_POINT(rsSyncApplyStop)) {
- break;
+ // For pausing replication in tests
+ while (MONGO_FAIL_POINT(rsSyncApplyStop) && !_inShutdown.load()) {
+ sleepmillis(0);
}
- // keep fetching more ops as long as we haven't filled up a full batch yet
- } while (!tryPopAndWaitForMore(&txn, &ops, replCoord) && // tryPopAndWaitForMore returns
- // true when we need to end a
- // batch early
- (ops.getSize() < replBatchLimitBytes) &&
- !inShutdown());
-
- // For pausing replication in tests
- while (MONGO_FAIL_POINT(rsSyncApplyStop)) {
- sleepmillis(0);
- if (inShutdown())
- return;
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ while (!_ops.empty()) {
+ // Block until the previous batch has been taken.
+ if (_inShutdown.load())
+ return;
+ _cv.wait(lk);
+ }
+ _ops = std::move(ops);
+ _cv.notify_all();
}
+ }
+
+ AtomicWord<bool> _inShutdown;
+ SyncTail* const _syncTail;
+
+ stdx::mutex _mutex; // Guards _ops.
+ stdx::condition_variable _cv;
+ OpQueue _ops;
+
+ stdx::thread _thread; // Must be last so all other members are initialized before starting.
+};
+
+/* tail an oplog. ok to return, will be re-called. */
+void SyncTail::oplogApplication() {
+ OpQueueBatcher batcher(this);
+
+ OperationContextImpl txn;
+ auto replCoord = ReplicationCoordinator::get(&txn);
+ ApplyBatchFinalizer finalizer(replCoord);
+
+ OpTime originalEndOpTime(getMinValid(&txn).end);
+ while (!inShutdown()) {
+ OpQueue ops;
- if (ops.empty()) {
- continue;
+ do {
+ if (BackgroundSync::get()->getInitialSyncRequestedFlag()) {
+ // got a resync command
+ return;
+ }
+
+ tryToGoLiveAsASecondary(&txn, replCoord);
+
+ // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become
+ // ready in time, we'll loop again so we can do the above checks periodically.
+ ops = batcher.getNextBatch(Seconds(1));
+ } while (!inShutdown() && ops.empty());
+
+ if (inShutdown())
+ return;
+
+ invariant(!ops.empty());
+
+ const BSONObj lastOp = ops.back().raw;
+
+ if (lastOp.isEmpty()) {
+ // This means we are currently stepping up as primary, and waiting for this thread to
+ // drain the ops we've queued up.
+ invariant(ops.getDeque().size() == 1);
+ invariant(replCoord->isWaitingForApplierToDrain());
+
+ replCoord->signalDrainComplete(&txn);
+ continue; // This wasn't a real op. Don't try to apply it.
}
- const BSONObj lastOp = ops.back();
handleSlaveDelay(lastOp);
// Set minValid to the last OpTime that needs to be applied, in this batch or from the
@@ -628,6 +696,23 @@ void SyncTail::oplogApplication() {
}
}
+SyncTail::OplogEntry::OplogEntry(const BSONObj& rawInput) : raw(rawInput.getOwned()) {
+ for (auto elem : raw) {
+ const auto name = elem.fieldNameStringData();
+ if (name == "ns") {
+ ns = elem.valuestrsafe();
+ } else if (name == "op") {
+ opType = elem.valuestrsafe();
+ } else if (name == "o2") {
+ o2 = elem;
+ } else if (name == "v") {
+ version = elem;
+ } else if (name == "o") {
+ o = elem;
+ }
+ }
+}
+
// Copies ops out of the bgsync queue into the deque passed in as a parameter.
// Returns true if the batch should be ended early.
// Batch should end early if we encounter a command, or if
@@ -635,9 +720,7 @@ void SyncTail::oplogApplication() {
// This function also blocks 1 second waiting for new ops to appear in the bgsync
// queue. We can't block forever because there are maintenance things we need
// to periodically check in the loop.
-bool SyncTail::tryPopAndWaitForMore(OperationContext* txn,
- SyncTail::OpQueue* ops,
- ReplicationCoordinator* replCoord) {
+bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* ops) {
BSONObj op;
// Check to see if there are ops waiting in the bgsync queue
bool peek_success = peek(&op);
@@ -645,16 +728,6 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn,
if (!peek_success) {
// if we don't have anything in the queue, wait a bit for something to appear
if (ops->empty()) {
- if (replCoord->isWaitingForApplierToDrain()) {
- BackgroundSync::get()->waitUntilPaused();
- if (peek(&op)) {
- // The producer generated a last batch of ops before pausing so return
- // false so that we'll come back and apply them before signaling the drain
- // is complete.
- return false;
- }
- replCoord->signalDrainComplete(txn);
- }
// block up to 1 second
_networkQueue->waitForMore();
return false;
@@ -664,16 +737,17 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn,
return true;
}
- const char* ns = op["ns"].valuestrsafe();
+ auto entry = OplogEntry(op);
- // check for commands
- if ((op["op"].valuestrsafe()[0] == 'c') ||
+ // Check for ops that must be processed one at a time.
+ if (entry.raw.isEmpty() || // sentinel that network queue is drained.
+ (entry.opType[0] == 'c') || // commands.
// Index builds are acheived through the use of an insert op, not a command op.
// The following line is the same as what the insert code uses to detect an index build.
- (*ns != '\0' && nsToCollectionSubstring(ns) == "system.indexes")) {
+ (!entry.ns.empty() && nsToCollectionSubstring(entry.ns) == "system.indexes")) {
if (ops->empty()) {
// apply commands one-at-a-time
- ops->push_back(op);
+ ops->push_back(std::move(entry));
_networkQueue->consume();
}
@@ -682,13 +756,12 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn,
}
// check for oplog version change
- BSONElement elemVersion = op["v"];
int curVersion = 0;
- if (elemVersion.eoo())
+ if (entry.version.eoo())
// missing version means version 1
curVersion = 1;
else
- curVersion = elemVersion.Int();
+ curVersion = entry.version.Int();
if (curVersion != OPLOG_VERSION) {
severe() << "expected oplog version " << OPLOG_VERSION << " but found version "
@@ -697,7 +770,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn,
}
// Copy the op to the deque and remove it from the bgsync queue.
- ops->push_back(op);
+ ops->push_back(std::move(entry));
_networkQueue->consume();
// Go back for more ops