summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-11-14 12:25:03 -0500
committerBenety Goh <benety@mongodb.com>2016-11-14 13:53:14 -0500
commit239aa95c1cac4ef769beb9644e6f93fe366ee56c (patch)
tree7f7074dfb1ee32f0e480c5272f42326e0dd4ae44 /src
parent1b1ac4cb6da9bc6f825bbc68861c6d93ddb01f9d (diff)
downloadmongo-239aa95c1cac4ef769beb9644e6f93fe366ee56c.tar.gz
SERVER-25662 DataReplicator callbacks should check shutdown flag before proceeding
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/data_replicator.cpp62
1 files changed, 50 insertions, 12 deletions
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 29ffc794ff6..e662691389b 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -429,6 +429,13 @@ Status DataReplicator::_runInitialSyncAttempt_inlock(OperationContext* txn,
}
initialSyncFinishEvent = eventStatus.getValue();
+ if (_inShutdown) {
+ // Signal shutdown event.
+ _doNextActions_inlock();
+ return Status(ErrorCodes::ShutdownInProgress,
+ "initial sync terminated before creating cloner");
+ }
+
invariant(initialSyncFinishEvent.isValid());
_initialSyncState.reset(new InitialSyncState(
stdx::make_unique<DatabasesCloner>(
@@ -738,21 +745,28 @@ StatusWith<OpTimeWithHash> DataReplicator::doInitialSync(OperationContext* txn,
void DataReplicator::_onDataClonerFinish(const Status& status, HostAndPort syncSource) {
log() << "data clone finished, status: " << redact(status);
+
if (status.code() == ErrorCodes::CallbackCanceled) {
return;
}
+
+ LockGuard lk(_mutex);
+
+ if (_inShutdown) {
+ // Signal shutdown event.
+ _doNextActions_inlock();
+ return;
+ }
+
if (!status.isOK()) {
// Initial sync failed during cloning of databases
error() << "Failed to clone data due to '" << redact(status) << "'";
- {
- LockGuard lk(_mutex);
- _initialSyncState->status = status;
- }
+ invariant(_initialSyncState);
+ _initialSyncState->status = status;
_exec->signalEvent(_initialSyncState->finishEvent);
return;
}
- LockGuard lk(_mutex);
_scheduleLastOplogEntryFetcher_inlock(
stdx::bind(&DataReplicator::_onApplierReadyStart, this, stdx::placeholders::_1));
}
@@ -781,8 +795,18 @@ void DataReplicator::_scheduleLastOplogEntryFetcher_inlock(Fetcher::CallbackFn c
}
void DataReplicator::_onApplierReadyStart(const QueryResponseStatus& fetchResult) {
+ if (ErrorCodes::CallbackCanceled == fetchResult.getStatus()) {
+ return;
+ }
+
// Data clone done, move onto apply.
LockGuard lk(_mutex);
+ if (_inShutdown) {
+ // Signal shutdown event.
+ _doNextActions_inlock();
+ return;
+ }
+
auto&& optimeWithHashStatus = parseOpTimeWithHash(fetchResult);
if (optimeWithHashStatus.isOK()) {
auto&& optimeWithHash = optimeWithHashStatus.getValue();
@@ -1066,6 +1090,12 @@ void DataReplicator::_onApplyBatchFinish(const Status& status,
UniqueLock lk(_mutex);
+ if (_inShutdown) {
+ // Signal shutdown event.
+ _doNextActions_inlock();
+ return;
+ }
+
// This might block in _shuttingDownApplier's destructor if it is still active here.
_shuttingDownApplier = std::move(_applier);
@@ -1352,26 +1382,34 @@ void DataReplicator::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
void DataReplicator::_onOplogFetchFinish(const Status& status, const OpTimeWithHash& lastFetched) {
_fetcherPaused = true;
+
log() << "Finished fetching oplog during initial sync: " << redact(status)
<< ". Last fetched optime and hash: " << lastFetched;
+
if (status.code() == ErrorCodes::CallbackCanceled) {
return;
- } else if (status.isOK()) {
- LockGuard lk(_mutex);
- _lastFetched = lastFetched;
- } else {
- invariant(!status.isOK());
+ }
+
+ LockGuard lk(_mutex);
+ if (_inShutdown) {
+ // Signal shutdown event.
+ _doNextActions_inlock();
+ return;
+ }
+
+ if (!status.isOK()) {
invariant(_state == DataReplicatorState::InitialSync);
// Do not change sync source, just log.
error() << "Error fetching oplog during initial sync: " << redact(status);
- LockGuard lk(_mutex);
invariant(_initialSyncState);
_initialSyncState->status = status;
_exec->signalEvent(_initialSyncState->finishEvent);
return;
}
- _doNextActions();
+ _lastFetched = lastFetched;
+
+ _doNextActions_inlock();
}
std::string DataReplicator::Stats::toString() const {