summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorScott Hernandez <scotthernandez@gmail.com>2016-09-20 13:16:47 -0400
committerScott Hernandez <scotthernandez@gmail.com>2016-09-26 17:25:44 -0400
commit1e703b173478fd3dbef9611ec03d1d702ad5de02 (patch)
tree8b80193b608d46fc302ca9be98b5283874bb48ca
parentad356543d3be3b1414d59fe081a3e220f15fe1f5 (diff)
downloadmongo-1e703b173478fd3dbef9611ec03d1d702ad5de02.tar.gz
SERVER-25593: use shared_ptr for DataReplicator
-rw-r--r--jstests/noPassthroughWithMongod/initial_sync_replSetGetStatus.js4
-rw-r--r--src/mongo/db/repl/data_replicator.cpp7
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp35
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h2
4 files changed, 31 insertions, 17 deletions
diff --git a/jstests/noPassthroughWithMongod/initial_sync_replSetGetStatus.js b/jstests/noPassthroughWithMongod/initial_sync_replSetGetStatus.js
index 0b755bac53b..94664964bde 100644
--- a/jstests/noPassthroughWithMongod/initial_sync_replSetGetStatus.js
+++ b/jstests/noPassthroughWithMongod/initial_sync_replSetGetStatus.js
@@ -100,10 +100,6 @@
assert(!res.initialSyncStatus,
"Response should not have an 'initialSyncStatus' field: " + tojson(res));
- res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1, initialSync: 1}));
- assert(!res.initialSyncStatus,
- "Response should not have an 'initialSyncStatus' field: " + tojson(res));
-
assert.commandFailedWithCode(secondary.adminCommand({replSetGetStatus: 1, initialSync: "m"}),
ErrorCodes.TypeMismatch);
})();
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index b9349712267..bf1edad5f86 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -1345,7 +1345,6 @@ void DataReplicator::_changeStateIfNeeded() {
}
Status DataReplicator::scheduleShutdown(OperationContext* txn) {
- log() << "Scheduling shutdown in the future, creating shutdownEvent.";
auto eventStatus = _exec->makeEvent();
if (!eventStatus.isOK()) {
return eventStatus.getStatus();
@@ -1360,8 +1359,10 @@ Status DataReplicator::scheduleShutdown(OperationContext* txn) {
"Shutdown issued for the operation."};
}
_cancelAllHandles_inlock();
- _oplogBuffer->shutdown(txn);
- _oplogBuffer.reset();
+ if (_oplogBuffer) {
+ _oplogBuffer->shutdown(txn);
+ _oplogBuffer.reset();
+ }
}
// Schedule _doNextActions in case nothing is active to trigger the _onShutdown event.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 9162f64e2f3..6e40b9e4002 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -566,12 +566,19 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
}
void ReplicationCoordinatorImpl::_stopDataReplication(OperationContext* txn) {
- if (_dr && _dr->getState() == DataReplicatorState::InitialSync) {
+ std::shared_ptr<DataReplicator> drCopy;
+ {
+ LockGuard lk(_mutex);
+ _dr.swap(drCopy);
+ }
+ if (drCopy && drCopy->getState() == DataReplicatorState::InitialSync) {
LOG(1)
<< "ReplicationCoordinatorImpl::_stopDataReplication calling DataReplicator::shutdown.";
- _dr->shutdown(txn);
- LockGuard lk(_mutex); // Must take the lock to set/reset _dr, but not needed to call it.
- _dr.reset();
+ const auto status = drCopy->shutdown(txn);
+ if (!status.isOK()) {
+ warning() << "DataReplicator shutdown failed: " << status;
+ }
+ drCopy.reset();
// Do not return here, fall through.
}
LOG(1) << "ReplicationCoordinatorImpl::_stopDataReplication calling "
@@ -596,21 +603,23 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* txn,
// Do initial sync.
if (_externalState->shouldUseDataReplicatorInitialSync()) {
_externalState->runOnInitialSyncThread([this, startCompleted](OperationContext* txn) {
+ std::shared_ptr<DataReplicator> drCopy;
UniqueLock lk(_mutex); // Must take the lock to set _dr, but not call it.
- _dr = stdx::make_unique<DataReplicator>(
+ drCopy = std::make_shared<DataReplicator>(
createDataReplicatorOptions(this, _externalState.get()),
stdx::make_unique<DataReplicatorExternalStateImpl>(this, _externalState.get()),
_storage);
+ _dr = drCopy;
lk.unlock();
- const auto status = _dr->doInitialSync(txn, numInitialSyncAttempts);
+ const auto status = drCopy->doInitialSync(txn, numInitialSyncAttempts);
// If it is interrupted by resync, we do not need to cleanup the DataReplicator.
if (status == ErrorCodes::ShutdownInProgress) {
return;
}
+ drCopy.reset();
lk.lock();
- _dr.reset();
if (!_inShutdown) {
fassertStatusOK(40088, status);
} else if (!status.isOK()) {
@@ -698,6 +707,8 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* txn) {
return;
}
+ // Used to shut down outside of the lock.
+ std::shared_ptr<DataReplicator> drCopy;
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
fassert(28533, !_inShutdown);
@@ -711,9 +722,13 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* txn) {
fassert(18823, _rsConfigState != kConfigStartingUp);
_replicationWaiterList.signalAndRemoveAll_inlock();
_opTimeWaiterList.signalAndRemoveAll_inlock();
+ _dr.swap(drCopy);
}
// joining the replication executor is blocking so it must be run outside of the mutex
+ if (drCopy) {
+ drCopy->shutdown(txn);
+ }
_externalState->shutdown(txn);
_replExecutor.shutdown();
_replExecutor.join();
@@ -2255,15 +2270,17 @@ Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* txn,
const HostAndPort& target,
BSONObjBuilder* resultObj) {
Status result(ErrorCodes::InternalError, "didn't set status in prepareSyncFromResponse");
+ auto doResync = false;
{
LockGuard topoLock(_topoMutex);
LockGuard lk(_mutex);
auto opTime = _getMyLastAppliedOpTime_inlock();
_topCoord->prepareSyncFromResponse(target, opTime, resultObj, &result);
+ // If we are in the middle of an initial sync, do a resync.
+ doResync = result.isOK() && _dr && _dr->getState() == DataReplicatorState::InitialSync;
}
- // If we are in the middle of an initial sync, restart with the specified member.
- if (result.isOK() && _dr && _dr->getState() == DataReplicatorState::InitialSync) {
+ if (doResync) {
return resyncData(txn, false);
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 7011b208fc4..df76c251c0b 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -1328,7 +1328,7 @@ private:
// Storage interface used by data replicator.
StorageInterface* _storage; // (PS)
// Data Replicator used to replicate data
- std::unique_ptr<DataReplicator> _dr; // (M)
+ std::shared_ptr<DataReplicator> _dr; // (I) pointer set under mutex, copied by callers.
// Hands out the next snapshot name.
AtomicUInt64 _snapshotNameGenerator; // (S)