diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 103 |
1 files changed, 79 insertions, 24 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index a1ed5f34f5..4aa66cce1f 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -20,10 +20,15 @@ */ #include "Connection.h" #include "Cluster.h" + +#include "qpid/broker/SessionState.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/AllInvoker.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" +#include "qpid/framing/ConnectionCloseBody.h" +#include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" -#include "qpid/framing/AllInvoker.h" + #include <boost/current_function.hpp> namespace qpid { @@ -31,6 +36,8 @@ namespace cluster { using namespace framing; +NoOpConnectionOutputHandler Connection::discardHandler; + // Shadow connections Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, ConnectionId myId) @@ -49,7 +56,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, QPID_LOG(debug, "New connection: " << *this); } -Connection::~Connection() {} +Connection::~Connection() { + QPID_LOG(debug, "Deleted connection: " << *this); +} bool Connection::doOutput() { return output.doOutput(); @@ -63,28 +72,55 @@ void Connection::deliverDoOutput(uint32_t requested) { output.deliverDoOutput(requested); } +// Received from a directly connected client. void Connection::received(framing::AMQFrame& f) { - QPID_LOG(trace, "EXEC [" << *this << "]: " << f); + QPID_LOG(trace, "RECV " << *this << ": " << f); + if (isShadow()) { + // Final close that completes catch-up for shadow connection. + if (catchUp && f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { + AMQFrame ok(in_place<ConnectionCloseOkBody>()); + connection.getOutput().send(ok); + } + else + QPID_LOG(warning, *this << " ignoring unexpected frame: " << f); + } + else { + currentChannel = f.getChannel(); + if (!framing::invoke(*this, *f.getBody()).wasHandled()) + connection.received(f); + } +} + +// Delivered from cluster. +void Connection::delivered(framing::AMQFrame& f) { + QPID_LOG(trace, "DLVR " << *this << ": " << f); + assert(!isCatchUp()); // Handle connection controls, deliver other frames to connection. + currentChannel = f.getChannel(); if (!framing::invoke(*this, *f.getBody()).wasHandled()) connection.received(f); } void Connection::closed() { try { + QPID_LOG(debug, "Connection closed " << *this); + + if (catchUp) { + catchUp = false; + cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this)); + if (!isShadow()) connection.closed(); + } + // Local network connection has closed. We need to keep the // connection around but replace the output handler with a // no-op handler as the network output handler will be // deleted. output.setOutputHandler(discardHandler); - if (catchUp) { - catchUp = false; - cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this)); - } - else { - // This was a local replicated connection. Multicast a deliver closed - // and process any outstanding frames from the cluster until - // self-delivery of deliver-closed. + + if (isLocal()) { + // This was a local replicated connection. Multicast a deliver + // closed and process any outstanding frames from the cluster + // until self-delivery of deliver-close. cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this); ++mcastSeq; } @@ -100,29 +136,48 @@ void Connection::deliverClose () { cluster.erase(self); } -size_t Connection::decode(const char* buffer, size_t size) { - assert(!catchUp); - ++mcastSeq; - cluster.mcastBuffer(buffer, size, self); +// Decode data from local clients. +size_t Connection::decode(const char* buffer, size_t size) { + if (catchUp) { // Handle catch-up locally. + Buffer buf(const_cast<char*>(buffer), size); + while (localDecoder.decode(buf)) + received(localDecoder.frame); + } + else { // Multicast local connections. + assert(isLocal()); + cluster.mcastBuffer(buffer, size, self, ++mcastSeq); + } return size; } void Connection::deliverBuffer(Buffer& buf) { assert(!catchUp); ++deliverSeq; - while (decoder.decode(buf)) - received(decoder.frame); + while (mcastDecoder.decode(buf)) + delivered(mcastDecoder.frame); } -void Connection::sessionState(const SequenceNumber& /*replayStart*/, - const SequenceSet& /*sentIncomplete*/, - const SequenceNumber& /*expected*/, - const SequenceNumber& /*received*/, - const SequenceSet& /*unknownCompleted*/, - const SequenceSet& /*receivedIncomplete*/) +void Connection::sessionState( + const SequenceNumber& replayStart, + const SequenceNumber& sendCommandPoint, + const SequenceSet& sentIncomplete, + const SequenceNumber& expected, + const SequenceNumber& received, + const SequenceSet& unknownCompleted, + const SequenceSet& receivedIncomplete) { - // FIXME aconway 2008-09-10: TODO + broker::SessionHandler& h = connection.getChannel(currentChannel); + broker::SessionState* s = h.getSession(); + s->setState( + replayStart, + sendCommandPoint, + sentIncomplete, + expected, + received, + unknownCompleted, + receivedIncomplete); + QPID_LOG(debug, "Received session state dump for " << s->getId()); } void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { |