diff options
-rw-r--r-- | jstests/replsets/rollback.js | 1 | ||||
-rw-r--r-- | jstests/replsets/rollback2.js | 1 | ||||
-rwxr-xr-x | jstests/replsets/rollback3.js | 1 | ||||
-rw-r--r-- | jstests/slow2/rollback4.js | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_sync.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_sync.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 118 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 6 |
8 files changed, 114 insertions, 33 deletions
diff --git a/jstests/replsets/rollback.js b/jstests/replsets/rollback.js index 2659b08c71d..685a951a2a4 100644 --- a/jstests/replsets/rollback.js +++ b/jstests/replsets/rollback.js @@ -129,6 +129,7 @@ load("jstests/replsets/rslib.js"); replTest.unPartition(1, 2); awaitOpTime(b.getMongo(), getLatestOp(a_conn).ts); + replTest.awaitSecondaryNodes(); replTest.awaitReplication(); checkFinalResults(a); checkFinalResults(b); diff --git a/jstests/replsets/rollback2.js b/jstests/replsets/rollback2.js index 8897455515c..00a8c9d8256 100644 --- a/jstests/replsets/rollback2.js +++ b/jstests/replsets/rollback2.js @@ -111,6 +111,7 @@ load("jstests/replsets/rslib.js"); awaitOpTime(b.getMongo(), getLatestOp(a_conn).ts); // await steady state and ensure the two nodes have the same contents + replTest.awaitSecondaryNodes(); replTest.awaitReplication(); checkFinalResults(a); checkFinalResults(b); diff --git a/jstests/replsets/rollback3.js b/jstests/replsets/rollback3.js index 762f7589b41..9c9d8589cdc 100755 --- a/jstests/replsets/rollback3.js +++ b/jstests/replsets/rollback3.js @@ -134,6 +134,7 @@ load("jstests/replsets/rslib.js"); awaitOpTime(b.getMongo(), getLatestOp(a_conn).ts); // await steady state and ensure the two nodes have the same contents + replTest.awaitSecondaryNodes(); replTest.awaitReplication(); checkFinalResults(a); checkFinalResults(b); diff --git a/jstests/slow2/rollback4.js b/jstests/slow2/rollback4.js index 5c1cc5422ce..12dc73cfee5 100644 --- a/jstests/slow2/rollback4.js +++ b/jstests/slow2/rollback4.js @@ -58,4 +58,5 @@ master.getDB( 'db' ).c.save( big ); // Restart old master replTest.restart( 0 ); // Wait five minutes to ensure there is enough time for rollback -replTest.awaitReplication(5*60*1000); +replTest.awaitSecondaryNodes(5*60*1000); +replTest.awaitReplication(); diff --git a/src/mongo/db/repl/initial_sync.cpp b/src/mongo/db/repl/initial_sync.cpp index c964be4d775..d769d7a89fa 100644 --- a/src/mongo/db/repl/initial_sync.cpp +++ b/src/mongo/db/repl/initial_sync.cpp @@ -33,8 +33,12 @@ #include "mongo/db/repl/initial_sync.h" #include "mongo/db/operation_context_impl.h" +#include "mongo/db/repl/bgsync.h" +#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/repl/repl_client_info.h" + #include "mongo/util/exit.h" #include "mongo/util/log.h" @@ -68,7 +72,8 @@ void InitialSync::_applyOplogUntil(OperationContext* txn, const OpTime& endOpTim while (true) { OpQueue ops; - while (!tryPopAndWaitForMore(txn, &ops, getGlobalReplicationCoordinator())) { + auto replCoord = repl::ReplicationCoordinator::get(txn); + while (!tryPopAndWaitForMore(txn, &ops, replCoord)) { // nothing came back last time, so go again if (ops.empty()) continue; @@ -106,6 +111,10 @@ void InitialSync::_applyOplogUntil(OperationContext* txn, const OpTime& endOpTim entriesApplied += ops.getDeque().size(); const OpTime lastOpTime = multiApply(txn, ops); + + replCoord->setMyLastOptime(lastOpTime); + setNewTimestamp(lastOpTime.getTimestamp()); + if (inShutdown()) { return; } diff --git a/src/mongo/db/repl/initial_sync.h b/src/mongo/db/repl/initial_sync.h index 6469ddbea75..aee7c79beab 100644 --- a/src/mongo/db/repl/initial_sync.h +++ b/src/mongo/db/repl/initial_sync.h @@ -49,12 +49,6 @@ public: */ void oplogApplication(OperationContext* txn, const OpTime& endOpTime); - // Initial sync will ignore all journal requirement flags and doesn't wait until - // operations are durable before updating the last OpTime. - virtual bool shouldEnsureDurability() { - return false; - } - private: /** * Applies oplog entries until reaching "endOpTime". diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index fe3d0cfda7d..244eba62c42 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -131,6 +131,99 @@ bool isCrudOpType(const char* field) { } } +namespace { + +class ApplyBatchFinalizer { +public: + ApplyBatchFinalizer(ReplicationCoordinator* replCoord); + ~ApplyBatchFinalizer(); + + /** + * In PV0, calls ReplicationCoordinator::setMyLastOptime with "newOp". + * In PV1, sets _latestOpTime to be "newOp" and signals the _waiterThread. + */ + void record(OpTime newOp); + +private: + /** + * Loops continuously, waiting for writes to be flushed to disk and then calls + * ReplicationCoordinator::setMyLastOptime with _latestOpTime. + * Terminates once _shutdownSignaled is set true. + */ + void _run(); + + // Used to update the replication system's progress. + ReplicationCoordinator* _replCoord; + // Protects _cond, _shutdownSignaled, and _latestOpTime. + stdx::mutex _mutex; + // Used to alert our thread of a new OpTime. + stdx::condition_variable _cond; + // The next OpTime to set as the ReplicationCoordinator's lastOpTime after flushing. + OpTime _latestOpTime; + // Once this is set to true the _run method will terminate. + bool _shutdownSignaled; + // Thread that will _run(). Must be initialized last as it depends on the other variables. + stdx::thread _waiterThread; +}; + +ApplyBatchFinalizer::ApplyBatchFinalizer(ReplicationCoordinator* replCoord) + : _replCoord(replCoord), + _shutdownSignaled(false), + _waiterThread(&ApplyBatchFinalizer::_run, this) {} + +ApplyBatchFinalizer::~ApplyBatchFinalizer() { + stdx::unique_lock<stdx::mutex> lock(_mutex); + _shutdownSignaled = true; + _cond.notify_all(); + lock.unlock(); + + _waiterThread.join(); +} + +void ApplyBatchFinalizer::record(OpTime newOp) { + const bool mustWaitUntilDurable = _replCoord->isV1ElectionProtocol(); + if (!mustWaitUntilDurable) { + // We have to use setMyLastOptimeForward since this thread races with + // logTransitionToPrimaryToOplog. + _replCoord->setMyLastOptimeForward(newOp); + return; + } + + stdx::unique_lock<stdx::mutex> lock(_mutex); + _latestOpTime = newOp; + _cond.notify_all(); +} + +void ApplyBatchFinalizer::_run() { + Client::initThread("ApplyBatchFinalizer"); + + while (true) { + OpTime latestOpTime; + + { + stdx::unique_lock<stdx::mutex> lock(_mutex); + while (_latestOpTime.isNull() && !_shutdownSignaled) { + _cond.wait(lock); + } + + if (_shutdownSignaled) { + return; + } + + latestOpTime = _latestOpTime; + _latestOpTime = OpTime(); + } + + auto txn = cc().makeOperationContext(); + txn->recoveryUnit()->goingToWaitUntilDurable(); + txn->recoveryUnit()->waitUntilDurable(); + // We have to use setMyLastOptimeForward since this thread races with + // logTransitionToPrimaryToOplog. + _replCoord->setMyLastOptimeForward(latestOpTime); + } +} +} // anonymous namespace containing ApplyBatchFinalizer definitions. + SyncTail::SyncTail(BackgroundSyncInterface* q, MultiSyncApplyFunc func) : _networkQueue(q), _applyFunc(func), @@ -352,7 +445,7 @@ void fillWriterVectors(const std::deque<BSONObj>& ops, } // namespace // Applies a batch of oplog entries, by using a set of threads to apply the operations and then -// writes the oplog entries to the local oplog. At the end we update the lastOpTime. +// writes the oplog entries to the local oplog. OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) { invariant(_applyFunc); @@ -387,13 +480,6 @@ OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) { } _writerPool.join(); // We have now written all database writes and updated the oplog to match. - - ReplClientInfo::forClient(txn->getClient()).setLastOp(lastOpTime); - replCoord->setMyLastOptime(lastOpTime); - setNewTimestamp(lastOpTime.getTimestamp()); - - BackgroundSync::get()->notify(txn); - return lastOpTime; } @@ -433,6 +519,7 @@ void tryToGoLiveAsASecondary(OperationContext* txn, ReplicationCoordinator* repl /* tail an oplog. ok to return, will be re-called. */ void SyncTail::oplogApplication() { ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); + ApplyBatchFinalizer finalizer(replCoord); OperationContextImpl txn; OpTime originalEndOpTime(getMinValid(&txn).end); @@ -533,18 +620,11 @@ void SyncTail::oplogApplication() { // This write will not journal/checkpoint. setMinValid(&txn, {start, end}); - const bool mustWaitUntilDurable = - shouldEnsureDurability() && replCoord->isV1ElectionProtocol(); - if (mustWaitUntilDurable) { - txn.recoveryUnit()->goingToWaitUntilDurable(); - } - - multiApply(&txn, ops); + OpTime finalOpTime = multiApply(&txn, ops); + setNewTimestamp(finalOpTime.getTimestamp()); - // This write will journal/checkpoint, and finish the batch. - setMinValid(&txn, - end, - mustWaitUntilDurable ? DurableRequirement::Strong : DurableRequirement::None); + setMinValid(&txn, end, DurableRequirement::None); + finalizer.record(finalOpTime); } } diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 5fc4f933ab5..254f0182bd5 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -153,12 +153,6 @@ protected: static const int replBatchLimitSeconds = 1; static const unsigned int replBatchLimitOperations = 5000; - // SyncTail base class always supports awaiting commit if any op has j:true flag - // that indicates awaiting commit before updating last OpTime. - virtual bool shouldEnsureDurability() { - return true; - } - // Apply a batch of operations, using multiple threads. // Returns the last OpTime applied during the apply batch, ops.end["ts"] basically. OpTime multiApply(OperationContext* txn, const OpQueue& ops); |