diff options
author | Alan Conway <aconway@apache.org> | 2008-12-08 14:57:05 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-12-08 14:57:05 +0000 |
commit | 829e6ed873f8870cbb418cdd4209b4ae5deb264d (patch) | |
tree | f85d03e53072f9e4a0d294f219e7a7a473e176cf /cpp | |
parent | b0149eb99d4157a011a1ea57d74164f2cafc9ce9 (diff) | |
download | qpid-python-829e6ed873f8870cbb418cdd4209b4ae5deb264d.tar.gz |
Cluster: separated connection map lock to reduce contention.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@724371 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/cluster.mk | 1 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 56 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionMap.h | 90 |
4 files changed, 109 insertions, 45 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index 3deb390649..fe2342a416 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -31,6 +31,7 @@ cluster_la_SOURCES = \ qpid/cluster/ConnectionCodec.cpp \ qpid/cluster/Connection.h \ qpid/cluster/Connection.cpp \ + qpid/cluster/ConnectionMap.h \ qpid/cluster/NoOpConnectionOutputHandler.h \ qpid/cluster/WriteEstimate.h \ qpid/cluster/WriteEstimate.cpp \ diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 6cad003605..15332c2cac 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -128,15 +128,11 @@ Cluster::~Cluster() { if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread. } -bool Cluster::insert(const boost::intrusive_ptr<Connection>& c) { - Lock l(lock); - bool result = connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)).second; - assert(result); - return result; +void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { + connections.insert(c->getId(), c); } void Cluster::erase(ConnectionId id) { - Lock l(lock); connections.erase(id); } @@ -226,29 +222,15 @@ void Cluster::leave(Lock&) { } } -boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId, Lock&) { - ConnectionMap::iterator i = connections.find(connectionId); - if (i == connections.end()) { - if (connectionId.getMember() == myId) { // Closed local connection - QPID_LOG(debug, *this << " activity on closed connection: " << connectionId); - return boost::intrusive_ptr<Connection>(); - } - else { // New shadow connection - std::ostringstream mgmtId; - mgmtId << name << ":" << connectionId; - ConnectionMap::value_type value(connectionId, - new Connection(*this, shadowOut, mgmtId.str(), connectionId)); - i = connections.insert(value).first; - } +boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId) { + boost::intrusive_ptr<Connection> cp = connections.find(connectionId); + if (!cp && connectionId.getMember() != myId) { // New shadow connection + std::ostringstream mgmtId; + mgmtId << name << ":" << connectionId; + cp = new Connection(*this, shadowOut, mgmtId.str(), connectionId); + connections.insert(connectionId, cp); } - return i->second; -} - -Cluster::Connections Cluster::getConnections(Lock&) { - Connections result(connections.size()); - std::transform(connections.begin(), connections.end(), result.begin(), - boost::bind(&ConnectionMap::value_type::second, _1)); - return result; + return cp; } void Cluster::deliver( @@ -299,7 +281,7 @@ void Cluster::delivered(const Event& e, Lock& l) { QPID_LOG(trace, *this << " DROP: " << e); } else { - boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l); + boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId()); if (!connection) return; if (e.getType() == CONTROL) { while (frame.decode(buf)) { @@ -479,7 +461,7 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid& // FIXME aconway 2008-10-15: no longer need a separate control now // that the dump control is in the deliver queue. -void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string& urlStr, Lock& l) { +void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string& urlStr, Lock&) { if (state == LEFT) return; MemberId dumpee(dumpeeInt); Url url(urlStr); @@ -489,7 +471,7 @@ void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string& deliverQueue.stop(); if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread. dumpThread = Thread( - new DumpClient(myId, dumpee, url, broker, map, getConnections(l), + new DumpClient(myId, dumpee, url, broker, map, connections.values(), boost::bind(&Cluster::dumpOutDone, this), boost::bind(&Cluster::dumpOutError, this, _1))); } @@ -587,16 +569,8 @@ void Cluster::memberUpdate(Lock& l) { mgmtObject->set_members(urlstr); } - //close connections belonging to members that have now been excluded - for (ConnectionMap::iterator i = connections.begin(); i != connections.end();) { - MemberId member = i->first.getMember(); - if (member != myId && !map.isMember(member)) { - i->second->left(); - connections.erase(i++); - } else { - i++; - } - } + // Close connections belonging to members that have now been excluded + connections.update(myId, map); } std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 94f0c6a95f..14df4db905 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -23,6 +23,7 @@ #include "Event.h" #include "NoOpConnectionOutputHandler.h" #include "ClusterMap.h" +#include "ConnectionMap.h" #include "FailoverExchange.h" #include "Quorum.h" @@ -72,7 +73,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { virtual ~Cluster(); // Connection map - bool insert(const ConnectionPtr&); + void insert(const ConnectionPtr&); void erase(ConnectionId); // Send to the cluster @@ -101,7 +102,6 @@ class Cluster : private Cpg::Handler, public management::Manageable { typedef sys::LockPtr<const Cluster,sys::Monitor> ConstLockPtr; typedef sys::Monitor::ScopedLock Lock; - typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap; typedef sys::PollableQueue<Event> PollableEventQueue; typedef std::deque<Event> PlainEventQueue; @@ -160,8 +160,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { struct cpg_address */*joined*/, int /*nJoined*/ ); - boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&, Lock&); - Connections getConnections(Lock&); + boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&); virtual qpid::management::ManagementObject* GetManagementObject() const; virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); diff --git a/cpp/src/qpid/cluster/ConnectionMap.h b/cpp/src/qpid/cluster/ConnectionMap.h new file mode 100644 index 0000000000..f1862e2e75 --- /dev/null +++ b/cpp/src/qpid/cluster/ConnectionMap.h @@ -0,0 +1,90 @@ +#ifndef QPID_CLUSTER_CONNECTIONMAP_H +#define QPID_CLUSTER_CONNECTIONMAP_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "types.h" +#include "Connection.h" +#include "ClusterMap.h" +#include "qpid/sys/Mutex.h" +#include <boost/intrusive_ptr.hpp> +#include <map> + +namespace qpid { +namespace cluster { + +/** + * Thread safe map of connections. + */ +class ConnectionMap +{ + public: + typedef boost::intrusive_ptr<cluster::Connection> ConnectionPtr; + typedef std::vector<ConnectionPtr> Vector; + + void insert(ConnectionId id, ConnectionPtr p) { + ScopedLock l(lock); + map.insert(Map::value_type(id,p)); + } + + void erase(ConnectionId id) { + ScopedLock l(lock); + map.erase(id); + } + + ConnectionPtr find(ConnectionId id) const { + ScopedLock l(lock); + Map::const_iterator i = map.find(id); + return i == map.end() ? ConnectionPtr() : i->second; + } + + Vector values() const { + Vector result(map.size()); + std::transform(map.begin(), map.end(), result.begin(), + boost::bind(&Map::value_type::second, _1)); + return result; + } + + void update(MemberId myId, const ClusterMap& cluster) { + for (Map::iterator i = map.begin(); i != map.end(); ) { + MemberId member = i->first.getMember(); + if (member != myId && !cluster.isMember(member)) { + i->second->left(); + map.erase(i++); + } else { + i++; + } + } + } + + size_t size() const { return map.size(); } + private: + typedef std::map<ConnectionId, ConnectionPtr> Map; + typedef sys::Mutex::ScopedLock ScopedLock; + + mutable sys::Mutex lock; + Map map; +}; + + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_CONNECTIONMAP_H*/ |