diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 27 |
1 files changed, 23 insertions, 4 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 3988abd491..ac4b9dcdf2 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -62,14 +62,15 @@ NoOpConnectionOutputHandler Connection::discardHandler; Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, ConnectionId myId) : cluster(c), self(myId), catchUp(false), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId), readCredit(0) + connection(&output, cluster.getBroker(), wrappedId), readCredit(0), expectProtocolHeader(false) { init(); } // Local connections Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, - const std::string& wrappedId, MemberId myId, bool isCatchUp) + const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink) : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId), readCredit(0) + connection(&output, cluster.getBroker(), wrappedId, isLink), readCredit(0), + expectProtocolHeader(isLink) { init(); } void Connection::init() { @@ -213,7 +214,25 @@ size_t Connection::decode(const char* buffer, size_t size) { } else { // Multicast local connections. assert(isLocal()); - cluster.getMulticast().mcastBuffer(buffer, size, self); + const char* remainingData = buffer; + size_t remainingSize = size; + if (expectProtocolHeader) { + //If this is an outgoing link, we will receive a protocol + //header which needs to be decoded first + framing::ProtocolInitiation pi; + Buffer buf(const_cast<char*>(buffer), size); + if (pi.decode(buf)) { + //TODO: check the version is correct + QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")"); + expectProtocolHeader = false; + remainingData = buffer + pi.encodedSize(); + remainingSize = size - pi.encodedSize(); + } else { + QPID_LOG(debug, "Not enough data for protocol header on outgoing clustered link"); + return 0; + } + } + cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self); } return size; } |