diff options
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 49 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/DumpClient.cpp | 31 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.cpp | 4 |
3 files changed, 41 insertions, 43 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index e64692bc91..f4d75b7b6b 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -123,6 +123,8 @@ Cluster::~Cluster() { void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { Lock l(lock); + // FIXME aconway 2008-10-08: what keeps catchUp connections in memory if not in map? + // esp shadow connections? See race comment in getConnection. assert(!c->isCatchUp()); connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); } @@ -204,15 +206,18 @@ void Cluster::leave(Lock&) { } boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId, Lock&) { - if (connectionId.getMember() == memberId) - return boost::intrusive_ptr<Connection>(connectionId.getPointer()); ConnectionMap::iterator i = connections.find(connectionId); - if (i == connections.end()) { // New shadow connection. - assert(connectionId.getMember() != memberId); - std::ostringstream mgmtId; - mgmtId << name.str() << ":" << connectionId; - ConnectionMap::value_type value(connectionId, new Connection(*this, shadowOut, mgmtId.str(), connectionId)); - i = connections.insert(value).first; + if (i == connections.end()) { + if (connectionId.getMember() == memberId) { // Closed local connection + QPID_LOG(warning, *this << " attempt to use closed connection " << connectionId); + return boost::intrusive_ptr<Connection>(); + } + else { // New shadow connection + std::ostringstream mgmtId; + mgmtId << name.str() << ":" << connectionId; + ConnectionMap::value_type value(connectionId, new Connection(*this, shadowOut, mgmtId.str(), connectionId)); + i = connections.insert(value).first; + } } return i->second; } @@ -261,15 +266,17 @@ void Cluster::process(const Event& e, Lock& l) { } } else { // e.isConnection() - boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l); - if (e.getType() == DATA) { - QPID_LOG(trace, *this << " PROC: " << e); - connection->deliverBuffer(buf); - } - else { // control - while (frame.decode(buf)) { - QPID_LOG(trace, *this << " PROC: " << e << " " << frame); - connection->delivered(frame); + boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l); + if (connection) { // Ignore if no connection. + if (e.getType() == DATA) { + QPID_LOG(trace, *this << " PROC: " << e); + connection->deliverBuffer(buf); + } + else { // control + while (frame.decode(buf)) { + QPID_LOG(trace, *this << " PROC: " << e << " " << frame); + connection->delivered(frame); + } } } } @@ -333,7 +340,7 @@ void Cluster::configChange ( Mutex::ScopedLock l(lock); QPID_LOG(debug, *this << " configuration change: " << AddrList(current, nCurrent) << AddrList(left, nLeft, "( ", ")")); - bool changed = map.configChange(current, nCurrent, left, nLeft, joined, nJoined); + map.configChange(current, nCurrent, left, nLeft, joined, nJoined); if (state == LEFT) return; if (!map.isAlive(memberId)) { leave(l); return; } @@ -350,7 +357,7 @@ void Cluster::configChange ( QPID_LOG(debug, *this << " send dump-request " << myUrl); } } - else if (state >= READY && changed) + else if (state >= READY) memberUpdate(l); } @@ -408,8 +415,8 @@ void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) { } void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { - if (map.ready(id, Url(url))) - memberUpdate(l); + map.ready(id, Url(url)); + memberUpdate(l); } void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) { diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index ed339b2f85..2b079a22bc 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -23,6 +23,7 @@ #include "ClusterMap.h" #include "Connection.h" #include "qpid/client/SessionBase_0_10Access.h" +#include "qpid/client/ConnectionAccess.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" @@ -41,14 +42,6 @@ #include <boost/bind.hpp> namespace qpid { - -namespace client { -struct ConnectionAccess { - static void setVersion(Connection& c, const framing::ProtocolVersion& v) { c.version = v; } - static boost::shared_ptr<ConnectionImpl> getImpl(Connection& c) { return c.impl; } -}; -} // namespace client - namespace cluster { using broker::Broker; @@ -169,10 +162,15 @@ void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& bindi void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConnection) { QPID_LOG(debug, dumperId << " dumping connection " << *dumpConnection); shadowConnection = catchUpConnection(); + broker::Connection& bc = dumpConnection->getBrokerConnection(); // FIXME aconway 2008-09-19: Open with identical settings to dumpConnection: password, vhost, frame size, // authentication etc. See ConnectionSettings. shadowConnection.open(dumpeeUrl, bc.getUserId()); + + // Stop the failover listener as its session will conflict with re-creating-sessions + client::ConnectionAccess::getImpl(shadowConnection)->stopFailoverListener(); + dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1)); ClusterConnectionProxy(shadowConnection).shadowReady( dumpConnection->getId().getMember(), @@ -184,26 +182,21 @@ void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConn void DumpClient::dumpSession(broker::SessionHandler& sh) { QPID_LOG(debug, dumperId << " dumping session " << &sh.getConnection() << "[" << sh.getChannel() << "] = " << sh.getSession()->getId()); - broker::SessionState* s = sh.getSession(); - if (!s) return; // no session. + broker::SessionState* ss = sh.getSession(); + if (!ss) return; // no session. - // Re-create the session. + // Create a client session to dump session state. boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection); - size_t max_frame_size = cimpl->getNegotiatedSettings().maxFrameSize; - boost::shared_ptr<client::SessionImpl> simpl( - new client::SessionImpl(s->getId().getName(), cimpl, sh.getChannel(), max_frame_size)); - cimpl->addSession(simpl); - simpl->open(sh.getSession()->getTimeout()); + boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel()); client::SessionBase_0_10Access(shadowSession).set(simpl); AMQP_AllProxy::ClusterConnection proxy(simpl->out); // Re-create session state on remote connection. - broker::SessionState* ss = sh.getSession(); // For reasons unknown, boost::bind does not work here with boost 1.33. ss->eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this)); - // FIXME aconway 2008-09-19: remaining session state. + // FIXME aconway 2008-09-19: update remaining session state. // Reset command-sequence state. proxy.sessionState( @@ -216,7 +209,7 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) { ss->receiverGetIncomplete() ); - // FIXME aconway 2008-09-23: session replay list. + // FIXME aconway 2008-09-23: update session replay list. QPID_LOG(debug, dumperId << " dumped session " << sh.getSession()->getId()); } diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp index cc1af255e4..f53b48ec1e 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -58,9 +58,7 @@ void OutputInterceptor::activateOutput() { // Called in write thread when the IO layer has no more data to write. // We do nothing in the write thread, we run doOutput only on delivery // of doOutput requests. -bool OutputInterceptor::doOutput() { - return false; -} +bool OutputInterceptor::doOutput() { return false; } // Delivery of doOutput allows us to run the real connection doOutput() // which stocks up the write buffers with data. |