summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp49
1 files changed, 28 insertions, 21 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index e64692bc91..f4d75b7b6b 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -123,6 +123,8 @@ Cluster::~Cluster() {
void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
Lock l(lock);
+ // FIXME aconway 2008-10-08: what keeps catchUp connections in memory if not in map?
+ // esp shadow connections? See race comment in getConnection.
assert(!c->isCatchUp());
connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
}
@@ -204,15 +206,18 @@ void Cluster::leave(Lock&) {
}
boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId, Lock&) {
- if (connectionId.getMember() == memberId)
- return boost::intrusive_ptr<Connection>(connectionId.getPointer());
ConnectionMap::iterator i = connections.find(connectionId);
- if (i == connections.end()) { // New shadow connection.
- assert(connectionId.getMember() != memberId);
- std::ostringstream mgmtId;
- mgmtId << name.str() << ":" << connectionId;
- ConnectionMap::value_type value(connectionId, new Connection(*this, shadowOut, mgmtId.str(), connectionId));
- i = connections.insert(value).first;
+ if (i == connections.end()) {
+ if (connectionId.getMember() == memberId) { // Closed local connection
+ QPID_LOG(warning, *this << " attempt to use closed connection " << connectionId);
+ return boost::intrusive_ptr<Connection>();
+ }
+ else { // New shadow connection
+ std::ostringstream mgmtId;
+ mgmtId << name.str() << ":" << connectionId;
+ ConnectionMap::value_type value(connectionId, new Connection(*this, shadowOut, mgmtId.str(), connectionId));
+ i = connections.insert(value).first;
+ }
}
return i->second;
}
@@ -261,15 +266,17 @@ void Cluster::process(const Event& e, Lock& l) {
}
}
else { // e.isConnection()
- boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l);
- if (e.getType() == DATA) {
- QPID_LOG(trace, *this << " PROC: " << e);
- connection->deliverBuffer(buf);
- }
- else { // control
- while (frame.decode(buf)) {
- QPID_LOG(trace, *this << " PROC: " << e << " " << frame);
- connection->delivered(frame);
+ boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l);
+ if (connection) { // Ignore if no connection.
+ if (e.getType() == DATA) {
+ QPID_LOG(trace, *this << " PROC: " << e);
+ connection->deliverBuffer(buf);
+ }
+ else { // control
+ while (frame.decode(buf)) {
+ QPID_LOG(trace, *this << " PROC: " << e << " " << frame);
+ connection->delivered(frame);
+ }
}
}
}
@@ -333,7 +340,7 @@ void Cluster::configChange (
Mutex::ScopedLock l(lock);
QPID_LOG(debug, *this << " configuration change: " << AddrList(current, nCurrent)
<< AddrList(left, nLeft, "( ", ")"));
- bool changed = map.configChange(current, nCurrent, left, nLeft, joined, nJoined);
+ map.configChange(current, nCurrent, left, nLeft, joined, nJoined);
if (state == LEFT) return;
if (!map.isAlive(memberId)) { leave(l); return; }
@@ -350,7 +357,7 @@ void Cluster::configChange (
QPID_LOG(debug, *this << " send dump-request " << myUrl);
}
}
- else if (state >= READY && changed)
+ else if (state >= READY)
memberUpdate(l);
}
@@ -408,8 +415,8 @@ void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) {
}
void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
- if (map.ready(id, Url(url)))
- memberUpdate(l);
+ map.ready(id, Url(url));
+ memberUpdate(l);
}
void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) {