diff options
author | Scott Hernandez <scotthernandez@gmail.com> | 2014-07-18 15:52:17 -0400 |
---|---|---|
committer | Scott Hernandez <scotthernandez@gmail.com> | 2014-07-21 15:39:45 -0400 |
commit | d8f96453ea6e86c9fbe020766edda85b44823201 (patch) | |
tree | 7ce61830c4fb38685d036b9000d10ef2b14eb8c4 /src | |
parent | 17a35699a9d760965e22e1d3f7ffc813e4edb658 (diff) | |
download | mongo-d8f96453ea6e86c9fbe020766edda85b44823201.tar.gz |
SERVER-14560: Add cancelHeartbeats
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl.cpp | 30 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl.h | 22 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_executor.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_executor.h | 25 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_executor_test.cpp | 53 |
5 files changed, 122 insertions, 20 deletions
diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp index 5b5c54c0492..3f38d0d381e 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl.cpp @@ -493,8 +493,36 @@ namespace repl { // TODO } + void ReplicationCoordinatorImpl::_trackHeartbeatHandle( + const ReplicationExecutor::CallbackHandle& handle) { + // this mutex should not be needed because it is always used during a callback. + // boost::mutex::scoped_lock lock(_mutex); + _heartbeatHandles.push_back(handle); + } + + void ReplicationCoordinatorImpl::_untrackHeartbeatHandle( + const ReplicationExecutor::CallbackHandle& handle) { + // this mutex should not be needed because it is always used during a callback. + // boost::mutex::scoped_lock lock(_mutex); + HeartbeatHandles::iterator it = std::find(_heartbeatHandles.begin(), + _heartbeatHandles.end(), + handle); + invariant(it != _heartbeatHandles.end()); + + _heartbeatHandles.erase(it); + + } + void ReplicationCoordinatorImpl::cancelHeartbeats() { - // TODO + // this mutex should not be needed because it is always used during a callback. + //boost::mutex::scoped_lock lock(_mutex); + HeartbeatHandles::const_iterator it = _heartbeatHandles.begin(); + const HeartbeatHandles::const_iterator end = _heartbeatHandles.end(); + for( ; it != end; ++it ) { + _replExecutor->cancel(*it); + } + + _heartbeatHandles.clear(); } } // namespace repl diff --git a/src/mongo/db/repl/repl_coordinator_impl.h b/src/mongo/db/repl/repl_coordinator_impl.h index 7da40107c8b..dcf635edebc 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.h +++ b/src/mongo/db/repl/repl_coordinator_impl.h @@ -169,6 +169,14 @@ namespace repl { void doMemberHeartbeat(ReplicationExecutor* executor, const Status& inStatus, const HostAndPort& hap); + + /** + * Cancels all heartbeats. + * + * This is only called during the callback when there is a new config. + * At this time no new heartbeats can be scheduled due to the serialization + * of calls via the executor. + */ void cancelHeartbeats(); private: @@ -193,17 +201,9 @@ namespace repl { Date_t firstCallDate, int retriesLeft); - void _trackHeartbeatHandle(const ReplicationExecutor::CallbackHandle& handle) { - // this mutex should not be needed because it is always used during a callback. - // boost::mutex::scoped_lock lock(_mutex); - _heartbeatHandles.push_back(handle); - } - - void _untrackHeartbeatHandle(const ReplicationExecutor::CallbackHandle& handle) { - // this mutex should not be needed because it is always used during a callback. - // boost::mutex::scoped_lock lock(_mutex); - // TODO - } + void _trackHeartbeatHandle(const ReplicationExecutor::CallbackHandle& handle); + + void _untrackHeartbeatHandle(const ReplicationExecutor::CallbackHandle& handle); // Handles to actively queued heartbeats. typedef std::vector<ReplicationExecutor::CallbackHandle> HeartbeatHandles; diff --git a/src/mongo/db/repl/replication_executor.cpp b/src/mongo/db/repl/replication_executor.cpp index 541d50013e4..135ae2ad49e 100644 --- a/src/mongo/db/repl/replication_executor.cpp +++ b/src/mongo/db/repl/replication_executor.cpp @@ -48,7 +48,8 @@ namespace { ReplicationExecutor::ReplicationExecutor(NetworkInterface* netInterface) : _networkInterface(netInterface), _totalEventWaiters(0), - _inShutdown(false) { + _inShutdown(false), + _nextId(0) { } ReplicationExecutor::~ReplicationExecutor() {} @@ -107,7 +108,7 @@ namespace { while (!_unsignaledEvents.empty()) { EventList::iterator event = _unsignaledEvents.begin(); invariant(event->waiters.empty()); - signalEvent_inlock(EventHandle(event)); + signalEvent_inlock(EventHandle(event, ++_nextId)); } while (_totalEventWaiters > 0) @@ -140,7 +141,7 @@ namespace { iter->generation++; iter->isSignaled = false; _unsignaledEvents.splice(_unsignaledEvents.end(), _signaledEvents, iter); - return StatusWith<EventHandle>(EventHandle(iter)); + return StatusWith<EventHandle>(EventHandle(iter, ++_nextId)); } void ReplicationExecutor::signalEvent(const EventHandle& event) { @@ -421,9 +422,10 @@ namespace { return StatusWith<CallbackHandle>(CallbackHandle(iter)); } - ReplicationExecutor::EventHandle::EventHandle(const EventList::iterator& iter) : + ReplicationExecutor::EventHandle::EventHandle(const EventList::iterator& iter, uint64_t id) : _iter(iter), - _generation(iter->generation) { + _generation(iter->generation), + _id(id) { } ReplicationExecutor::CallbackHandle::CallbackHandle(const WorkQueue::iterator& iter) : diff --git a/src/mongo/db/repl/replication_executor.h b/src/mongo/db/repl/replication_executor.h index 41ea9a2c158..d743748e966 100644 --- a/src/mongo/db/repl/replication_executor.h +++ b/src/mongo/db/repl/replication_executor.h @@ -360,6 +360,7 @@ namespace repl { int64_t _totalEventWaiters; bool _inShutdown; threadpool::ThreadPool _networkWorkers; + uint64_t _nextId; }; /** @@ -368,18 +369,27 @@ namespace repl { class ReplicationExecutor::EventHandle { friend class ReplicationExecutor; public: - EventHandle() : _generation(0) {} + EventHandle() : _generation(0), _id(0) {} /** * Returns true if the handle is valid, meaning that it identifies */ - bool isValid() const { return _generation != 0; } + bool isValid() const { return _id != 0; } + + bool operator==(const EventHandle &other) const { + return (_id == other._id); + } + + bool operator!=(const EventHandle &other) const { + return !(*this == other); + } private: - explicit EventHandle(const EventList::iterator& iter); + EventHandle(const EventList::iterator& iter, const uint64_t id); EventList::iterator _iter; uint64_t _generation; + uint64_t _id; }; /** @@ -389,6 +399,15 @@ namespace repl { friend class ReplicationExecutor; public: CallbackHandle() : _generation(0) {} + + bool operator==(const CallbackHandle &other) const { + return (_finishedEvent == other._finishedEvent); + } + + bool operator!=(const CallbackHandle &other) const { + return !(*this == other); + } + private: explicit CallbackHandle(const WorkQueue::iterator& iter); diff --git a/src/mongo/db/repl/replication_executor_test.cpp b/src/mongo/db/repl/replication_executor_test.cpp index 6eb46063390..5e16346f7de 100644 --- a/src/mongo/db/repl/replication_executor_test.cpp +++ b/src/mongo/db/repl/replication_executor_test.cpp @@ -393,6 +393,59 @@ namespace { ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, status); } + TEST(ReplicationExecutor, CallbackHandleComparison) { + NetworkInterfaceMock* net = new NetworkInterfaceMock; + net->simulatedNetworkLatency(5); + ReplicationExecutor executor(net); + Status status(ErrorCodes::InternalError, ""); + const ReplicationExecutor::RemoteCommandRequest request( + HostAndPort("lazy", 27017), + "admin", + BSON("cmd" << 1)); + ReplicationExecutor::CallbackHandle cbHandle1 = unittest::assertGet( + executor.scheduleRemoteCommand( + request, + stdx::bind(setStatusOnRemoteCommandCompletion, + stdx::placeholders::_1, + request, + &status))); + ReplicationExecutor::CallbackHandle cbHandle2 = unittest::assertGet( + executor.scheduleRemoteCommand( + request, + stdx::bind(setStatusOnRemoteCommandCompletion, + stdx::placeholders::_1, + request, + &status))); + + // test equality + ASSERT_TRUE(cbHandle1 == cbHandle1); + ASSERT_TRUE(cbHandle2 == cbHandle2); + ASSERT_FALSE(cbHandle1 != cbHandle1); + ASSERT_FALSE(cbHandle2 != cbHandle2); + + // test inequality + ASSERT_TRUE(cbHandle1 != cbHandle2); + ASSERT_TRUE(cbHandle2 != cbHandle1); + ASSERT_FALSE(cbHandle1 == cbHandle2); + ASSERT_FALSE(cbHandle2 == cbHandle1); + + ReplicationExecutor::CallbackHandle cbHandle1Copy = cbHandle1; + ASSERT_TRUE(cbHandle1 == cbHandle1Copy); + ASSERT_TRUE(cbHandle1Copy == cbHandle1); + ASSERT_FALSE(cbHandle1Copy != cbHandle1); + ASSERT_FALSE(cbHandle1 != cbHandle1Copy); + + std::vector<ReplicationExecutor::CallbackHandle> cbs; + cbs.push_back(cbHandle1); + cbs.push_back(cbHandle2); + ASSERT(cbHandle1 != cbHandle2); + std::vector<ReplicationExecutor::CallbackHandle>::iterator foundHandle = + std::find(cbs.begin(), + cbs.end(), + cbHandle1); + ASSERT_TRUE(cbs.end() != foundHandle); + ASSERT_TRUE(cbHandle1 == *foundHandle); + } } // namespace } // namespace repl } // namespace mongo |