summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp37
1 files changed, 22 insertions, 15 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 118be27bb5..d7e5ee5cd9 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -115,7 +115,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
// Local clients are announced to the cluster
// and initialized when the announce is received.
QPID_LOG(info, "new client connection " << *this);
- giveReadCredit(cluster.getSettings().readMax);
+ giveReadCredit(cluster.getSettings().readMax); // Flow control
cluster.getMulticast().mcastControl(
ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId,
connectionCtor.external.ssf,
@@ -158,6 +158,8 @@ void Connection::init() {
connection->setUserIdCallback ( fn );
}
+// Called when we have consumed a read buffer to give credit to the
+// connection layer to continue reading.
void Connection::giveReadCredit(int credit) {
{
sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
@@ -166,7 +168,6 @@ void Connection::giveReadCredit(int credit) {
connectionNegotiationMonitor.notify();
}
}
-
if (cluster.getSettings().readMax && credit)
output.giveReadCredit(credit);
}
@@ -311,20 +312,13 @@ size_t Connection::decode(const char* buffer, size_t size) {
Buffer buf(const_cast<char*>(buffer), size);
while (localDecoder.decode(buf))
received(localDecoder.getFrame());
+ return buf.getPosition();
}
else { // Multicast local connections.
assert(isLocal());
const char* remainingData = buffer;
size_t remainingSize = size;
- { // scope for scoped lock.
- sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
- if ( inConnectionNegotiation ) {
- assert(!mcastSentButNotReceived);
- mcastSentButNotReceived = true;
- }
- }
-
if (expectProtocolHeader) {
//If this is an outgoing link, we will receive a protocol
//header which needs to be decoded first
@@ -342,16 +336,28 @@ size_t Connection::decode(const char* buffer, size_t size) {
return 0;
}
}
- cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self);
- { // scope for scoped lock.
+ // During connection negotiation wait for each multicast to be
+ // processed before sending the next, to ensure that the
+ // security layer is activated before we attempt to decode
+ // encrypted frames.
+ {
+ sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
+ if ( inConnectionNegotiation ) {
+ assert(!mcastSentButNotReceived);
+ mcastSentButNotReceived = true;
+ }
+ }
+ cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self);
+ {
sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
- if ( inConnectionNegotiation )
- while (inConnectionNegotiation && mcastSentButNotReceived)
+ if (inConnectionNegotiation)
+ while (mcastSentButNotReceived)
connectionNegotiationMonitor.wait();
+ assert(!mcastSentButNotReceived);
}
+ return size;
}
- return size;
}
broker::SessionState& Connection::sessionState() {
@@ -621,6 +627,7 @@ void Connection::mcastUserId ( std::string & id ) {
{
sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
inConnectionNegotiation = false;
+ mcastSentButNotReceived = false;
connectionNegotiationMonitor.notify();
}
}