summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-12-08 14:57:05 +0000
committerAlan Conway <aconway@apache.org>2008-12-08 14:57:05 +0000
commit829e6ed873f8870cbb418cdd4209b4ae5deb264d (patch)
treef85d03e53072f9e4a0d294f219e7a7a473e176cf /cpp
parentb0149eb99d4157a011a1ea57d74164f2cafc9ce9 (diff)
downloadqpid-python-829e6ed873f8870cbb418cdd4209b4ae5deb264d.tar.gz
Cluster: separated connection map lock to reduce contention.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@724371 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/cluster.mk1
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp56
-rw-r--r--cpp/src/qpid/cluster/Cluster.h7
-rw-r--r--cpp/src/qpid/cluster/ConnectionMap.h90
4 files changed, 109 insertions, 45 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index 3deb390649..fe2342a416 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -31,6 +31,7 @@ cluster_la_SOURCES = \
qpid/cluster/ConnectionCodec.cpp \
qpid/cluster/Connection.h \
qpid/cluster/Connection.cpp \
+ qpid/cluster/ConnectionMap.h \
qpid/cluster/NoOpConnectionOutputHandler.h \
qpid/cluster/WriteEstimate.h \
qpid/cluster/WriteEstimate.cpp \
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 6cad003605..15332c2cac 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -128,15 +128,11 @@ Cluster::~Cluster() {
if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
}
-bool Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
- Lock l(lock);
- bool result = connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)).second;
- assert(result);
- return result;
+void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
+ connections.insert(c->getId(), c);
}
void Cluster::erase(ConnectionId id) {
- Lock l(lock);
connections.erase(id);
}
@@ -226,29 +222,15 @@ void Cluster::leave(Lock&) {
}
}
-boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId, Lock&) {
- ConnectionMap::iterator i = connections.find(connectionId);
- if (i == connections.end()) {
- if (connectionId.getMember() == myId) { // Closed local connection
- QPID_LOG(debug, *this << " activity on closed connection: " << connectionId);
- return boost::intrusive_ptr<Connection>();
- }
- else { // New shadow connection
- std::ostringstream mgmtId;
- mgmtId << name << ":" << connectionId;
- ConnectionMap::value_type value(connectionId,
- new Connection(*this, shadowOut, mgmtId.str(), connectionId));
- i = connections.insert(value).first;
- }
+boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId) {
+ boost::intrusive_ptr<Connection> cp = connections.find(connectionId);
+ if (!cp && connectionId.getMember() != myId) { // New shadow connection
+ std::ostringstream mgmtId;
+ mgmtId << name << ":" << connectionId;
+ cp = new Connection(*this, shadowOut, mgmtId.str(), connectionId);
+ connections.insert(connectionId, cp);
}
- return i->second;
-}
-
-Cluster::Connections Cluster::getConnections(Lock&) {
- Connections result(connections.size());
- std::transform(connections.begin(), connections.end(), result.begin(),
- boost::bind(&ConnectionMap::value_type::second, _1));
- return result;
+ return cp;
}
void Cluster::deliver(
@@ -299,7 +281,7 @@ void Cluster::delivered(const Event& e, Lock& l) {
QPID_LOG(trace, *this << " DROP: " << e);
}
else {
- boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l);
+ boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId());
if (!connection) return;
if (e.getType() == CONTROL) {
while (frame.decode(buf)) {
@@ -479,7 +461,7 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid&
// FIXME aconway 2008-10-15: no longer need a separate control now
// that the dump control is in the deliver queue.
-void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string& urlStr, Lock& l) {
+void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string& urlStr, Lock&) {
if (state == LEFT) return;
MemberId dumpee(dumpeeInt);
Url url(urlStr);
@@ -489,7 +471,7 @@ void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string&
deliverQueue.stop();
if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
dumpThread = Thread(
- new DumpClient(myId, dumpee, url, broker, map, getConnections(l),
+ new DumpClient(myId, dumpee, url, broker, map, connections.values(),
boost::bind(&Cluster::dumpOutDone, this),
boost::bind(&Cluster::dumpOutError, this, _1)));
}
@@ -587,16 +569,8 @@ void Cluster::memberUpdate(Lock& l) {
mgmtObject->set_members(urlstr);
}
- //close connections belonging to members that have now been excluded
- for (ConnectionMap::iterator i = connections.begin(); i != connections.end();) {
- MemberId member = i->first.getMember();
- if (member != myId && !map.isMember(member)) {
- i->second->left();
- connections.erase(i++);
- } else {
- i++;
- }
- }
+ // Close connections belonging to members that have now been excluded
+ connections.update(myId, map);
}
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 94f0c6a95f..14df4db905 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -23,6 +23,7 @@
#include "Event.h"
#include "NoOpConnectionOutputHandler.h"
#include "ClusterMap.h"
+#include "ConnectionMap.h"
#include "FailoverExchange.h"
#include "Quorum.h"
@@ -72,7 +73,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
virtual ~Cluster();
// Connection map
- bool insert(const ConnectionPtr&);
+ void insert(const ConnectionPtr&);
void erase(ConnectionId);
// Send to the cluster
@@ -101,7 +102,6 @@ class Cluster : private Cpg::Handler, public management::Manageable {
typedef sys::LockPtr<const Cluster,sys::Monitor> ConstLockPtr;
typedef sys::Monitor::ScopedLock Lock;
- typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap;
typedef sys::PollableQueue<Event> PollableEventQueue;
typedef std::deque<Event> PlainEventQueue;
@@ -160,8 +160,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
struct cpg_address */*joined*/, int /*nJoined*/
);
- boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&, Lock&);
- Connections getConnections(Lock&);
+ boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&);
virtual qpid::management::ManagementObject* GetManagementObject() const;
virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
diff --git a/cpp/src/qpid/cluster/ConnectionMap.h b/cpp/src/qpid/cluster/ConnectionMap.h
new file mode 100644
index 0000000000..f1862e2e75
--- /dev/null
+++ b/cpp/src/qpid/cluster/ConnectionMap.h
@@ -0,0 +1,90 @@
+#ifndef QPID_CLUSTER_CONNECTIONMAP_H
+#define QPID_CLUSTER_CONNECTIONMAP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "types.h"
+#include "Connection.h"
+#include "ClusterMap.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/intrusive_ptr.hpp>
+#include <map>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Thread safe map of connections.
+ */
+class ConnectionMap
+{
+ public:
+ typedef boost::intrusive_ptr<cluster::Connection> ConnectionPtr;
+ typedef std::vector<ConnectionPtr> Vector;
+
+ void insert(ConnectionId id, ConnectionPtr p) {
+ ScopedLock l(lock);
+ map.insert(Map::value_type(id,p));
+ }
+
+ void erase(ConnectionId id) {
+ ScopedLock l(lock);
+ map.erase(id);
+ }
+
+ ConnectionPtr find(ConnectionId id) const {
+ ScopedLock l(lock);
+ Map::const_iterator i = map.find(id);
+ return i == map.end() ? ConnectionPtr() : i->second;
+ }
+
+ Vector values() const {
+ Vector result(map.size());
+ std::transform(map.begin(), map.end(), result.begin(),
+ boost::bind(&Map::value_type::second, _1));
+ return result;
+ }
+
+ void update(MemberId myId, const ClusterMap& cluster) {
+ for (Map::iterator i = map.begin(); i != map.end(); ) {
+ MemberId member = i->first.getMember();
+ if (member != myId && !cluster.isMember(member)) {
+ i->second->left();
+ map.erase(i++);
+ } else {
+ i++;
+ }
+ }
+ }
+
+ size_t size() const { return map.size(); }
+ private:
+ typedef std::map<ConnectionId, ConnectionPtr> Map;
+ typedef sys::Mutex::ScopedLock ScopedLock;
+
+ mutable sys::Mutex lock;
+ Map map;
+};
+
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_CONNECTIONMAP_H*/