diff options
author | Scott Hernandez <scotthernandez@gmail.com> | 2016-09-20 13:16:47 -0400 |
---|---|---|
committer | Scott Hernandez <scotthernandez@gmail.com> | 2016-09-26 17:25:44 -0400 |
commit | 1e703b173478fd3dbef9611ec03d1d702ad5de02 (patch) | |
tree | 8b80193b608d46fc302ca9be98b5283874bb48ca | |
parent | ad356543d3be3b1414d59fe081a3e220f15fe1f5 (diff) | |
download | mongo-1e703b173478fd3dbef9611ec03d1d702ad5de02.tar.gz |
SERVER-25593: use shared_ptr for DataReplicator
4 files changed, 31 insertions, 17 deletions
diff --git a/jstests/noPassthroughWithMongod/initial_sync_replSetGetStatus.js b/jstests/noPassthroughWithMongod/initial_sync_replSetGetStatus.js index 0b755bac53b..94664964bde 100644 --- a/jstests/noPassthroughWithMongod/initial_sync_replSetGetStatus.js +++ b/jstests/noPassthroughWithMongod/initial_sync_replSetGetStatus.js @@ -100,10 +100,6 @@ assert(!res.initialSyncStatus, "Response should not have an 'initialSyncStatus' field: " + tojson(res)); - res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1, initialSync: 1})); - assert(!res.initialSyncStatus, - "Response should not have an 'initialSyncStatus' field: " + tojson(res)); - assert.commandFailedWithCode(secondary.adminCommand({replSetGetStatus: 1, initialSync: "m"}), ErrorCodes.TypeMismatch); })(); diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index b9349712267..bf1edad5f86 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -1345,7 +1345,6 @@ void DataReplicator::_changeStateIfNeeded() { } Status DataReplicator::scheduleShutdown(OperationContext* txn) { - log() << "Scheduling shutdown in the future, creating shutdownEvent."; auto eventStatus = _exec->makeEvent(); if (!eventStatus.isOK()) { return eventStatus.getStatus(); @@ -1360,8 +1359,10 @@ Status DataReplicator::scheduleShutdown(OperationContext* txn) { "Shutdown issued for the operation."}; } _cancelAllHandles_inlock(); - _oplogBuffer->shutdown(txn); - _oplogBuffer.reset(); + if (_oplogBuffer) { + _oplogBuffer->shutdown(txn); + _oplogBuffer.reset(); + } } // Schedule _doNextActions in case nothing is active to trigger the _onShutdown event. diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 9162f64e2f3..6e40b9e4002 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -566,12 +566,19 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( } void ReplicationCoordinatorImpl::_stopDataReplication(OperationContext* txn) { - if (_dr && _dr->getState() == DataReplicatorState::InitialSync) { + std::shared_ptr<DataReplicator> drCopy; + { + LockGuard lk(_mutex); + _dr.swap(drCopy); + } + if (drCopy && drCopy->getState() == DataReplicatorState::InitialSync) { LOG(1) << "ReplicationCoordinatorImpl::_stopDataReplication calling DataReplicator::shutdown."; - _dr->shutdown(txn); - LockGuard lk(_mutex); // Must take the lock to set/reset _dr, but not needed to call it. - _dr.reset(); + const auto status = drCopy->shutdown(txn); + if (!status.isOK()) { + warning() << "DataReplicator shutdown failed: " << status; + } + drCopy.reset(); // Do not return here, fall through. } LOG(1) << "ReplicationCoordinatorImpl::_stopDataReplication calling " @@ -596,21 +603,23 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* txn, // Do initial sync. if (_externalState->shouldUseDataReplicatorInitialSync()) { _externalState->runOnInitialSyncThread([this, startCompleted](OperationContext* txn) { + std::shared_ptr<DataReplicator> drCopy; UniqueLock lk(_mutex); // Must take the lock to set _dr, but not call it. - _dr = stdx::make_unique<DataReplicator>( + drCopy = std::make_shared<DataReplicator>( createDataReplicatorOptions(this, _externalState.get()), stdx::make_unique<DataReplicatorExternalStateImpl>(this, _externalState.get()), _storage); + _dr = drCopy; lk.unlock(); - const auto status = _dr->doInitialSync(txn, numInitialSyncAttempts); + const auto status = drCopy->doInitialSync(txn, numInitialSyncAttempts); // If it is interrupted by resync, we do not need to cleanup the DataReplicator. if (status == ErrorCodes::ShutdownInProgress) { return; } + drCopy.reset(); lk.lock(); - _dr.reset(); if (!_inShutdown) { fassertStatusOK(40088, status); } else if (!status.isOK()) { @@ -698,6 +707,8 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* txn) { return; } + // Used to shut down outside of the lock. + std::shared_ptr<DataReplicator> drCopy; { stdx::lock_guard<stdx::mutex> lk(_mutex); fassert(28533, !_inShutdown); @@ -711,9 +722,13 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* txn) { fassert(18823, _rsConfigState != kConfigStartingUp); _replicationWaiterList.signalAndRemoveAll_inlock(); _opTimeWaiterList.signalAndRemoveAll_inlock(); + _dr.swap(drCopy); } // joining the replication executor is blocking so it must be run outside of the mutex + if (drCopy) { + drCopy->shutdown(txn); + } _externalState->shutdown(txn); _replExecutor.shutdown(); _replExecutor.join(); @@ -2255,15 +2270,17 @@ Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* txn, const HostAndPort& target, BSONObjBuilder* resultObj) { Status result(ErrorCodes::InternalError, "didn't set status in prepareSyncFromResponse"); + auto doResync = false; { LockGuard topoLock(_topoMutex); LockGuard lk(_mutex); auto opTime = _getMyLastAppliedOpTime_inlock(); _topCoord->prepareSyncFromResponse(target, opTime, resultObj, &result); + // If we are in the middle of an initial sync, do a resync. + doResync = result.isOK() && _dr && _dr->getState() == DataReplicatorState::InitialSync; } - // If we are in the middle of an initial sync, restart with the specified member. - if (result.isOK() && _dr && _dr->getState() == DataReplicatorState::InitialSync) { + if (doResync) { return resyncData(txn, false); } diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 7011b208fc4..df76c251c0b 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -1328,7 +1328,7 @@ private: // Storage interface used by data replicator. StorageInterface* _storage; // (PS) // Data Replicator used to replicate data - std::unique_ptr<DataReplicator> _dr; // (M) + std::shared_ptr<DataReplicator> _dr; // (I) pointer set under mutex, copied by callers. // Hands out the next snapshot name. AtomicUInt64 _snapshotNameGenerator; // (S) |