summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-08-29 15:38:13 -0400
committerBenety Goh <benety@mongodb.com>2016-08-30 14:14:58 -0400
commit548ae3d39b33dc807fb6ce34f6807fb9443533fe (patch)
tree01ede2f7faa5171c535cab6c6975fa5dd85d9342 /src/mongo/db/repl
parent128e24d6ecbb38d17e4fcb68df063c4713d7f95e (diff)
downloadmongo-548ae3d39b33dc807fb6ce34f6807fb9443533fe.tar.gz
SERVER-25858 MultiApplier waits for completion callback to finish before setting state to inactive
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/data_replicator.cpp8
-rw-r--r--src/mongo/db/repl/data_replicator.h1
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp67
-rw-r--r--src/mongo/db/repl/multiapplier.cpp9
4 files changed, 79 insertions, 6 deletions
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 4cf015ce6f4..c0e7e6d7148 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -881,6 +881,8 @@ bool DataReplicator::_anyActiveHandles_inlock() const {
return _applierActive || (_oplogFetcher && _oplogFetcher->isActive()) ||
(_initialSyncState && _initialSyncState->dbsCloner &&
_initialSyncState->dbsCloner->isActive()) ||
+ (_applier && _applier->isActive()) ||
+ (_shuttingDownApplier && _shuttingDownApplier->isActive()) ||
(_reporter && _reporter->isActive());
}
@@ -889,6 +891,9 @@ void DataReplicator::_cancelAllHandles_inlock() {
_oplogFetcher->shutdown();
if (_applier)
_applier->shutdown();
+ // No need to call shutdown() on _shuttingDownApplier. This applier is assigned when the most
+ // recent applier's finish callback has been invoked. Note that isActive() will still return
+ // true if the callback is still in progress.
if (_reporter)
_reporter->shutdown();
if (_initialSyncState && _initialSyncState->dbsCloner &&
@@ -901,6 +906,7 @@ void DataReplicator::_waitOnAndResetAll_inlock(UniqueLock* lk) {
swapAndJoin_inlock(lk, _lastOplogEntryFetcher, "Waiting on fetcher (last oplog entry): ");
swapAndJoin_inlock(lk, _oplogFetcher, "Waiting on oplog fetcher: ");
swapAndJoin_inlock(lk, _applier, "Waiting on applier: ");
+ swapAndJoin_inlock(lk, _shuttingDownApplier, "Waiting on most recently completed applier: ");
swapAndJoin_inlock(lk, _reporter, "Waiting on reporter: ");
if (_initialSyncState) {
swapAndJoin_inlock(lk, _initialSyncState->dbsCloner, "Waiting on databases cloner: ");
@@ -1127,6 +1133,8 @@ void DataReplicator::_onApplyBatchFinish(const Status& status,
UniqueLock lk(_mutex);
_applierActive = false;
+ // This might block in _shuttingDownApplier's destructor if it is still active here.
+ _shuttingDownApplier = std::move(_applier);
if (!status.isOK()) {
switch (_state) {
diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h
index 954afc2ff39..172a2334143 100644
--- a/src/mongo/db/repl/data_replicator.h
+++ b/src/mongo/db/repl/data_replicator.h
@@ -368,6 +368,7 @@ private:
bool _applierActive = false; // (M)
bool _applierPaused = false; // (X)
std::unique_ptr<MultiApplier> _applier; // (M)
+ std::unique_ptr<MultiApplier> _shuttingDownApplier; // (M)
HostAndPort _syncSource; // (M)
OpTimeWithHash _lastFetched; // (MX)
OpTimeWithHash _lastApplied; // (MX)
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index 936f1c32d09..7e611129e73 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -1441,6 +1441,73 @@ TEST_F(InitialSyncTest, GetInitialSyncProgressReturnsCorrectProgress) {
ASSERT_EQUALS(attempt1.getStringField("syncSource"), std::string("localhost:27017"));
}
+TEST_F(InitialSyncTest, DataReplicatorCreatesNewApplierForNextBatchBeforeDestroyingCurrentApplier) {
+ auto getRollbackIdResponse = BSON("ok" << 1 << "rbid" << 1);
+ auto noopOp1 = BSON("ts" << Timestamp(Seconds(1), 1U) << "h" << 1LL << "v"
+ << OplogEntry::kOplogVersion
+ << "ns"
+ << ""
+ << "op"
+ << "n"
+ << "o"
+ << BSON("msg"
+ << "noop"));
+ auto createCollectionOp1 =
+ BSON("ts" << Timestamp(Seconds(2), 1U) << "h" << 1LL << "v" << OplogEntry::kOplogVersion
+ << "ns"
+ << "test.$cmd"
+ << "op"
+ << "c"
+ << "o"
+ << BSON("create"
+ << "coll1"));
+ auto createCollectionOp2 =
+ BSON("ts" << Timestamp(Seconds(3), 1U) << "h" << 1LL << "v" << OplogEntry::kOplogVersion
+ << "ns"
+ << "test.$cmd"
+ << "op"
+ << "c"
+ << "o"
+ << BSON("create"
+ << "coll2"));
+ const Responses responses = {
+ // pre-initial sync rollback checker request
+ {"replSetGetRBID", getRollbackIdResponse},
+ // get latest oplog ts - this should match the first op returned by the oplog fetcher
+ {"find",
+ BSON("ok" << 1 << "cursor" << BSON("id" << 0LL << "ns"
+ << "local.oplog.rs"
+ << "firstBatch"
+ << BSON_ARRAY(noopOp1)))},
+ // oplog fetcher find - single set of results containing two commands that have to be
+ // applied in separate batches per batching logic
+ {"find",
+ BSON("ok" << 1 << "cursor" << BSON("id" << 0LL << "ns"
+ << "local.oplog.rs"
+ << "firstBatch"
+ << BSON_ARRAY(noopOp1 << createCollectionOp1
+ << createCollectionOp2)))},
+ // Clone Start
+ // listDatabases - return empty list of databases since we're not testing the cloner.
+ {"listDatabases", BSON("ok" << 1 << "databases" << BSONArray())},
+ // get latest oplog ts - this should match the last op returned by the oplog fetcher
+ {"find",
+ BSON("ok" << 1 << "cursor" << BSON("id" << 0LL << "ns"
+ << "local.oplog.rs"
+ << "firstBatch"
+ << BSON_ARRAY(createCollectionOp2)))},
+ // post-initial sync rollback checker request
+ {"replSetGetRBID", getRollbackIdResponse},
+ };
+
+ startSync(0);
+
+ setResponses(responses);
+ playResponses();
+ log() << "Done playing responses";
+ verifySync(getNet());
+ ASSERT_EQUALS(OplogEntry(createCollectionOp2).getOpTime(), _myLastOpTime);
+}
class TestSyncSourceSelector2 : public SyncSourceSelector {
diff --git a/src/mongo/db/repl/multiapplier.cpp b/src/mongo/db/repl/multiapplier.cpp
index 77519dee0d6..02ab0950d62 100644
--- a/src/mongo/db/repl/multiapplier.cpp
+++ b/src/mongo/db/repl/multiapplier.cpp
@@ -149,14 +149,11 @@ void MultiApplier::_callback(const executor::TaskExecutor::CallbackArgs& cbd) {
}
void MultiApplier::_finishCallback(const Status& result) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _onCompletion(result);
+
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_active = false;
_condition.notify_all();
- auto finish = _onCompletion;
- lk.unlock();
-
- // This instance may be destroyed during the "finish" call.
- finish(result);
}
} // namespace repl