summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-05-27 16:39:15 +0000
committerAlan Conway <aconway@apache.org>2009-05-27 16:39:15 +0000
commitb6780b5b42b168c62a3380e5fc15cc39fc374615 (patch)
tree1ba66ece932dbbe371a3e840734d8cb218ec7f99 /cpp
parentba7280d1f6dc6a59021bfd86f20aa4382c3977a7 (diff)
downloadqpid-python-b6780b5b42b168c62a3380e5fc15cc39fc374615.tar.gz
Added missing locks in cluster code.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@779235 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp13
-rw-r--r--cpp/src/qpid/cluster/Cluster.h1
-rw-r--r--cpp/src/qpid/cluster/Decoder.cpp5
-rw-r--r--cpp/src/qpid/cluster/Decoder.h2
4 files changed, 17 insertions, 4 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 082e5488e8..37562ce46c 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -238,8 +238,13 @@ void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
connections.insert(ConnectionMap::value_type(c->getId(), c));
}
-// Called by Connection::deliverClose() in deliverFrameQueue thread.
void Cluster::erase(const ConnectionId& id) {
+ Lock l(lock);
+ erase(id,l);
+}
+
+// Called by Connection::deliverClose() in deliverFrameQueue thread.
+void Cluster::erase(const ConnectionId& id, Lock&) {
connections.erase(id);
decoder.erase(id);
}
@@ -702,8 +707,10 @@ void Cluster::memberUpdate(Lock& l) {
while (i != connections.end()) {
ConnectionMap::iterator j = i++;
MemberId m = j->second->getId().getMember();
- if (m != self && !map.isMember(m))
- j->second->deliverClose();
+ if (m != self && !map.isMember(m)) {
+ j->second->getBrokerConnection().closed();
+ erase(j->second->getId(), l);
+ }
}
}
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index bd401f3715..b857c8a913 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -155,6 +155,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void setReady(Lock&);
void memberUpdate(Lock&);
void setClusterId(const framing::Uuid&, Lock&);
+ void erase(const ConnectionId&, Lock&);
// == Called in CPG dispatch thread
void deliver( // CPG deliver callback.
diff --git a/cpp/src/qpid/cluster/Decoder.cpp b/cpp/src/qpid/cluster/Decoder.cpp
index 6f65b3852a..a57edb3b7f 100644
--- a/cpp/src/qpid/cluster/Decoder.cpp
+++ b/cpp/src/qpid/cluster/Decoder.cpp
@@ -57,4 +57,9 @@ void Decoder::erase(const ConnectionId& c) {
map.erase(c);
}
+framing::FrameDecoder& Decoder::get(const ConnectionId& c) {
+ sys::Mutex::ScopedLock l(lock);
+ return map[c];
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Decoder.h b/cpp/src/qpid/cluster/Decoder.h
index 3fb7b4f73d..6bcd877a75 100644
--- a/cpp/src/qpid/cluster/Decoder.h
+++ b/cpp/src/qpid/cluster/Decoder.h
@@ -45,7 +45,7 @@ class Decoder
Decoder(FrameHandler fh) : callback(fh) {}
void decode(const EventHeader& eh, const char* data);
void erase(const ConnectionId&);
- framing::FrameDecoder& get(const ConnectionId& c) { return map[c]; }
+ framing::FrameDecoder& get(const ConnectionId& c);
private:
typedef std::map<ConnectionId, framing::FrameDecoder> Map;