summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-01-22 22:53:50 +0000
committerGordon Sim <gsim@apache.org>2009-01-22 22:53:50 +0000
commitd7ce27f7cc96894f149e5c20c03b306b80636727 (patch)
tree22caa566993da19f9e211f69fdca64c13f1f04e6 /cpp/src/qpid/cluster/Connection.cpp
parent74481dd2b6b97374bd4f260ca89d9103ce6383ed (diff)
downloadqpid-python-d7ce27f7cc96894f149e5c20c03b306b80636727.tar.gz
QPID-1567: More changes to make clustering and federation work together
* replicate outgoing link traffic to all nodes * coordinate amongst nodes so that only one node actually maintains active links with the others able to take over if that node fails git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@736841 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp27
1 files changed, 23 insertions, 4 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 3988abd491..ac4b9dcdf2 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -62,14 +62,15 @@ NoOpConnectionOutputHandler Connection::discardHandler;
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, ConnectionId myId)
: cluster(c), self(myId), catchUp(false), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId), readCredit(0)
+ connection(&output, cluster.getBroker(), wrappedId), readCredit(0), expectProtocolHeader(false)
{ init(); }
// Local connections
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
- const std::string& wrappedId, MemberId myId, bool isCatchUp)
+ const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink)
: cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId), readCredit(0)
+ connection(&output, cluster.getBroker(), wrappedId, isLink), readCredit(0),
+ expectProtocolHeader(isLink)
{ init(); }
void Connection::init() {
@@ -213,7 +214,25 @@ size_t Connection::decode(const char* buffer, size_t size) {
}
else { // Multicast local connections.
assert(isLocal());
- cluster.getMulticast().mcastBuffer(buffer, size, self);
+ const char* remainingData = buffer;
+ size_t remainingSize = size;
+ if (expectProtocolHeader) {
+ //If this is an outgoing link, we will receive a protocol
+ //header which needs to be decoded first
+ framing::ProtocolInitiation pi;
+ Buffer buf(const_cast<char*>(buffer), size);
+ if (pi.decode(buf)) {
+ //TODO: check the version is correct
+ QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")");
+ expectProtocolHeader = false;
+ remainingData = buffer + pi.encodedSize();
+ remainingSize = size - pi.encodedSize();
+ } else {
+ QPID_LOG(debug, "Not enough data for protocol header on outgoing clustered link");
+ return 0;
+ }
+ }
+ cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self);
}
return size;
}