-rw-r--r--  jstests/replsets/rollback.js          1
-rw-r--r--  jstests/replsets/rollback2.js         1
-rwxr-xr-x  jstests/replsets/rollback3.js         1
-rw-r--r--  jstests/slow2/rollback4.js            3
-rw-r--r--  src/mongo/db/repl/initial_sync.cpp   11
-rw-r--r--  src/mongo/db/repl/initial_sync.h      6
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp     118
-rw-r--r--  src/mongo/db/repl/sync_tail.h         6
8 files changed, 114 insertions, 33 deletions
diff --git a/jstests/replsets/rollback.js b/jstests/replsets/rollback.js
index 2659b08c71d..685a951a2a4 100644
--- a/jstests/replsets/rollback.js
+++ b/jstests/replsets/rollback.js
@@ -129,6 +129,7 @@ load("jstests/replsets/rslib.js");
replTest.unPartition(1, 2);
awaitOpTime(b.getMongo(), getLatestOp(a_conn).ts);
+ replTest.awaitSecondaryNodes();
replTest.awaitReplication();
checkFinalResults(a);
checkFinalResults(b);
diff --git a/jstests/replsets/rollback2.js b/jstests/replsets/rollback2.js
index 8897455515c..00a8c9d8256 100644
--- a/jstests/replsets/rollback2.js
+++ b/jstests/replsets/rollback2.js
@@ -111,6 +111,7 @@ load("jstests/replsets/rslib.js");
awaitOpTime(b.getMongo(), getLatestOp(a_conn).ts);
// await steady state and ensure the two nodes have the same contents
+ replTest.awaitSecondaryNodes();
replTest.awaitReplication();
checkFinalResults(a);
checkFinalResults(b);
diff --git a/jstests/replsets/rollback3.js b/jstests/replsets/rollback3.js
index 762f7589b41..9c9d8589cdc 100755
--- a/jstests/replsets/rollback3.js
+++ b/jstests/replsets/rollback3.js
@@ -134,6 +134,7 @@ load("jstests/replsets/rslib.js");
awaitOpTime(b.getMongo(), getLatestOp(a_conn).ts);
// await steady state and ensure the two nodes have the same contents
+ replTest.awaitSecondaryNodes();
replTest.awaitReplication();
checkFinalResults(a);
checkFinalResults(b);
diff --git a/jstests/slow2/rollback4.js b/jstests/slow2/rollback4.js
index 5c1cc5422ce..12dc73cfee5 100644
--- a/jstests/slow2/rollback4.js
+++ b/jstests/slow2/rollback4.js
@@ -58,4 +58,5 @@ master.getDB( 'db' ).c.save( big );
// Restart old master
replTest.restart( 0 );
// Wait five minutes to ensure there is enough time for rollback
-replTest.awaitReplication(5*60*1000);
+replTest.awaitSecondaryNodes(5*60*1000);
+replTest.awaitReplication();
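
A note on the test edits above: the server change below moves publication of the applied optime onto a background finalizer thread, which in PV1 first waits for a journal flush, so a node's reported optime can briefly lag its actual state. awaitReplication() alone therefore no longer doubles as a wait for nodes to finish rollback; the tests first settle into steady state with awaitSecondaryNodes() -- rollback4.js moving its five-minute budget onto that call -- and only then compare optimes.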
diff --git a/src/mongo/db/repl/initial_sync.cpp b/src/mongo/db/repl/initial_sync.cpp
index c964be4d775..d769d7a89fa 100644
--- a/src/mongo/db/repl/initial_sync.cpp
+++ b/src/mongo/db/repl/initial_sync.cpp
@@ -33,8 +33,12 @@
#include "mongo/db/repl/initial_sync.h"
#include "mongo/db/operation_context_impl.h"
+#include "mongo/db/repl/bgsync.h"
+#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/repl/repl_client_info.h"
+
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
@@ -68,7 +72,8 @@ void InitialSync::_applyOplogUntil(OperationContext* txn, const OpTime& endOpTim
while (true) {
OpQueue ops;
- while (!tryPopAndWaitForMore(txn, &ops, getGlobalReplicationCoordinator())) {
+ auto replCoord = repl::ReplicationCoordinator::get(txn);
+ while (!tryPopAndWaitForMore(txn, &ops, replCoord)) {
// nothing came back last time, so go again
if (ops.empty())
continue;
@@ -106,6 +111,10 @@ void InitialSync::_applyOplogUntil(OperationContext* txn, const OpTime& endOpTim
entriesApplied += ops.getDeque().size();
const OpTime lastOpTime = multiApply(txn, ops);
+
+ replCoord->setMyLastOptime(lastOpTime);
+ setNewTimestamp(lastOpTime.getTimestamp());
+
if (inShutdown()) {
return;
}
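
With the hunk above, initial sync takes over publishing the applied optime itself, because multiApply() stops doing so (see the sync_tail.cpp diff below). Per the comment deleted from initial_sync.h, initial sync intentionally skips any durability wait: a failed initial sync restarts from scratch, so journaling intermediate optimes buys nothing. A minimal standalone sketch of the new contract follows; int64_t stands in for OpTime and both functions are illustrative stand-ins, not MongoDB APIs:

#include <cstdint>
#include <deque>
#include <iostream>

using OpTimeStandIn = std::int64_t;  // stands in for repl::OpTime

// Illustrative stand-in for SyncTail::multiApply: apply every op in the
// batch and return the last one; publishing it is now the caller's job.
OpTimeStandIn applyBatch(const std::deque<OpTimeStandIn>& ops) {
    OpTimeStandIn last = 0;
    for (OpTimeStandIn op : ops)
        last = op;  // the real code fans ops out to writer threads here
    return last;
}

int main() {
    const OpTimeStandIn last = applyBatch({101, 102, 103});
    // Initial-sync style: publish immediately, no durability wait
    // (mirrors setMyLastOptime + setNewTimestamp in the hunk above).
    std::cout << "setMyLastOptime(" << last << ")\n";
    // Steady-state replication instead hands `last` to a finalizer thread
    // that waits for the journal first; see the sketch after sync_tail.cpp.
    return 0;
}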
diff --git a/src/mongo/db/repl/initial_sync.h b/src/mongo/db/repl/initial_sync.h
index 6469ddbea75..aee7c79beab 100644
--- a/src/mongo/db/repl/initial_sync.h
+++ b/src/mongo/db/repl/initial_sync.h
@@ -49,12 +49,6 @@ public:
*/
void oplogApplication(OperationContext* txn, const OpTime& endOpTime);
- // Initial sync will ignore all journal requirement flags and doesn't wait until
- // operations are durable before updating the last OpTime.
- virtual bool shouldEnsureDurability() {
- return false;
- }
-
private:
/**
* Applies oplog entries until reaching "endOpTime".
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index fe3d0cfda7d..244eba62c42 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -131,6 +131,99 @@ bool isCrudOpType(const char* field) {
}
}
+namespace {
+
+class ApplyBatchFinalizer {
+public:
+ ApplyBatchFinalizer(ReplicationCoordinator* replCoord);
+ ~ApplyBatchFinalizer();
+
+ /**
+ * In PV0, calls ReplicationCoordinator::setMyLastOptimeForward with "newOp".
+ * In PV1, sets _latestOpTime to be "newOp" and signals the _waiterThread.
+ */
+ void record(OpTime newOp);
+
+private:
+ /**
+ * Loops continuously, waiting for writes to be flushed to disk and then calls
+ * ReplicationCoordinator::setMyLastOptimeForward with _latestOpTime.
+ * Terminates once _shutdownSignaled is set true.
+ */
+ void _run();
+
+ // Used to update the replication system's progress.
+ ReplicationCoordinator* _replCoord;
+ // Protects _cond, _shutdownSignaled, and _latestOpTime.
+ stdx::mutex _mutex;
+ // Used to alert our thread of a new OpTime.
+ stdx::condition_variable _cond;
+ // The next OpTime to set as the ReplicationCoordinator's lastOpTime after flushing.
+ OpTime _latestOpTime;
+ // Once this is set to true the _run method will terminate.
+ bool _shutdownSignaled;
+ // Thread that will _run(). Must be initialized last as it depends on the other variables.
+ stdx::thread _waiterThread;
+};
+
+ApplyBatchFinalizer::ApplyBatchFinalizer(ReplicationCoordinator* replCoord)
+ : _replCoord(replCoord),
+ _shutdownSignaled(false),
+ _waiterThread(&ApplyBatchFinalizer::_run, this) {}
+
+ApplyBatchFinalizer::~ApplyBatchFinalizer() {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ _shutdownSignaled = true;
+ _cond.notify_all();
+ lock.unlock();
+
+ _waiterThread.join();
+}
+
+void ApplyBatchFinalizer::record(OpTime newOp) {
+ const bool mustWaitUntilDurable = _replCoord->isV1ElectionProtocol();
+ if (!mustWaitUntilDurable) {
+ // We have to use setMyLastOptimeForward since this thread races with
+ // logTransitionToPrimaryToOplog.
+ _replCoord->setMyLastOptimeForward(newOp);
+ return;
+ }
+
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ _latestOpTime = newOp;
+ _cond.notify_all();
+}
+
+void ApplyBatchFinalizer::_run() {
+ Client::initThread("ApplyBatchFinalizer");
+
+ while (true) {
+ OpTime latestOpTime;
+
+ {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ while (_latestOpTime.isNull() && !_shutdownSignaled) {
+ _cond.wait(lock);
+ }
+
+ if (_shutdownSignaled) {
+ return;
+ }
+
+ latestOpTime = _latestOpTime;
+ _latestOpTime = OpTime();
+ }
+
+ auto txn = cc().makeOperationContext();
+ txn->recoveryUnit()->goingToWaitUntilDurable();
+ txn->recoveryUnit()->waitUntilDurable();
+ // We have to use setMyLastOptimeForward since this thread races with
+ // logTransitionToPrimaryToOplog.
+ _replCoord->setMyLastOptimeForward(latestOpTime);
+ }
+}
+}  // namespace
+
SyncTail::SyncTail(BackgroundSyncInterface* q, MultiSyncApplyFunc func)
: _networkQueue(q),
_applyFunc(func),
@@ -352,7 +445,7 @@ void fillWriterVectors(const std::deque<BSONObj>& ops,
} // namespace
// Applies a batch of oplog entries, by using a set of threads to apply the operations and then
-// writes the oplog entries to the local oplog. At the end we update the lastOpTime.
+// writes the oplog entries to the local oplog.
OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) {
invariant(_applyFunc);
@@ -387,13 +480,6 @@ OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) {
}
_writerPool.join();
// We have now written all database writes and updated the oplog to match.
-
- ReplClientInfo::forClient(txn->getClient()).setLastOp(lastOpTime);
- replCoord->setMyLastOptime(lastOpTime);
- setNewTimestamp(lastOpTime.getTimestamp());
-
- BackgroundSync::get()->notify(txn);
-
return lastOpTime;
}
@@ -433,6 +519,7 @@ void tryToGoLiveAsASecondary(OperationContext* txn, ReplicationCoordinator* repl
/* tail an oplog. ok to return, will be re-called. */
void SyncTail::oplogApplication() {
ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
+ ApplyBatchFinalizer finalizer(replCoord);
OperationContextImpl txn;
OpTime originalEndOpTime(getMinValid(&txn).end);
@@ -533,18 +620,11 @@ void SyncTail::oplogApplication() {
// This write will not journal/checkpoint.
setMinValid(&txn, {start, end});
- const bool mustWaitUntilDurable =
- shouldEnsureDurability() && replCoord->isV1ElectionProtocol();
- if (mustWaitUntilDurable) {
- txn.recoveryUnit()->goingToWaitUntilDurable();
- }
-
- multiApply(&txn, ops);
+ OpTime finalOpTime = multiApply(&txn, ops);
+ setNewTimestamp(finalOpTime.getTimestamp());
- // This write will journal/checkpoint, and finish the batch.
- setMinValid(&txn,
- end,
- mustWaitUntilDurable ? DurableRequirement::Strong : DurableRequirement::None);
+ setMinValid(&txn, end, DurableRequirement::None);
+ finalizer.record(finalOpTime);
}
}
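
The heart of the patch is ApplyBatchFinalizer's single-slot handoff: record() overwrites one "latest" optime and signals; the waiter thread drains the slot, does the slow journal wait outside the lock, and publishes only the newest value, so a burst of batches coalesces into one durable-wait per wakeup. Below is a compilable standalone sketch of that pattern, with std:: in place of stdx::, int64_t in place of OpTime, and printing in place of waitUntilDurable/setMyLastOptimeForward; the class and all names are illustrative:

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>

class LatestValueFinalizer {
public:
    LatestValueFinalizer() : _waiter(&LatestValueFinalizer::_run, this) {}

    ~LatestValueFinalizer() {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _shutdown = true;
        }
        _cond.notify_all();
        _waiter.join();  // finish before members are torn down
    }

    // Producer side of the handoff: overwrite the single slot and signal.
    // Under a burst, intermediate values are skipped on purpose.
    void record(std::int64_t newOp) {
        std::lock_guard<std::mutex> lk(_mutex);
        _latest = newOp;
        _cond.notify_all();
    }

private:
    void _run() {
        while (true) {
            std::int64_t latest;
            {
                std::unique_lock<std::mutex> lk(_mutex);
                _cond.wait(lk, [this] { return _latest != 0 || _shutdown; });
                if (_shutdown)
                    return;  // like the real class, a pending value may drop
                latest = _latest;
                _latest = 0;  // drain the slot; 0 plays OpTime::isNull()
            }
            // The slow part (stand-in for waitUntilDurable) runs unlocked,
            // so record() never blocks behind it.
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            std::cout << "durable through op " << latest << "\n";
        }
    }

    std::mutex _mutex;
    std::condition_variable _cond;
    std::int64_t _latest = 0;  // stands in for OpTime _latestOpTime
    bool _shutdown = false;
    std::thread _waiter;  // declared last: starts after the state it reads
};

int main() {
    LatestValueFinalizer finalizer;
    for (std::int64_t op = 1; op <= 5; ++op)
        finalizer.record(op);  // a burst coalesces to the newest op
    std::this_thread::sleep_for(std::chrono::milliseconds(50));  // let it drain
}

Note the two details the real class also depends on: the thread member is declared last so it starts only after the state it reads is initialized, and shutdown may drop a pending value, which the real code can tolerate since a missed setMyLastOptimeForward() only leaves the reported optime slightly stale.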
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 5fc4f933ab5..254f0182bd5 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -153,12 +153,6 @@ protected:
static const int replBatchLimitSeconds = 1;
static const unsigned int replBatchLimitOperations = 5000;
- // SyncTail base class always supports awaiting commit if any op has j:true flag
- // that indicates awaiting commit before updating last OpTime.
- virtual bool shouldEnsureDurability() {
- return true;
- }
-
// Apply a batch of operations, using multiple threads.
// Returns the last OpTime applied during the apply batch, ops.end["ts"] basically.
OpTime multiApply(OperationContext* txn, const OpQueue& ops);
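
Finally, both shouldEnsureDurability() overrides disappear because the durability decision no longer lives on the applier hierarchy at all: ApplyBatchFinalizer::record() asks isV1ElectionProtocol() once instead. A sketch of the simplification, paraphrasing the deleted code with illustrative names rather than the real classes:

#include <iostream>

// Before: the policy was a virtual hook on the applier hierarchy.
struct Applier {
    virtual ~Applier() = default;
    virtual bool shouldEnsureDurability() {
        return true;  // SyncTail: honor journaling before advancing optime
    }
};

struct InitialSyncer : Applier {
    bool shouldEnsureDurability() override {
        return false;  // InitialSync: never wait; a failed sync restarts anyway
    }
};

int main() {
    // After this patch the question is asked exactly once, inside
    // ApplyBatchFinalizer::record() via isV1ElectionProtocol(), so both
    // overrides above could be deleted.
    InitialSyncer syncer;
    std::cout << std::boolalpha << syncer.shouldEnsureDurability() << "\n";
    return 0;
}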