summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorScott Hernandez <scotthernandez@gmail.com>2014-07-18 15:52:17 -0400
committerScott Hernandez <scotthernandez@gmail.com>2014-07-21 15:39:45 -0400
commitd8f96453ea6e86c9fbe020766edda85b44823201 (patch)
tree7ce61830c4fb38685d036b9000d10ef2b14eb8c4
parent17a35699a9d760965e22e1d3f7ffc813e4edb658 (diff)
downloadmongo-d8f96453ea6e86c9fbe020766edda85b44823201.tar.gz
SERVER-14560: Add cancelHeartbeats
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl.cpp30
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl.h22
-rw-r--r--src/mongo/db/repl/replication_executor.cpp12
-rw-r--r--src/mongo/db/repl/replication_executor.h25
-rw-r--r--src/mongo/db/repl/replication_executor_test.cpp53
5 files changed, 122 insertions, 20 deletions
diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp
index 5b5c54c0492..3f38d0d381e 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.cpp
+++ b/src/mongo/db/repl/repl_coordinator_impl.cpp
@@ -493,8 +493,36 @@ namespace repl {
// TODO
}
+ void ReplicationCoordinatorImpl::_trackHeartbeatHandle(
+ const ReplicationExecutor::CallbackHandle& handle) {
+ // this mutex should not be needed because it is always used during a callback.
+ // boost::mutex::scoped_lock lock(_mutex);
+ _heartbeatHandles.push_back(handle);
+ }
+
+ void ReplicationCoordinatorImpl::_untrackHeartbeatHandle(
+ const ReplicationExecutor::CallbackHandle& handle) {
+ // this mutex should not be needed because it is always used during a callback.
+ // boost::mutex::scoped_lock lock(_mutex);
+ HeartbeatHandles::iterator it = std::find(_heartbeatHandles.begin(),
+ _heartbeatHandles.end(),
+ handle);
+ invariant(it != _heartbeatHandles.end());
+
+ _heartbeatHandles.erase(it);
+
+ }
+
void ReplicationCoordinatorImpl::cancelHeartbeats() {
- // TODO
+ // this mutex should not be needed because it is always used during a callback.
+ //boost::mutex::scoped_lock lock(_mutex);
+ HeartbeatHandles::const_iterator it = _heartbeatHandles.begin();
+ const HeartbeatHandles::const_iterator end = _heartbeatHandles.end();
+ for( ; it != end; ++it ) {
+ _replExecutor->cancel(*it);
+ }
+
+ _heartbeatHandles.clear();
}
} // namespace repl
diff --git a/src/mongo/db/repl/repl_coordinator_impl.h b/src/mongo/db/repl/repl_coordinator_impl.h
index 7da40107c8b..dcf635edebc 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.h
+++ b/src/mongo/db/repl/repl_coordinator_impl.h
@@ -169,6 +169,14 @@ namespace repl {
void doMemberHeartbeat(ReplicationExecutor* executor,
const Status& inStatus,
const HostAndPort& hap);
+
+ /**
+ * Cancels all heartbeats.
+ *
+ * This is only called during the callback when there is a new config.
+ * At this time no new heartbeats can be scheduled due to the serialization
+ * of calls via the executor.
+ */
void cancelHeartbeats();
private:
@@ -193,17 +201,9 @@ namespace repl {
Date_t firstCallDate,
int retriesLeft);
- void _trackHeartbeatHandle(const ReplicationExecutor::CallbackHandle& handle) {
- // this mutex should not be needed because it is always used during a callback.
- // boost::mutex::scoped_lock lock(_mutex);
- _heartbeatHandles.push_back(handle);
- }
-
- void _untrackHeartbeatHandle(const ReplicationExecutor::CallbackHandle& handle) {
- // this mutex should not be needed because it is always used during a callback.
- // boost::mutex::scoped_lock lock(_mutex);
- // TODO
- }
+ void _trackHeartbeatHandle(const ReplicationExecutor::CallbackHandle& handle);
+
+ void _untrackHeartbeatHandle(const ReplicationExecutor::CallbackHandle& handle);
// Handles to actively queued heartbeats.
typedef std::vector<ReplicationExecutor::CallbackHandle> HeartbeatHandles;
diff --git a/src/mongo/db/repl/replication_executor.cpp b/src/mongo/db/repl/replication_executor.cpp
index 541d50013e4..135ae2ad49e 100644
--- a/src/mongo/db/repl/replication_executor.cpp
+++ b/src/mongo/db/repl/replication_executor.cpp
@@ -48,7 +48,8 @@ namespace {
ReplicationExecutor::ReplicationExecutor(NetworkInterface* netInterface) :
_networkInterface(netInterface),
_totalEventWaiters(0),
- _inShutdown(false) {
+ _inShutdown(false),
+ _nextId(0) {
}
ReplicationExecutor::~ReplicationExecutor() {}
@@ -107,7 +108,7 @@ namespace {
while (!_unsignaledEvents.empty()) {
EventList::iterator event = _unsignaledEvents.begin();
invariant(event->waiters.empty());
- signalEvent_inlock(EventHandle(event));
+ signalEvent_inlock(EventHandle(event, ++_nextId));
}
while (_totalEventWaiters > 0)
@@ -140,7 +141,7 @@ namespace {
iter->generation++;
iter->isSignaled = false;
_unsignaledEvents.splice(_unsignaledEvents.end(), _signaledEvents, iter);
- return StatusWith<EventHandle>(EventHandle(iter));
+ return StatusWith<EventHandle>(EventHandle(iter, ++_nextId));
}
void ReplicationExecutor::signalEvent(const EventHandle& event) {
@@ -421,9 +422,10 @@ namespace {
return StatusWith<CallbackHandle>(CallbackHandle(iter));
}
- ReplicationExecutor::EventHandle::EventHandle(const EventList::iterator& iter) :
+ ReplicationExecutor::EventHandle::EventHandle(const EventList::iterator& iter, uint64_t id) :
_iter(iter),
- _generation(iter->generation) {
+ _generation(iter->generation),
+ _id(id) {
}
ReplicationExecutor::CallbackHandle::CallbackHandle(const WorkQueue::iterator& iter) :
diff --git a/src/mongo/db/repl/replication_executor.h b/src/mongo/db/repl/replication_executor.h
index 41ea9a2c158..d743748e966 100644
--- a/src/mongo/db/repl/replication_executor.h
+++ b/src/mongo/db/repl/replication_executor.h
@@ -360,6 +360,7 @@ namespace repl {
int64_t _totalEventWaiters;
bool _inShutdown;
threadpool::ThreadPool _networkWorkers;
+ uint64_t _nextId;
};
/**
@@ -368,18 +369,27 @@ namespace repl {
class ReplicationExecutor::EventHandle {
friend class ReplicationExecutor;
public:
- EventHandle() : _generation(0) {}
+ EventHandle() : _generation(0), _id(0) {}
/**
* Returns true if the handle is valid, meaning that it identifies
*/
- bool isValid() const { return _generation != 0; }
+ bool isValid() const { return _id != 0; }
+
+ bool operator==(const EventHandle &other) const {
+ return (_id == other._id);
+ }
+
+ bool operator!=(const EventHandle &other) const {
+ return !(*this == other);
+ }
private:
- explicit EventHandle(const EventList::iterator& iter);
+ EventHandle(const EventList::iterator& iter, const uint64_t id);
EventList::iterator _iter;
uint64_t _generation;
+ uint64_t _id;
};
/**
@@ -389,6 +399,15 @@ namespace repl {
friend class ReplicationExecutor;
public:
CallbackHandle() : _generation(0) {}
+
+ bool operator==(const CallbackHandle &other) const {
+ return (_finishedEvent == other._finishedEvent);
+ }
+
+ bool operator!=(const CallbackHandle &other) const {
+ return !(*this == other);
+ }
+
private:
explicit CallbackHandle(const WorkQueue::iterator& iter);
diff --git a/src/mongo/db/repl/replication_executor_test.cpp b/src/mongo/db/repl/replication_executor_test.cpp
index 6eb46063390..5e16346f7de 100644
--- a/src/mongo/db/repl/replication_executor_test.cpp
+++ b/src/mongo/db/repl/replication_executor_test.cpp
@@ -393,6 +393,59 @@ namespace {
ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, status);
}
+ TEST(ReplicationExecutor, CallbackHandleComparison) {
+ NetworkInterfaceMock* net = new NetworkInterfaceMock;
+ net->simulatedNetworkLatency(5);
+ ReplicationExecutor executor(net);
+ Status status(ErrorCodes::InternalError, "");
+ const ReplicationExecutor::RemoteCommandRequest request(
+ HostAndPort("lazy", 27017),
+ "admin",
+ BSON("cmd" << 1));
+ ReplicationExecutor::CallbackHandle cbHandle1 = unittest::assertGet(
+ executor.scheduleRemoteCommand(
+ request,
+ stdx::bind(setStatusOnRemoteCommandCompletion,
+ stdx::placeholders::_1,
+ request,
+ &status)));
+ ReplicationExecutor::CallbackHandle cbHandle2 = unittest::assertGet(
+ executor.scheduleRemoteCommand(
+ request,
+ stdx::bind(setStatusOnRemoteCommandCompletion,
+ stdx::placeholders::_1,
+ request,
+ &status)));
+
+ // test equality
+ ASSERT_TRUE(cbHandle1 == cbHandle1);
+ ASSERT_TRUE(cbHandle2 == cbHandle2);
+ ASSERT_FALSE(cbHandle1 != cbHandle1);
+ ASSERT_FALSE(cbHandle2 != cbHandle2);
+
+ // test inequality
+ ASSERT_TRUE(cbHandle1 != cbHandle2);
+ ASSERT_TRUE(cbHandle2 != cbHandle1);
+ ASSERT_FALSE(cbHandle1 == cbHandle2);
+ ASSERT_FALSE(cbHandle2 == cbHandle1);
+
+ ReplicationExecutor::CallbackHandle cbHandle1Copy = cbHandle1;
+ ASSERT_TRUE(cbHandle1 == cbHandle1Copy);
+ ASSERT_TRUE(cbHandle1Copy == cbHandle1);
+ ASSERT_FALSE(cbHandle1Copy != cbHandle1);
+ ASSERT_FALSE(cbHandle1 != cbHandle1Copy);
+
+ std::vector<ReplicationExecutor::CallbackHandle> cbs;
+ cbs.push_back(cbHandle1);
+ cbs.push_back(cbHandle2);
+ ASSERT(cbHandle1 != cbHandle2);
+ std::vector<ReplicationExecutor::CallbackHandle>::iterator foundHandle =
+ std::find(cbs.begin(),
+ cbs.end(),
+ cbHandle1);
+ ASSERT_TRUE(cbs.end() != foundHandle);
+ ASSERT_TRUE(cbHandle1 == *foundHandle);
+ }
} // namespace
} // namespace repl
} // namespace mongo