summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorScott Hernandez <scotthernandez@gmail.com>2016-07-08 17:14:44 -0400
committerScott Hernandez <scotthernandez@gmail.com>2016-07-11 10:44:20 -0400
commit00d3b20bf51315a1bda183fcad7b4737da789a48 (patch)
tree65f45721f571bff62c1a3a68cc622b3b35fd0a9f /src
parentbcafb88541f91de118c5c5981c7040422c0694ea (diff)
downloadmongo-00d3b20bf51315a1bda183fcad7b4737da789a48.tar.gz
SERVER-23750: wait outside of the DataReplicator mutex
Diffstat (limited to 'src')
-rw-r--r--src/mongo/client/fetcher.cpp12
-rw-r--r--src/mongo/client/fetcher.h3
-rw-r--r--src/mongo/db/repl/data_replicator.cpp64
-rw-r--r--src/mongo/db/repl/data_replicator.h2
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp1
5 files changed, 66 insertions, 16 deletions
diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp
index b042a6dc0e9..4f6e831a501 100644
--- a/src/mongo/client/fetcher.cpp
+++ b/src/mongo/client/fetcher.cpp
@@ -255,6 +255,7 @@ void Fetcher::cancel() {
}
handle = _getMoreCallbackHandle;
+ _inShutdown = true;
}
_executor->cancel(handle);
@@ -288,6 +289,17 @@ void Fetcher::_callback(const RemoteCommandCallbackArgs& rcbd, const char* batch
return;
}
+ bool inShutdown = false;
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ inShutdown = _inShutdown;
+ }
+ if (inShutdown) {
+ _work(Status(ErrorCodes::ShutdownInProgress, "fetcher shutting down"), nullptr, nullptr);
+ _finishCallback();
+ return;
+ }
+
const BSONObj& queryResponseObj = rcbd.response.getValue().data;
Status status = getStatusFromCommandResult(queryResponseObj);
if (!status.isOK()) {
diff --git a/src/mongo/client/fetcher.h b/src/mongo/client/fetcher.h
index 51130535fca..b3330eb1467 100644
--- a/src/mongo/client/fetcher.h
+++ b/src/mongo/client/fetcher.h
@@ -228,6 +228,9 @@ private:
// Using boolean instead of a counter to avoid issues with wrap around.
bool _first = true;
+ // _inShutdown is true after cancel() is called.
+ bool _inShutdown = false;
+
// Callback handle to the scheduled getMore command.
executor::TaskExecutor::CallbackHandle _getMoreCallbackHandle;
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index fe873860615..5fe04c81f66 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -260,8 +260,9 @@ DataReplicator::DataReplicator(
DataReplicator::~DataReplicator() {
DESTRUCTOR_GUARD({
+ UniqueLock lk(_mutex);
_cancelAllHandles_inlock();
- _waitOnAll_inlock();
+ _waitOnAndResetAll(lk);
});
}
@@ -696,9 +697,14 @@ StatusWith<OpTimeWithHash> DataReplicator::doInitialSync(OperationContext* txn)
// Reset state.
if (_oplogFetcher) {
_oplogFetcher->shutdown();
- // TODO: cleanup fetcher, and make work with networkMock/tests.
- // _fetcher->join();
- // _fetcher.reset();
+ std::unique_ptr<OplogFetcher> oplogFetcher;
+ _oplogFetcher.swap(oplogFetcher);
+ lk.unlock();
+
+ oplogFetcher->join();
+ invariant(!oplogFetcher->isActive());
+
+ lk.lock();
// TODO: clear buffer
// _clearFetcherBuffer();
}
@@ -722,7 +728,7 @@ StatusWith<OpTimeWithHash> DataReplicator::doInitialSync(OperationContext* txn)
// Success, cleanup
_cancelAllHandles_inlock();
- _waitOnAll_inlock();
+ _waitOnAndResetAll(lk);
invariant(!_anyActiveHandles_inlock());
_reporterPaused = false;
@@ -804,7 +810,8 @@ void DataReplicator::_onApplierReadyStart(const QueryResponseStatus& fetchResult
bool DataReplicator::_anyActiveHandles_inlock() const {
return _applierActive || (_oplogFetcher && _oplogFetcher->isActive()) ||
- (_initialSyncState && _initialSyncState->dbsCloner->isActive()) ||
+ (_initialSyncState && _initialSyncState->dbsCloner &&
+ _initialSyncState->dbsCloner->isActive()) ||
(_reporter && _reporter->isActive());
}
@@ -819,15 +826,42 @@ void DataReplicator::_cancelAllHandles_inlock() {
_initialSyncState->dbsCloner->shutdown();
}
-void DataReplicator::_waitOnAll_inlock() {
- if (_oplogFetcher)
- _oplogFetcher->join();
- if (_applier)
- _applier->wait();
- if (_reporter)
- _reporter->join();
- if (_initialSyncState)
- _initialSyncState->dbsCloner->join();
+void DataReplicator::_waitOnAndResetAll(UniqueLock& lk) {
+ if (_lastOplogEntryFetcher) {
+ std::unique_ptr<Fetcher> oldFetcher;
+ oldFetcher.swap(_lastOplogEntryFetcher);
+ lk.unlock();
+ oldFetcher->wait();
+ lk.lock();
+ }
+ if (_oplogFetcher) {
+ std::unique_ptr<OplogFetcher> oldFetcher;
+ oldFetcher.swap(_oplogFetcher);
+ lk.unlock();
+ oldFetcher->join();
+ lk.lock();
+ }
+ if (_applier) {
+ std::unique_ptr<MultiApplier> oldApplier;
+ oldApplier.swap(_applier);
+ lk.unlock();
+ oldApplier->wait();
+ lk.lock();
+ }
+ if (_reporter) {
+ std::unique_ptr<Reporter> oldReporter;
+ oldReporter.swap(_reporter);
+ lk.unlock();
+ oldReporter->join();
+ lk.lock();
+ }
+ if (_initialSyncState) {
+ std::unique_ptr<DatabasesCloner> cloner;
+ cloner.swap(_initialSyncState->dbsCloner);
+ lk.unlock();
+ cloner->join();
+ lk.lock();
+ }
}
void DataReplicator::_doNextActions() {
diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h
index dc472193161..17ea41b2a27 100644
--- a/src/mongo/db/repl/data_replicator.h
+++ b/src/mongo/db/repl/data_replicator.h
@@ -306,7 +306,7 @@ private:
Status _scheduleReport();
void _cancelAllHandles_inlock();
- void _waitOnAll_inlock();
+ void _waitOnAndResetAll(UniqueLock& lk);
bool _anyActiveHandles_inlock() const;
Status _shutdown(OperationContext* txn);
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index 29abe7504b5..b8379d02279 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -534,6 +534,7 @@ protected:
NetworkGuard guard(net);
if (!net->hasReadyRequests() && processedRequests < expectedResponses) {
+ net->runReadyNetworkOperations();
guard.dismiss();
continue;
}