diff options
author | Alan Conway <aconway@apache.org> | 2010-05-17 21:27:56 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-05-17 21:27:56 +0000 |
commit | 01229d655617794cf001eead8ea70758c97f7267 (patch) | |
tree | 28e413a0ace9152a1fd641a1f2b645b4442d8bf3 /cpp/src | |
parent | 315ffe6aa9081edcc28a49f8f9163efea83dacc4 (diff) | |
download | qpid-python-01229d655617794cf001eead8ea70758c97f7267.tar.gz |
Fix for "Assertion `!mcastSentButNotReceived' failed." in cluster tests.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@945383 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 37 |
1 files changed, 22 insertions, 15 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 118be27bb5..d7e5ee5cd9 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -115,7 +115,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, // Local clients are announced to the cluster // and initialized when the announce is received. QPID_LOG(info, "new client connection " << *this); - giveReadCredit(cluster.getSettings().readMax); + giveReadCredit(cluster.getSettings().readMax); // Flow control cluster.getMulticast().mcastControl( ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId, connectionCtor.external.ssf, @@ -158,6 +158,8 @@ void Connection::init() { connection->setUserIdCallback ( fn ); } +// Called when we have consumed a read buffer to give credit to the +// connection layer to continue reading. void Connection::giveReadCredit(int credit) { { sys::Mutex::ScopedLock l(connectionNegotiationMonitor); @@ -166,7 +168,6 @@ void Connection::giveReadCredit(int credit) { connectionNegotiationMonitor.notify(); } } - if (cluster.getSettings().readMax && credit) output.giveReadCredit(credit); } @@ -311,20 +312,13 @@ size_t Connection::decode(const char* buffer, size_t size) { Buffer buf(const_cast<char*>(buffer), size); while (localDecoder.decode(buf)) received(localDecoder.getFrame()); + return buf.getPosition(); } else { // Multicast local connections. assert(isLocal()); const char* remainingData = buffer; size_t remainingSize = size; - { // scope for scoped lock. - sys::Mutex::ScopedLock l(connectionNegotiationMonitor); - if ( inConnectionNegotiation ) { - assert(!mcastSentButNotReceived); - mcastSentButNotReceived = true; - } - } - if (expectProtocolHeader) { //If this is an outgoing link, we will receive a protocol //header which needs to be decoded first @@ -342,16 +336,28 @@ size_t Connection::decode(const char* buffer, size_t size) { return 0; } } - cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self); - { // scope for scoped lock. + // During connection negotiation wait for each multicast to be + // processed before sending the next, to ensure that the + // security layer is activated before we attempt to decode + // encrypted frames. + { + sys::Mutex::ScopedLock l(connectionNegotiationMonitor); + if ( inConnectionNegotiation ) { + assert(!mcastSentButNotReceived); + mcastSentButNotReceived = true; + } + } + cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self); + { sys::Mutex::ScopedLock l(connectionNegotiationMonitor); - if ( inConnectionNegotiation ) - while (inConnectionNegotiation && mcastSentButNotReceived) + if (inConnectionNegotiation) + while (mcastSentButNotReceived) connectionNegotiationMonitor.wait(); + assert(!mcastSentButNotReceived); } + return size; } - return size; } broker::SessionState& Connection::sessionState() { @@ -621,6 +627,7 @@ void Connection::mcastUserId ( std::string & id ) { { sys::Mutex::ScopedLock l(connectionNegotiationMonitor); inConnectionNegotiation = false; + mcastSentButNotReceived = false; connectionNegotiationMonitor.notify(); } } |