summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/ConnectionMap.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/ConnectionMap.h')
-rw-r--r--cpp/src/qpid/cluster/ConnectionMap.h70
1 files changed, 29 insertions, 41 deletions
diff --git a/cpp/src/qpid/cluster/ConnectionMap.h b/cpp/src/qpid/cluster/ConnectionMap.h
index c355074e75..23084796cf 100644
--- a/cpp/src/qpid/cluster/ConnectionMap.h
+++ b/cpp/src/qpid/cluster/ConnectionMap.h
@@ -24,6 +24,7 @@
#include "types.h"
#include "Connection.h"
#include "ClusterMap.h"
+#include "NoOpConnectionOutputHandler.h"
#include "qpid/sys/Mutex.h"
#include <boost/intrusive_ptr.hpp>
#include <map>
@@ -31,61 +32,48 @@
namespace qpid {
namespace cluster {
+class Cluster;
+
/**
- * Thread safe map of connections.
+ * Thread safe map of connections. The map is used in:
+ * - deliver thread to look connections and create new shadow connections.
+ * - local catch-up connection threads to add a caught-up shadow connections.
+ * - local client connection threads when local connections are created.
*/
-class ConnectionMap
-{
+class ConnectionMap {
public:
typedef boost::intrusive_ptr<cluster::Connection> ConnectionPtr;
typedef std::vector<ConnectionPtr> Vector;
- void insert(ConnectionId id, ConnectionPtr p) {
- ScopedLock l(lock);
- map.insert(Map::value_type(id,p));
- }
+ ConnectionMap(Cluster& c) : cluster(c) {}
+
+ /** Insert a local connection or a caught up shadow connection.
+ * Called in local connection thread.
+ */
+ void insert(ConnectionPtr p);
+
+ /** Erase a closed connection. Called in deliver thread. */
+ void erase(const ConnectionId& id);
- void erase(ConnectionId id) {
- ScopedLock l(lock);
- map.erase(id);
- }
+ /** Get an existing connection. */
+ ConnectionPtr get(const ConnectionId& id);
- ConnectionPtr find(ConnectionId id) const {
- ScopedLock l(lock);
- Map::const_iterator i = map.find(id);
- return i == map.end() ? ConnectionPtr() : i->second;
- }
+ /** Get connections for sending an update. */
+ Vector values() const;
- Vector values() const {
- Vector result(map.size());
- std::transform(map.begin(), map.end(), result.begin(),
- boost::bind(&Map::value_type::second, _1));
- return result;
- }
+ /** Remove connections who's members are no longer in the cluster. Deliver thread. */
+ void update(MemberId myId, const ClusterMap& cluster);
- void update(MemberId myId, const ClusterMap& cluster) {
- for (Map::iterator i = map.begin(); i != map.end(); ) {
- MemberId member = i->first.getMember();
- if (member != myId && !cluster.isMember(member)) {
- i->second->left();
- map.erase(i++);
- } else {
- i++;
- }
- }
- }
+
+ void clear();
- void clear() {
- ScopedLock l(lock);
- map.clear();
- }
+ size_t size() const;
- size_t size() const { return map.size(); }
private:
typedef std::map<ConnectionId, ConnectionPtr> Map;
- typedef sys::Mutex::ScopedLock ScopedLock;
-
- mutable sys::Mutex lock;
+
+ Cluster& cluster;
+ NoOpConnectionOutputHandler shadowOut;
Map map;
};