diff options
Diffstat (limited to 'cpp/src/qpid/cluster/ConnectionMap.h')
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionMap.h | 70 |
1 files changed, 29 insertions, 41 deletions
diff --git a/cpp/src/qpid/cluster/ConnectionMap.h b/cpp/src/qpid/cluster/ConnectionMap.h index c355074e75..23084796cf 100644 --- a/cpp/src/qpid/cluster/ConnectionMap.h +++ b/cpp/src/qpid/cluster/ConnectionMap.h @@ -24,6 +24,7 @@ #include "types.h" #include "Connection.h" #include "ClusterMap.h" +#include "NoOpConnectionOutputHandler.h" #include "qpid/sys/Mutex.h" #include <boost/intrusive_ptr.hpp> #include <map> @@ -31,61 +32,48 @@ namespace qpid { namespace cluster { +class Cluster; + /** - * Thread safe map of connections. + * Thread safe map of connections. The map is used in: + * - deliver thread to look connections and create new shadow connections. + * - local catch-up connection threads to add a caught-up shadow connections. + * - local client connection threads when local connections are created. */ -class ConnectionMap -{ +class ConnectionMap { public: typedef boost::intrusive_ptr<cluster::Connection> ConnectionPtr; typedef std::vector<ConnectionPtr> Vector; - void insert(ConnectionId id, ConnectionPtr p) { - ScopedLock l(lock); - map.insert(Map::value_type(id,p)); - } + ConnectionMap(Cluster& c) : cluster(c) {} + + /** Insert a local connection or a caught up shadow connection. + * Called in local connection thread. + */ + void insert(ConnectionPtr p); + + /** Erase a closed connection. Called in deliver thread. */ + void erase(const ConnectionId& id); - void erase(ConnectionId id) { - ScopedLock l(lock); - map.erase(id); - } + /** Get an existing connection. */ + ConnectionPtr get(const ConnectionId& id); - ConnectionPtr find(ConnectionId id) const { - ScopedLock l(lock); - Map::const_iterator i = map.find(id); - return i == map.end() ? ConnectionPtr() : i->second; - } + /** Get connections for sending an update. */ + Vector values() const; - Vector values() const { - Vector result(map.size()); - std::transform(map.begin(), map.end(), result.begin(), - boost::bind(&Map::value_type::second, _1)); - return result; - } + /** Remove connections who's members are no longer in the cluster. Deliver thread. */ + void update(MemberId myId, const ClusterMap& cluster); - void update(MemberId myId, const ClusterMap& cluster) { - for (Map::iterator i = map.begin(); i != map.end(); ) { - MemberId member = i->first.getMember(); - if (member != myId && !cluster.isMember(member)) { - i->second->left(); - map.erase(i++); - } else { - i++; - } - } - } + + void clear(); - void clear() { - ScopedLock l(lock); - map.clear(); - } + size_t size() const; - size_t size() const { return map.size(); } private: typedef std::map<ConnectionId, ConnectionPtr> Map; - typedef sys::Mutex::ScopedLock ScopedLock; - - mutable sys::Mutex lock; + + Cluster& cluster; + NoOpConnectionOutputHandler shadowOut; Map map; }; |