diff options
author | Andy Schwerin <Andy Schwerin schwerin@mongodb.com> | 2017-04-10 17:00:10 -0400 |
---|---|---|
committer | Andy Schwerin <Andy Schwerin schwerin@mongodb.com> | 2017-04-18 13:33:30 -0400 |
commit | b47300a80384e48dd6e17bae7a81479f7b9f1a2c (patch) | |
tree | a572caf8997526ab72bc28756ab9a7821959d3a0 | |
parent | 89931a9ab6942c8f55afa1a523ac8fb0801ba731 (diff) | |
download | mongo-b47300a80384e48dd6e17bae7a81479f7b9f1a2c.tar.gz |
SERVER-28624 Replace references to ReplicationExecutor with TaskExecutor.
This change replaces, wherever possible, references to the concrete
type ReplicationExecutor with references to the interface type
TaskExecutor, and eliminates the repl::ResponseStatus typedef, with an
eye toward eventually replacing ReplicationExecutor with another implementation.
18 files changed, 120 insertions, 128 deletions
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp index ebaa135090f..89676a8a7f6 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.cpp +++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp @@ -140,16 +140,15 @@ void BaseClonerTest::scheduleNetworkResponse(NetworkOperationIterator noi, const auto net = getNet(); Milliseconds millis(0); RemoteCommandResponse response(obj, BSONObj(), millis); - executor::TaskExecutor::ResponseStatus responseStatus(response); log() << "Scheduling response to request:" << noi->getDiagnosticString() << " -- resp:" << obj; - net->scheduleResponse(noi, net->now(), responseStatus); + net->scheduleResponse(noi, net->now(), response); } void BaseClonerTest::scheduleNetworkResponse(NetworkOperationIterator noi, ErrorCodes::Error code, const std::string& reason) { auto net = getNet(); - executor::TaskExecutor::ResponseStatus responseStatus(code, reason); + RemoteCommandResponse responseStatus(code, reason); log() << "Scheduling error response to request:" << noi->getDiagnosticString() << " -- status:" << responseStatus.status.toString(); net->scheduleResponse(noi, net->now(), responseStatus); diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp index 777a4eedacd..8752f30adaa 100644 --- a/src/mongo/db/repl/databases_cloner_test.cpp +++ b/src/mongo/db/repl/databases_cloner_test.cpp @@ -104,8 +104,7 @@ public: NetworkInterfaceMock* net = getNet(); Milliseconds millis(0); RemoteCommandResponse response(obj, BSONObj(), millis); - executor::TaskExecutor::ResponseStatus responseStatus(response); - net->scheduleResponse(noi, net->now(), responseStatus); + net->scheduleResponse(noi, net->now(), response); } void scheduleNetworkResponse(std::string cmdName, Status errorStatus) { @@ -242,11 +241,11 @@ protected: log() << "Sending response for network request:"; log() << " req: " << noi->getRequest().dbname << "." << noi->getRequest().cmdObj; log() << " resp:" << responses[processedRequests].second; - net->scheduleResponse( - noi, - net->now(), - executor::TaskExecutor::ResponseStatus(RemoteCommandResponse( - responses[processedRequests].second, BSONObj(), Milliseconds(10)))); + net->scheduleResponse(noi, + net->now(), + RemoteCommandResponse(responses[processedRequests].second, + BSONObj(), + Milliseconds(10))); if ((Date_t::now() - lastLog) > Seconds(1)) { lastLog = Date_t(); diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index 61fa115e43f..5cf7160aad1 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -99,7 +99,6 @@ using unittest::log; using LockGuard = stdx::lock_guard<stdx::mutex>; using NetworkGuard = executor::NetworkInterfaceMock::InNetworkGuard; -using ResponseStatus = executor::TaskExecutor::ResponseStatus; using UniqueLock = stdx::unique_lock<stdx::mutex>; struct CollectionCloneInfo { @@ -153,12 +152,11 @@ public: NetworkInterfaceMock* net = getNet(); Milliseconds millis(0); RemoteCommandResponse response(obj, BSONObj(), millis); - executor::TaskExecutor::ResponseStatus responseStatus(response); log() << "Sending response for network request:"; log() << " req: " << noi->getRequest().dbname << "." << noi->getRequest().cmdObj; log() << " resp:" << response; - net->scheduleResponse(noi, net->now(), responseStatus); + net->scheduleResponse(noi, net->now(), response); } void scheduleNetworkResponse(std::string cmdName, Status errorStatus) { diff --git a/src/mongo/db/repl/repl_set_commands.cpp b/src/mongo/db/repl/repl_set_commands.cpp index 48969d39a3b..6348f72bfbd 100644 --- a/src/mongo/db/repl/repl_set_commands.cpp +++ b/src/mongo/db/repl/repl_set_commands.cpp @@ -54,7 +54,6 @@ #include "mongo/db/repl/replication_coordinator_external_state_impl.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/repl/replication_coordinator_global.h" -#include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/update_position_args.h" #include "mongo/db/service_context.h" diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index abc0844d4fb..c6213308b17 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -62,7 +62,6 @@ #include "mongo/db/repl/repl_set_html_summary.h" #include "mongo/db/repl/repl_set_request_votes_args.h" #include "mongo/db/repl/repl_settings.h" -#include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/rslog.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/topology_coordinator.h" @@ -97,8 +96,6 @@ MONGO_FP_DECLARE(transitionToPrimaryHangBeforeTakingGlobalExclusiveLock); using CallbackArgs = executor::TaskExecutor::CallbackArgs; using CallbackFn = executor::TaskExecutor::CallbackFn; using CallbackHandle = executor::TaskExecutor::CallbackHandle; -using CBHandle = ReplicationExecutor::CallbackHandle; -using CBHStatus = StatusWith<CBHandle>; using EventHandle = executor::TaskExecutor::EventHandle; using executor::NetworkInterface; using NextAction = Fetcher::NextAction; @@ -472,7 +469,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx) } void ReplicationCoordinatorImpl::_finishLoadLocalConfig( - const ReplicationExecutor::CallbackArgs& cbData, + const executor::TaskExecutor::CallbackArgs& cbData, const ReplSetConfig& localConfig, const StatusWith<OpTime>& lastOpTimeStatus, const StatusWith<LastVote>& lastVoteStatus) { @@ -811,12 +808,12 @@ void ReplicationCoordinatorImpl::clearSyncSourceBlacklist() { _topCoord->clearSyncSourceBlacklist(); } -ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::setFollowerMode_nonBlocking( +executor::TaskExecutor::EventHandle ReplicationCoordinatorImpl::setFollowerMode_nonBlocking( const MemberState& newState, bool* success) { - StatusWith<ReplicationExecutor::EventHandle> finishedSettingFollowerMode = - _replExecutor.makeEvent(); + + auto finishedSettingFollowerMode = _replExecutor.makeEvent(); if (finishedSettingFollowerMode.getStatus() == ErrorCodes::ShutdownInProgress) { - return ReplicationExecutor::EventHandle(); + return {}; } fassert(18812, finishedSettingFollowerMode.getStatus()); _setFollowerModeFinish(newState, finishedSettingFollowerMode.getValue(), success); @@ -833,7 +830,7 @@ bool ReplicationCoordinatorImpl::setFollowerMode(const MemberState& newState) { void ReplicationCoordinatorImpl::_setFollowerModeFinish( const MemberState& newState, - const ReplicationExecutor::EventHandle& finishedSettingFollowerMode, + const executor::TaskExecutor::EventHandle& finishedSettingFollowerMode, bool* success) { stdx::unique_lock<stdx::mutex> lk(_mutex); @@ -1876,7 +1873,7 @@ bool ReplicationCoordinatorImpl::_tryToStepDown(const Date_t waitUntil, } void ReplicationCoordinatorImpl::_handleTimePassing( - const ReplicationExecutor::CallbackArgs& cbData) { + const executor::TaskExecutor::CallbackArgs& cbData) { if (!cbData.status.isOK()) { return; } @@ -2465,7 +2462,7 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCt auto reconfigFinished = uassertStatusOK(_replExecutor.makeEvent()); const auto reconfigFinishFn = [ this, newConfig, myIndex = myIndex.getValue(), reconfigFinished ]( - const ReplicationExecutor::CallbackArgs& cbData) { + const executor::TaskExecutor::CallbackArgs& cbData) { if (!cbData.status.isOK()) { return; @@ -2476,7 +2473,8 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCt // If it's a force reconfig, the primary node may not be electable after the configuration // change. In case we are that primary node, finish the reconfig under the global lock, // so that the step down occurs safely. - CBHStatus cbhStatus(ErrorCodes::InternalError, "reconfigFinishFn hasn't been scheduled"); + StatusWith<CallbackHandle> cbhStatus(ErrorCodes::InternalError, + "reconfigFinishFn hasn't been scheduled"); if (args.force) { cbhStatus = _replExecutor.scheduleWorkWithGlobalExclusiveLock(reconfigFinishFn); } else { @@ -2496,7 +2494,7 @@ Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* opCt void ReplicationCoordinatorImpl::_finishReplSetReconfig( const ReplSetConfig& newConfig, int myIndex, - const ReplicationExecutor::EventHandle& finishedEvent) { + const executor::TaskExecutor::EventHandle& finishedEvent) { stdx::unique_lock<stdx::mutex> lk(_mutex); invariant(_rsConfigState == kConfigReconfiguring); @@ -3165,7 +3163,7 @@ HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOp } void ReplicationCoordinatorImpl::_unblacklistSyncSource( - const ReplicationExecutor::CallbackArgs& cbData, const HostAndPort& host) { + const executor::TaskExecutor::CallbackArgs& cbData, const HostAndPort& host) { if (cbData.status == ErrorCodes::CallbackCanceled) return; @@ -3785,7 +3783,8 @@ void ReplicationCoordinatorImpl::setIndexPrefetchConfig( _indexPrefetchConfig = cfg; } -ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::_cancelElectionIfNeeded_inTopoLock() { +executor::TaskExecutor::EventHandle +ReplicationCoordinatorImpl::_cancelElectionIfNeeded_inTopoLock() { if (_topCoord->getRole() != TopologyCoordinator::Role::candidate) { return {}; } diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 84b1fce4a6b..aa9d0fd5533 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -47,6 +47,7 @@ #include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/repl/update_position_args.h" #include "mongo/db/storage/snapshot_name.h" +#include "mongo/executor/task_executor.h" #include "mongo/platform/atomic_word.h" #include "mongo/platform/unordered_map.h" #include "mongo/platform/unordered_set.h" @@ -375,8 +376,8 @@ public: * When the operation is complete (wait() returns), 'success' will be set to true * if the member state has been set successfully. */ - ReplicationExecutor::EventHandle setFollowerMode_nonBlocking(const MemberState& newState, - bool* success); + executor::TaskExecutor::EventHandle setFollowerMode_nonBlocking(const MemberState& newState, + bool* success); /** * Non-blocking version of updateTerm. @@ -385,7 +386,7 @@ public: * to a status telling if the term increased or a stepdown was triggered. */ - ReplicationExecutor::EventHandle updateTerm_forTest( + executor::TaskExecutor::EventHandle updateTerm_forTest( long long term, TopologyCoordinator::UpdateTermResult* updateResult); /** @@ -525,7 +526,7 @@ private: typedef std::vector<SlaveInfo> SlaveInfoVector; - typedef std::vector<ReplicationExecutor::CallbackHandle> HeartbeatHandles; + typedef std::vector<executor::TaskExecutor::CallbackHandle> HeartbeatHandles; /** * Appends a "replicationProgress" section with data for each member in set. @@ -603,8 +604,7 @@ private: /** * Helper to update our saved config, cancel any pending heartbeats, and kick off sending - * new heartbeats based on the new config. Must *only* be called from within the - * ReplicationExecutor context. + * new heartbeats based on the new config. * * Returns an action to be performed after unlocking _mutex, via * _performPostMemberStateUpdateAction. @@ -628,7 +628,7 @@ private: * need to change as a result of time passing - for instance becoming PRIMARY when a single * node replica set member's stepDown period ends. */ - void _handleTimePassing(const ReplicationExecutor::CallbackArgs& cbData); + void _handleTimePassing(const executor::TaskExecutor::CallbackArgs& cbData); /** * Helper method for _awaitReplication that takes an already locked unique_lock, but leaves @@ -697,9 +697,10 @@ private: * supply an event, "finishedSettingFollowerMode", and wait for that event to * be signaled. Do not observe "*success" until after the event is signaled. */ - void _setFollowerModeFinish(const MemberState& newState, - const ReplicationExecutor::EventHandle& finishedSettingFollowerMode, - bool* success); + void _setFollowerModeFinish( + const MemberState& newState, + const executor::TaskExecutor::EventHandle& finishedSettingFollowerMode, + bool* success); /** * Helper method for updating our tracking of the last optime applied by a given node. @@ -742,13 +743,13 @@ private: * * Schedules additional heartbeats, triggers elections and step downs, etc. */ - void _handleHeartbeatResponse(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData, + void _handleHeartbeatResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, int targetIndex); void _trackHeartbeatHandle_inlock( - const StatusWith<ReplicationExecutor::CallbackHandle>& handle); + const StatusWith<executor::TaskExecutor::CallbackHandle>& handle); - void _untrackHeartbeatHandle_inlock(const ReplicationExecutor::CallbackHandle& handle); + void _untrackHeartbeatHandle_inlock(const executor::TaskExecutor::CallbackHandle& handle); /* * Return a randomized offset amount that is scaled in proportion to the size of the @@ -789,7 +790,7 @@ private: * * Scheduled by _scheduleHeartbeatToTarget_inlock. */ - void _doMemberHeartbeat(ReplicationExecutor::CallbackArgs cbData, + void _doMemberHeartbeat(executor::TaskExecutor::CallbackArgs cbData, const HostAndPort& target, int targetIndex); @@ -811,7 +812,7 @@ private: * Callback that finishes the work started in _startLoadLocalConfig and sets _rsConfigState * to kConfigSteady, so that we can begin processing heartbeats and reconfigs. */ - void _finishLoadLocalConfig(const ReplicationExecutor::CallbackArgs& cbData, + void _finishLoadLocalConfig(const executor::TaskExecutor::CallbackArgs& cbData, const ReplSetConfig& localConfig, const StatusWith<OpTime>& lastOpTimeStatus, const StatusWith<LastVote>& lastVoteStatus); @@ -838,7 +839,7 @@ private: */ void _finishReplSetReconfig(const ReplSetConfig& newConfig, int myIndex, - const ReplicationExecutor::EventHandle& finishedEvent); + const executor::TaskExecutor::EventHandle& finishedEvent); /** * Changes _rsConfigState to newState, and notify any waiters. @@ -906,7 +907,7 @@ private: * This job will be scheduled to run in DB worker threads. */ void _writeLastVoteForMyElection(LastVote lastVote, - const ReplicationExecutor::CallbackArgs& cbData); + const executor::TaskExecutor::CallbackArgs& cbData); /** * Starts VoteRequester to run the real election when last vote write has completed. @@ -924,7 +925,7 @@ private: /** * Callback called after a random delay, to prevent repeated election ties. */ - void _recoverFromElectionTie(const ReplicationExecutor::CallbackArgs& cbData); + void _recoverFromElectionTie(const executor::TaskExecutor::CallbackArgs& cbData); /** * Removes 'host' from the sync source blacklist. If 'host' isn't found, it's simply @@ -932,7 +933,7 @@ private: * * Must be scheduled as a callback. */ - void _unblacklistSyncSource(const ReplicationExecutor::CallbackArgs& cbData, + void _unblacklistSyncSource(const executor::TaskExecutor::CallbackArgs& cbData, const HostAndPort& host); /** @@ -943,15 +944,15 @@ private: /** * Schedules stepdown to run with the global exclusive lock. */ - ReplicationExecutor::EventHandle _stepDownStart(); + executor::TaskExecutor::EventHandle _stepDownStart(); /** * Completes a step-down of the current node. Must be run with a global * shared or global exclusive lock. * Signals 'finishedEvent' on successful completion. */ - void _stepDownFinish(const ReplicationExecutor::CallbackArgs& cbData, - const ReplicationExecutor::EventHandle& finishedEvent); + void _stepDownFinish(const executor::TaskExecutor::CallbackArgs& cbData, + const executor::TaskExecutor::EventHandle& finishedEvent); /** * Schedules a replica set config change. @@ -962,19 +963,19 @@ private: * Callback that continues a heartbeat-initiated reconfig after a running election * completes. */ - void _heartbeatReconfigAfterElectionCanceled(const ReplicationExecutor::CallbackArgs& cbData, + void _heartbeatReconfigAfterElectionCanceled(const executor::TaskExecutor::CallbackArgs& cbData, const ReplSetConfig& newConfig); /** * Method to write a configuration transmitted via heartbeat message to stable storage. */ - void _heartbeatReconfigStore(const ReplicationExecutor::CallbackArgs& cbd, + void _heartbeatReconfigStore(const executor::TaskExecutor::CallbackArgs& cbd, const ReplSetConfig& newConfig); /** * Conclusion actions of a heartbeat-triggered reconfiguration. */ - void _heartbeatReconfigFinish(const ReplicationExecutor::CallbackArgs& cbData, + void _heartbeatReconfigFinish(const executor::TaskExecutor::CallbackArgs& cbData, const ReplSetConfig& newConfig, StatusWith<int> myIndex); @@ -1055,7 +1056,7 @@ private: * Callback which marks downed nodes as down, triggers a stepdown if a majority of nodes are no * longer visible, and reschedules itself. */ - void _handleLivenessTimeout(const ReplicationExecutor::CallbackArgs& cbData); + void _handleLivenessTimeout(const executor::TaskExecutor::CallbackArgs& cbData); /** * If "updatedMemberId" is the current _earliestMemberId, cancels the current @@ -1182,7 +1183,7 @@ private: * * Caller must already have locked the _topoMutex. */ - ReplicationExecutor::EventHandle _cancelElectionIfNeeded_inTopoLock(); + executor::TaskExecutor::EventHandle _cancelElectionIfNeeded_inTopoLock(); /** * Waits until the optime of the current node is at least the opTime specified in 'readConcern'. @@ -1315,11 +1316,11 @@ private: // Event that the election code will signal when the in-progress election completes. // Unspecified value when _freshnessChecker is NULL. - ReplicationExecutor::EventHandle _electionFinishedEvent; // (M) + executor::TaskExecutor::EventHandle _electionFinishedEvent; // (M) // Event that the election code will signal when the in-progress election dry run completes, // which includes writing the last vote and scheduling the real election. - ReplicationExecutor::EventHandle _electionDryRunFinishedEvent; // (M) + executor::TaskExecutor::EventHandle _electionDryRunFinishedEvent; // (M) // Whether we slept last time we attempted an election but possibly tied with other nodes. bool _sleptLastElection; // (M) @@ -1369,10 +1370,10 @@ private: stdx::condition_variable _currentCommittedSnapshotCond; // (M) // Callback Handle used to cancel a scheduled LivenessTimeout callback. - ReplicationExecutor::CallbackHandle _handleLivenessTimeoutCbh; // (M) + executor::TaskExecutor::CallbackHandle _handleLivenessTimeoutCbh; // (M) // Callback Handle used to cancel a scheduled ElectionTimeout callback. - ReplicationExecutor::CallbackHandle _handleElectionTimeoutCbh; // (M) + executor::TaskExecutor::CallbackHandle _handleElectionTimeoutCbh; // (M) // Election timeout callback will not run before this time. // If this date is Date_t(), the callback is either unscheduled or canceled. @@ -1380,7 +1381,7 @@ private: Date_t _handleElectionTimeoutWhen; // (M) // Callback Handle used to cancel a scheduled PriorityTakover callback. - ReplicationExecutor::CallbackHandle _priorityTakeoverCbh; // (M) + executor::TaskExecutor::CallbackHandle _priorityTakeoverCbh; // (M) // Priority takeover callback will not run before this time. // If this date is Date_t(), the callback is either unscheduled or canceled. diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect.cpp index fc712893ed8..f05821d1b58 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect.cpp @@ -48,10 +48,10 @@ class LoseElectionGuard { public: LoseElectionGuard(TopologyCoordinator* topCoord, - ReplicationExecutor* executor, + executor::TaskExecutor* executor, std::unique_ptr<FreshnessChecker>* freshnessChecker, std::unique_ptr<ElectCmdRunner>* electCmdRunner, - ReplicationExecutor::EventHandle* electionFinishedEvent) + executor::TaskExecutor::EventHandle* electionFinishedEvent) : _topCoord(topCoord), _executor(executor), _freshnessChecker(freshnessChecker), @@ -77,10 +77,10 @@ public: private: TopologyCoordinator* const _topCoord; - ReplicationExecutor* const _executor; + executor::TaskExecutor* const _executor; std::unique_ptr<FreshnessChecker>* const _freshnessChecker; std::unique_ptr<ElectCmdRunner>* const _electCmdRunner; - const ReplicationExecutor::EventHandle* _electionFinishedEvent; + const executor::TaskExecutor::EventHandle* _electionFinishedEvent; bool _dismissed; }; @@ -107,7 +107,7 @@ void ReplicationCoordinatorImpl::_startElectSelf_inlock() { } log() << "Standing for election"; - const StatusWith<ReplicationExecutor::EventHandle> finishEvh = _replExecutor.makeEvent(); + const StatusWith<executor::TaskExecutor::EventHandle> finishEvh = _replExecutor.makeEvent(); if (finishEvh.getStatus() == ErrorCodes::ShutdownInProgress) { return; } @@ -132,7 +132,7 @@ void ReplicationCoordinatorImpl::_startElectSelf_inlock() { _freshnessChecker.reset(new FreshnessChecker); - StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = + StatusWith<executor::TaskExecutor::EventHandle> nextPhaseEvh = _freshnessChecker->start(&_replExecutor, lastOpTimeApplied.getTimestamp(), _rsConfig, @@ -209,7 +209,7 @@ void ReplicationCoordinatorImpl::_onFreshnessCheckComplete() { } _electCmdRunner.reset(new ElectCmdRunner); - StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _electCmdRunner->start( + StatusWith<executor::TaskExecutor::EventHandle> nextPhaseEvh = _electCmdRunner->start( &_replExecutor, _rsConfig, _selfIndex, _topCoord->getMaybeUpHostAndPorts()); if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) { return; @@ -272,7 +272,7 @@ void ReplicationCoordinatorImpl::_onElectCmdRunnerComplete() { } void ReplicationCoordinatorImpl::_recoverFromElectionTie( - const ReplicationExecutor::CallbackArgs& cbData) { + const executor::TaskExecutor::CallbackArgs& cbData) { stdx::unique_lock<stdx::mutex> lk(_mutex); auto now = _replExecutor.now(); diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp index e0ba997c564..2f8bea4a620 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp @@ -146,11 +146,13 @@ TEST_F(ReplCoordElectTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) { net->enterNetwork(); const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); // blackhole heartbeat - net->scheduleResponse(noi, net->now(), ResponseStatus(ErrorCodes::OperationFailed, "timeout")); + net->scheduleResponse( + noi, net->now(), executor::RemoteCommandResponse(ErrorCodes::OperationFailed, "timeout")); net->runReadyNetworkOperations(); // blackhole freshness const NetworkInterfaceMock::NetworkOperationIterator noi2 = net->getNextReadyRequest(); - net->scheduleResponse(noi2, net->now(), ResponseStatus(ErrorCodes::OperationFailed, "timeout")); + net->scheduleResponse( + noi2, net->now(), executor::RemoteCommandResponse(ErrorCodes::OperationFailed, "timeout")); net->runReadyNetworkOperations(); net->exitNetwork(); diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp index 0716713358e..779ca98776c 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp @@ -138,7 +138,7 @@ void ReplicationCoordinatorImpl::_startElectSelfV1_inlock() { _voteRequester.reset(new VoteRequester); long long term = _topCoord->getTerm(); - StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = + StatusWith<executor::TaskExecutor::EventHandle> nextPhaseEvh = _voteRequester->start(&_replExecutor, _rsConfig, _selfIndex, @@ -190,7 +190,7 @@ void ReplicationCoordinatorImpl::_onDryRunComplete(long long originalTerm) { LastVote lastVote{originalTerm + 1, _selfIndex}; auto cbStatus = _replExecutor.scheduleDBWork( - [this, lastVote](const ReplicationExecutor::CallbackArgs& cbData) { + [this, lastVote](const executor::TaskExecutor::CallbackArgs& cbData) { _writeLastVoteForMyElection(lastVote, cbData); }); if (cbStatus.getStatus() == ErrorCodes::ShutdownInProgress) { @@ -201,7 +201,7 @@ void ReplicationCoordinatorImpl::_onDryRunComplete(long long originalTerm) { } void ReplicationCoordinatorImpl::_writeLastVoteForMyElection( - LastVote lastVote, const ReplicationExecutor::CallbackArgs& cbData) { + LastVote lastVote, const executor::TaskExecutor::CallbackArgs& cbData) { // storeLocalLastVoteDocument can call back in to the replication coordinator, // so _mutex must be unlocked here. However, we cannot return until we // lock it because we want to lose the election on cancel or error and @@ -236,7 +236,7 @@ void ReplicationCoordinatorImpl::_startVoteRequester_inlock(long long newTerm) { const auto lastOpTime = _getMyLastAppliedOpTime_inlock(); _voteRequester.reset(new VoteRequester); - StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _voteRequester->start( + StatusWith<executor::TaskExecutor::EventHandle> nextPhaseEvh = _voteRequester->start( &_replExecutor, _rsConfig, _selfIndex, _topCoord->getTerm(), false, lastOpTime); if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) { return; diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp index c26e5af56ea..1075a7e9232 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp @@ -1373,7 +1373,7 @@ protected: return config; } - ResponseStatus makeFreshnessScanResponse(OpTime opTime) { + executor::RemoteCommandResponse makeFreshnessScanResponse(OpTime opTime) { // OpTime part of replSetGetStatus. return makeResponseStatus(BSON("optimes" << BSON("appliedOpTime" << opTime.toBSON()))); } diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index ac532ba036c..ed1bb1465a5 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -44,7 +44,6 @@ #include "mongo/db/repl/repl_set_heartbeat_args_v1.h" #include "mongo/db/repl/repl_set_heartbeat_response.h" #include "mongo/db/repl/replication_coordinator_impl.h" -#include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/repl/vote_requester.h" #include "mongo/db/service_context.h" @@ -63,10 +62,6 @@ namespace repl { namespace { -using CallbackArgs = executor::TaskExecutor::CallbackArgs; -using CBHandle = ReplicationExecutor::CallbackHandle; -using CBHStatus = StatusWith<CBHandle>; - MONGO_FP_DECLARE(blockHeartbeatStepdown); MONGO_FP_DECLARE(blockHeartbeatReconfigFinish); @@ -88,7 +83,7 @@ Milliseconds ReplicationCoordinatorImpl::_getRandomizedElectionOffset() { return Milliseconds(randomOffset); } -void ReplicationCoordinatorImpl::_doMemberHeartbeat(ReplicationExecutor::CallbackArgs cbData, +void ReplicationCoordinatorImpl::_doMemberHeartbeat(executor::TaskExecutor::CallbackArgs cbData, const HostAndPort& target, int targetIndex) { stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -115,7 +110,7 @@ void ReplicationCoordinatorImpl::_doMemberHeartbeat(ReplicationExecutor::Callbac const RemoteCommandRequest request( target, "admin", heartbeatObj, BSON(rpc::kReplSetMetadataFieldName << 1), nullptr, timeout); - const ReplicationExecutor::RemoteCommandCallbackFn callback = + const executor::TaskExecutor::RemoteCommandCallbackFn callback = stdx::bind(&ReplicationCoordinatorImpl::_handleHeartbeatResponse, this, stdx::placeholders::_1, @@ -141,7 +136,7 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatToTarget_inlock(const HostAnd } void ReplicationCoordinatorImpl::_handleHeartbeatResponse( - const ReplicationExecutor::RemoteCommandCallbackArgs& cbData, int targetIndex) { + const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, int targetIndex) { stdx::unique_lock<stdx::mutex> lk(_mutex); // remove handle from queued heartbeats @@ -321,7 +316,7 @@ namespace { /** * This callback is purely for logging and has no effect on any other operations */ -void remoteStepdownCallback(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData) { +void remoteStepdownCallback(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) { const Status status = cbData.response.status; if (status == ErrorCodes::CallbackCanceled) { return; @@ -348,13 +343,13 @@ void ReplicationCoordinatorImpl::_requestRemotePrimaryStepdown(const HostAndPort nullptr); log() << "Requesting " << target << " step down from primary"; - CBHStatus cbh = _replExecutor.scheduleRemoteCommand(request, remoteStepdownCallback); + auto cbh = _replExecutor.scheduleRemoteCommand(request, remoteStepdownCallback); if (cbh.getStatus() != ErrorCodes::ShutdownInProgress) { fassert(18808, cbh.getStatus()); } } -ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::_stepDownStart() { +executor::TaskExecutor::EventHandle ReplicationCoordinatorImpl::_stepDownStart() { auto finishEvent = _makeEvent(); if (!finishEvent) { return finishEvent; @@ -366,8 +361,8 @@ ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::_stepDownStart() { } void ReplicationCoordinatorImpl::_stepDownFinish( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplicationExecutor::EventHandle& finishedEvent) { + const executor::TaskExecutor::CallbackArgs& cbData, + const executor::TaskExecutor::EventHandle& finishedEvent) { if (cbData.status == ErrorCodes::CallbackCanceled) { return; } @@ -440,7 +435,7 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig_inlock(const ReplSet } void ReplicationCoordinatorImpl::_heartbeatReconfigAfterElectionCanceled( - const ReplicationExecutor::CallbackArgs& cbData, const ReplSetConfig& newConfig) { + const executor::TaskExecutor::CallbackArgs& cbData, const ReplSetConfig& newConfig) { if (cbData.status == ErrorCodes::CallbackCanceled) { return; } @@ -458,7 +453,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigAfterElectionCanceled( } void ReplicationCoordinatorImpl::_heartbeatReconfigStore( - const ReplicationExecutor::CallbackArgs& cbd, const ReplSetConfig& newConfig) { + const executor::TaskExecutor::CallbackArgs& cbd, const ReplSetConfig& newConfig) { if (cbd.status.code() == ErrorCodes::CallbackCanceled) { log() << "The callback to persist the replica set configuration was canceled - " << "the configuration was not persisted but was used: " << newConfig.toBSON(); @@ -536,7 +531,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( } void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( - const ReplicationExecutor::CallbackArgs& cbData, + const executor::TaskExecutor::CallbackArgs& cbData, const ReplSetConfig& newConfig, StatusWith<int> myIndex) { @@ -615,15 +610,17 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( auto evh = _resetElectionInfoOnProtocolVersionUpgrade(oldConfig, newConfig); lk.unlock(); if (evh) { - _replExecutor.onEvent(evh, [this, action](const CallbackArgs& cbArgs) { - _performPostMemberStateUpdateAction(action); - }); + _replExecutor.onEvent(evh, + [this, action](const executor::TaskExecutor::CallbackArgs& cbArgs) { + _performPostMemberStateUpdateAction(action); + }); } else { _performPostMemberStateUpdateAction(action); } } -void ReplicationCoordinatorImpl::_trackHeartbeatHandle_inlock(const StatusWith<CBHandle>& handle) { +void ReplicationCoordinatorImpl::_trackHeartbeatHandle_inlock( + const StatusWith<executor::TaskExecutor::CallbackHandle>& handle) { if (handle.getStatus() == ErrorCodes::ShutdownInProgress) { return; } @@ -631,7 +628,8 @@ void ReplicationCoordinatorImpl::_trackHeartbeatHandle_inlock(const StatusWith<C _heartbeatHandles.push_back(handle.getValue()); } -void ReplicationCoordinatorImpl::_untrackHeartbeatHandle_inlock(const CBHandle& handle) { +void ReplicationCoordinatorImpl::_untrackHeartbeatHandle_inlock( + const executor::TaskExecutor::CallbackHandle& handle) { const HeartbeatHandles::iterator newEnd = std::remove(_heartbeatHandles.begin(), _heartbeatHandles.end(), handle); invariant(newEnd != _heartbeatHandles.end()); @@ -641,9 +639,10 @@ void ReplicationCoordinatorImpl::_untrackHeartbeatHandle_inlock(const CBHandle& void ReplicationCoordinatorImpl::_cancelHeartbeats_inlock() { LOG_FOR_HEARTBEATS(2) << "Cancelling all heartbeats."; - std::for_each(_heartbeatHandles.begin(), - _heartbeatHandles.end(), - stdx::bind(&ReplicationExecutor::cancel, &_replExecutor, stdx::placeholders::_1)); + std::for_each( + _heartbeatHandles.begin(), + _heartbeatHandles.end(), + stdx::bind(&executor::TaskExecutor::cancel, &_replExecutor, stdx::placeholders::_1)); // Heartbeat callbacks will remove themselves from _heartbeatHandles when they execute with // CallbackCanceled status, so it's better to leave the handles in the list, for now. @@ -676,7 +675,7 @@ void ReplicationCoordinatorImpl::_startHeartbeats_inlock() { } void ReplicationCoordinatorImpl::_handleLivenessTimeout( - const ReplicationExecutor::CallbackArgs& cbData) { + const executor::TaskExecutor::CallbackArgs& cbData) { stdx::unique_lock<stdx::mutex> lk(_mutex); // Only reset the callback handle if it matches, otherwise more will be coming through if (cbData.myHandle == _handleLivenessTimeoutCbh) { diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp index 5f4101e9eb2..59eee099c93 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp @@ -365,8 +365,7 @@ TEST_F(ReplCoordHBV1Test, IgnoreTheContentsOfMetadataWhenItsReplicaSetIdDoesNotM // Prepare heartbeat response. OID unexpectedId = OID::gen(); OpTime opTime{Timestamp{10, 10}, 10}; - ReplicationExecutor::ResponseStatus heartbeatResponse(ErrorCodes::InternalError, - "not initialized"); + RemoteCommandResponse heartbeatResponse(ErrorCodes::InternalError, "not initialized"); { ReplSetHeartbeatResponse hbResp; hbResp.setSetName(rsConfig.getReplSetName()); diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index e69c85d57d1..459cc393927 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -362,8 +362,9 @@ TEST_F(ReplCoordTest, NodeReturnsNodeNotFoundWhenQuorumCheckFailsWhileInitiating ASSERT_EQUALS(HostAndPort("node2", 54321), noi->getRequest().target); ASSERT_EQUALS("admin", noi->getRequest().dbname); ASSERT_BSONOBJ_EQ(hbArgs.toBSON(), noi->getRequest().cmdObj); - getNet()->scheduleResponse( - noi, startDate + Milliseconds(10), ResponseStatus(ErrorCodes::NoSuchKey, "No response")); + getNet()->scheduleResponse(noi, + startDate + Milliseconds(10), + RemoteCommandResponse(ErrorCodes::NoSuchKey, "No response")); getNet()->runUntil(startDate + Milliseconds(10)); getNet()->exitNetwork(); ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now()); @@ -398,7 +399,7 @@ TEST_F(ReplCoordTest, InitiateSucceedsWhenQuorumCheckPasses) { getNet()->scheduleResponse( noi, startDate + Milliseconds(10), - ResponseStatus(RemoteCommandResponse(hbResp.toBSON(false), BSONObj(), Milliseconds(8)))); + RemoteCommandResponse(hbResp.toBSON(false), BSONObj(), Milliseconds(8))); getNet()->runUntil(startDate + Milliseconds(10)); getNet()->exitNetwork(); ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now()); diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index 0df971091d4..e5a17dd5c1a 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -195,16 +195,17 @@ void ReplCoordTest::assertStartSuccess(const BSONObj& configDoc, const HostAndPo ASSERT_NE(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s); } -ResponseStatus ReplCoordTest::makeResponseStatus(const BSONObj& doc, Milliseconds millis) { +executor::RemoteCommandResponse ReplCoordTest::makeResponseStatus(const BSONObj& doc, + Milliseconds millis) { return makeResponseStatus(doc, BSONObj(), millis); } -ResponseStatus ReplCoordTest::makeResponseStatus(const BSONObj& doc, - const BSONObj& metadata, - Milliseconds millis) { +executor::RemoteCommandResponse ReplCoordTest::makeResponseStatus(const BSONObj& doc, + const BSONObj& metadata, + Milliseconds millis) { log() << "Responding with " << doc << " (metadata: " << metadata << "; elapsed: " << millis << ")"; - return ResponseStatus(RemoteCommandResponse(doc, metadata, millis)); + return RemoteCommandResponse(doc, metadata, millis); } void ReplCoordTest::simulateEnoughHeartbeatsForAllNodesUp() { diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h index 5ffab24a8ef..972e2f503ae 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.h +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h @@ -59,18 +59,17 @@ class TopologyCoordinatorImpl; class ReplCoordTest : public mongo::unittest::Test { public: /** - * Makes a ResponseStatus with the given "doc" response and optional elapsed time "millis". + * Makes a command response with the given "doc" response and optional elapsed time "millis". */ - static ResponseStatus makeResponseStatus(const BSONObj& doc, - Milliseconds millis = Milliseconds(0)); + static executor::RemoteCommandResponse makeResponseStatus( + const BSONObj& doc, Milliseconds millis = Milliseconds(0)); /** - * Makes a ResponseStatus with the given "doc" response, metadata and optional elapsed time + * Makes a command response with the given "doc" response, metadata and optional elapsed time * "millis". */ - static ResponseStatus makeResponseStatus(const BSONObj& doc, - const BSONObj& metadata, - Milliseconds millis = Milliseconds(0)); + static executor::RemoteCommandResponse makeResponseStatus( + const BSONObj& doc, const BSONObj& metadata, Milliseconds millis = Milliseconds(0)); /** * Constructs a ReplSetConfig from the given BSON, or raises a test failure exception. diff --git a/src/mongo/db/repl/replication_executor.cpp b/src/mongo/db/repl/replication_executor.cpp index 8dc65f6d7e3..9fcf7b01359 100644 --- a/src/mongo/db/repl/replication_executor.cpp +++ b/src/mongo/db/repl/replication_executor.cpp @@ -268,13 +268,13 @@ StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::onEvent( static void remoteCommandFinished(const ReplicationExecutor::CallbackArgs& cbData, const ReplicationExecutor::RemoteCommandCallbackFn& cb, const RemoteCommandRequest& request, - const ResponseStatus& response) { + const RemoteCommandResponse& response) { if (cbData.status.isOK()) { cb(ReplicationExecutor::RemoteCommandCallbackArgs( cbData.executor, cbData.myHandle, request, response)); } else { cb(ReplicationExecutor::RemoteCommandCallbackArgs( - cbData.executor, cbData.myHandle, request, ResponseStatus(cbData.status))); + cbData.executor, cbData.myHandle, request, cbData.status)); } } @@ -283,11 +283,11 @@ static void remoteCommandFailedEarly(const ReplicationExecutor::CallbackArgs& cb const RemoteCommandRequest& request) { invariant(!cbData.status.isOK()); cb(ReplicationExecutor::RemoteCommandCallbackArgs( - cbData.executor, cbData.myHandle, request, ResponseStatus(cbData.status))); + cbData.executor, cbData.myHandle, request, cbData.status)); } void ReplicationExecutor::_finishRemoteCommand(const RemoteCommandRequest& request, - const ResponseStatus& response, + const RemoteCommandResponse& response, const CallbackHandle& cbHandle, const uint64_t expectedHandleGeneration, const RemoteCommandCallbackFn& cb) { diff --git a/src/mongo/db/repl/replication_executor.h b/src/mongo/db/repl/replication_executor.h index 3205b816776..37b05e2fa66 100644 --- a/src/mongo/db/repl/replication_executor.h +++ b/src/mongo/db/repl/replication_executor.h @@ -365,8 +365,6 @@ private: EventHandle _finishedEvent; }; -typedef ReplicationExecutor::ResponseStatus ResponseStatus; - /** * Description of a scheduled but not-yet-run work item. * diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp index 483901d0fb5..383b2cf8aca 100644 --- a/src/mongo/db/repl/reporter_test.cpp +++ b/src/mongo/db/repl/reporter_test.cpp @@ -46,8 +46,6 @@ using executor::NetworkInterfaceMock; using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; -using ResponseStatus = mongo::executor::TaskExecutor::ResponseStatus; - class MockProgressManager { public: void updateMap(int memberId, const OpTime& lastDurableOpTime, const OpTime& lastAppliedOpTime) { @@ -122,7 +120,7 @@ public: */ BSONObj processNetworkResponse(const BSONObj& obj, bool expectReadyRequestsAfterProcessing = false); - BSONObj processNetworkResponse(const ResponseStatus rs, + BSONObj processNetworkResponse(const RemoteCommandResponse rs, bool expectReadyRequestsAfterProcessing = false); void runUntil(Date_t when, bool expectReadyRequestsAfterAdvancingClock = false); @@ -211,7 +209,7 @@ BSONObj ReporterTest::processNetworkResponse(const BSONObj& obj, return cmdObj; } -BSONObj ReporterTest::processNetworkResponse(const ResponseStatus rs, +BSONObj ReporterTest::processNetworkResponse(const RemoteCommandResponse rs, bool expectReadyRequestsAfterProcessing) { auto net = getNet(); net->enterNetwork(); |