diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 65 |
1 files changed, 45 insertions, 20 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 9c2b4f1638..0f71a91293 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -40,6 +40,7 @@ #include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" #include "qpid/sys/LatencyMetric.h" +#include "qpid/sys/AtomicValue.h" #include <boost/current_function.hpp> @@ -58,19 +59,22 @@ using namespace framing; NoOpConnectionOutputHandler Connection::discardHandler; -// Shadow connections -Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, - const std::string& wrappedId, ConnectionId myId) - : cluster(c), self(myId), catchUp(false), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false), +namespace { +sys::AtomicValue<uint64_t> idCounter; +} + +// Shadow connection +Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id) + : cluster(c), self(id), catchUp(false), output(*this, out), + connection(&output, cluster.getBroker(), logId), expectProtocolHeader(false), mcastFrameHandler(cluster.getMulticast(), self) { init(); } -// Local connections +// Local connection Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, - const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink) - : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0), + const std::string& logId, MemberId member, bool isCatchUp, bool isLink) + : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out), + connection(&output, cluster.getBroker(), logId, isLink, catchUp ? ++catchUpId : 0), expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self) { init(); } @@ -149,12 +153,9 @@ void Connection::deliveredFrame(const EventFrame& f) { if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol. && !checkUnsupported(*f.frame.getBody())) // Unsupported operation. { - // FIXME aconway 2009-02-24: Using the DATA/CONTROL - // distinction to distinguish incoming vs. outgoing frames is - // very unclear. if (f.type == DATA) // incoming data frames to broker::Connection connection.received(const_cast<AMQFrame&>(f.frame)); - else { // outgoing data frame, send via SessionState + else { // frame control, send frame via SessionState broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession(); if (ss) ss->out(const_cast<AMQFrame&>(f.frame)); } @@ -200,12 +201,12 @@ void Connection::left() { connection.closed(); } -// Decode data from local clients. +// ConnectoinCodec::decode receives read buffers from directly-connected clients. size_t Connection::decode(const char* buffer, size_t size) { if (catchUp) { // Handle catch-up locally. Buffer buf(const_cast<char*>(buffer), size); while (localDecoder.decode(buf)) - received(localDecoder.frame); + received(localDecoder.getFrame()); } else { // Multicast local connections. assert(isLocal()); @@ -233,6 +234,29 @@ size_t Connection::decode(const char* buffer, size_t size) { return size; } +// Decode a data event, a read buffer that has been delivered by the cluster. +void Connection::decode(const EventHeader& eh, const void* data) { + assert(eh.getType() == DATA); // Only handle connection data events. + const char* cp = static_cast<const char*>(data); + Buffer buf(const_cast<char*>(cp), eh.getSize()); + if (clusterDecoder.decode(buf)) { // Decoded a frame + AMQFrame frame(clusterDecoder.getFrame()); + while (clusterDecoder.decode(buf)) { + cluster.connectionFrame(EventFrame(eh, frame)); + frame = clusterDecoder.getFrame(); + } + // Set read-credit on the last frame ending in this event. + // Credit will be given when this frame is processed. + cluster.connectionFrame(EventFrame(eh, frame, 1)); + } + else { + // We must give 1 unit read credit per event. + // This event does not complete any frames so + // we give read credit directly. + giveReadCredit(1); + } +} + broker::SessionState& Connection::sessionState() { return *connection.getChannel(currentChannel).getSession(); } @@ -267,11 +291,12 @@ void Connection::sessionState( QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); } -void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username) { - ConnectionId shadow = ConnectionId(memberId, connectionId); - QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadow); - self = shadow; +void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment) { + ConnectionId shadowId = ConnectionId(memberId, connectionId); + QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId); + self = shadowId; connection.setUserId(username); + clusterDecoder.setFragment(fragment.data(), fragment.size()); } void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) { @@ -281,7 +306,7 @@ void Connection::membership(const FieldTable& joiners, const FieldTable& members } bool Connection::isLocal() const { - return self.first == cluster.getId() && self.second == this; + return self.first == cluster.getId() && self.second; } bool Connection::isShadow() const { |