summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp27
1 files changed, 23 insertions, 4 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 3988abd491..ac4b9dcdf2 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -62,14 +62,15 @@ NoOpConnectionOutputHandler Connection::discardHandler;
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, ConnectionId myId)
: cluster(c), self(myId), catchUp(false), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId), readCredit(0)
+ connection(&output, cluster.getBroker(), wrappedId), readCredit(0), expectProtocolHeader(false)
{ init(); }
// Local connections
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
- const std::string& wrappedId, MemberId myId, bool isCatchUp)
+ const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink)
: cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId), readCredit(0)
+ connection(&output, cluster.getBroker(), wrappedId, isLink), readCredit(0),
+ expectProtocolHeader(isLink)
{ init(); }
void Connection::init() {
@@ -213,7 +214,25 @@ size_t Connection::decode(const char* buffer, size_t size) {
}
else { // Multicast local connections.
assert(isLocal());
- cluster.getMulticast().mcastBuffer(buffer, size, self);
+ const char* remainingData = buffer;
+ size_t remainingSize = size;
+ if (expectProtocolHeader) {
+ //If this is an outgoing link, we will receive a protocol
+ //header which needs to be decoded first
+ framing::ProtocolInitiation pi;
+ Buffer buf(const_cast<char*>(buffer), size);
+ if (pi.decode(buf)) {
+ //TODO: check the version is correct
+ QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")");
+ expectProtocolHeader = false;
+ remainingData = buffer + pi.encodedSize();
+ remainingSize = size - pi.encodedSize();
+ } else {
+ QPID_LOG(debug, "Not enough data for protocol header on outgoing clustered link");
+ return 0;
+ }
+ }
+ cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self);
}
return size;
}