diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 49 |
1 files changed, 28 insertions, 21 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index e64692bc91..f4d75b7b6b 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -123,6 +123,8 @@ Cluster::~Cluster() { void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { Lock l(lock); + // FIXME aconway 2008-10-08: what keeps catchUp connections in memory if not in map? + // esp shadow connections? See race comment in getConnection. assert(!c->isCatchUp()); connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); } @@ -204,15 +206,18 @@ void Cluster::leave(Lock&) { } boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId, Lock&) { - if (connectionId.getMember() == memberId) - return boost::intrusive_ptr<Connection>(connectionId.getPointer()); ConnectionMap::iterator i = connections.find(connectionId); - if (i == connections.end()) { // New shadow connection. - assert(connectionId.getMember() != memberId); - std::ostringstream mgmtId; - mgmtId << name.str() << ":" << connectionId; - ConnectionMap::value_type value(connectionId, new Connection(*this, shadowOut, mgmtId.str(), connectionId)); - i = connections.insert(value).first; + if (i == connections.end()) { + if (connectionId.getMember() == memberId) { // Closed local connection + QPID_LOG(warning, *this << " attempt to use closed connection " << connectionId); + return boost::intrusive_ptr<Connection>(); + } + else { // New shadow connection + std::ostringstream mgmtId; + mgmtId << name.str() << ":" << connectionId; + ConnectionMap::value_type value(connectionId, new Connection(*this, shadowOut, mgmtId.str(), connectionId)); + i = connections.insert(value).first; + } } return i->second; } @@ -261,15 +266,17 @@ void Cluster::process(const Event& e, Lock& l) { } } else { // e.isConnection() - boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l); - if (e.getType() == DATA) { - QPID_LOG(trace, *this << " PROC: " << e); - connection->deliverBuffer(buf); - } - else { // control - while (frame.decode(buf)) { - QPID_LOG(trace, *this << " PROC: " << e << " " << frame); - connection->delivered(frame); + boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l); + if (connection) { // Ignore if no connection. + if (e.getType() == DATA) { + QPID_LOG(trace, *this << " PROC: " << e); + connection->deliverBuffer(buf); + } + else { // control + while (frame.decode(buf)) { + QPID_LOG(trace, *this << " PROC: " << e << " " << frame); + connection->delivered(frame); + } } } } @@ -333,7 +340,7 @@ void Cluster::configChange ( Mutex::ScopedLock l(lock); QPID_LOG(debug, *this << " configuration change: " << AddrList(current, nCurrent) << AddrList(left, nLeft, "( ", ")")); - bool changed = map.configChange(current, nCurrent, left, nLeft, joined, nJoined); + map.configChange(current, nCurrent, left, nLeft, joined, nJoined); if (state == LEFT) return; if (!map.isAlive(memberId)) { leave(l); return; } @@ -350,7 +357,7 @@ void Cluster::configChange ( QPID_LOG(debug, *this << " send dump-request " << myUrl); } } - else if (state >= READY && changed) + else if (state >= READY) memberUpdate(l); } @@ -408,8 +415,8 @@ void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) { } void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { - if (map.ready(id, Url(url))) - memberUpdate(l); + map.ready(id, Url(url)); + memberUpdate(l); } void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) { |