diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 206 |
1 files changed, 89 insertions, 117 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 9a8cab24a6..08e31c184a 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -39,7 +39,6 @@ #include "qpid/framing/DeliveryProperties.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionAnnounceBody.h" -#include "qpid/framing/ClusterConnectionSecureUserIdBody.h" #include "qpid/framing/ConnectionCloseBody.h" #include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" @@ -48,15 +47,6 @@ #include <boost/current_function.hpp> -typedef boost::function<void ( std::string& )> UserIdCallback; - -// TODO aconway 2008-11-03: -// -// Refactor code for receiving an update into a separate UpdateConnection -// class. -// - - namespace qpid { namespace cluster { @@ -88,10 +78,8 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, expectProtocolHeader(false), mcastFrameHandler(cluster.getMulticast(), self), updateIn(c.getUpdateReceiver()), - secureConnection(0), - mcastSentButNotReceived(false), - inConnectionNegotiation(true) -{ } + secureConnection(0) +{} // Local connection Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, @@ -107,9 +95,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self), updateIn(c.getUpdateReceiver()), - secureConnection(0), - mcastSentButNotReceived(false), - inConnectionNegotiation(true) + secureConnection(0) { cluster.addLocalConnection(this); if (isLocalClient()) { @@ -117,11 +103,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, // and initialized when the announce is received. QPID_LOG(info, "new client connection " << *this); giveReadCredit(cluster.getSettings().readMax); // Flow control - cluster.getMulticast().mcastControl( - ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId, - connectionCtor.external.ssf, - connectionCtor.external.authid, - connectionCtor.external.nodict), getId()); + init(); } else { // Catch-up shadow connections initialized using nextShadow id. @@ -135,7 +117,8 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, } void Connection::setSecureConnection(broker::SecureConnection* sc) { - secureConnection = sc; + secureConnection = sc; + if (connection.get()) connection->setSecureConnection(sc); } void Connection::init() { @@ -155,30 +138,33 @@ void Connection::init() { } if (!isCatchUp()) connection->setErrorListener(this); - UserIdCallback fn = boost::bind ( &Connection::mcastUserId, this, _1 ); - connection->setUserIdCallback ( fn ); } // Called when we have consumed a read buffer to give credit to the // connection layer to continue reading. void Connection::giveReadCredit(int credit) { - { - sys::Mutex::ScopedLock l(connectionNegotiationMonitor); - if (inConnectionNegotiation) { - mcastSentButNotReceived = false; - connectionNegotiationMonitor.notify(); - } - } if (cluster.getSettings().readMax && credit) output.giveReadCredit(credit); } -void Connection::announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict) { +void Connection::announce( + const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict, + const std::string& username, const std::string& initialFrames) +{ QPID_ASSERT(mgmtId == connectionCtor.mgmtId); QPID_ASSERT(ssf == connectionCtor.external.ssf); QPID_ASSERT(authid == connectionCtor.external.authid); QPID_ASSERT(nodict == connectionCtor.external.nodict); - init(); + // Local connections are already initialized. + if (isShadow()) { + init(); + // Play initial frames into the connection. + Buffer buf(const_cast<char*>(initialFrames.data()), initialFrames.size()); + AMQFrame frame; + while (frame.decode(buf)) + connection->received(frame); + connection->setUserId(username); + } } Connection::~Connection() { @@ -201,7 +187,6 @@ void Connection::received(framing::AMQFrame& f) { if (isLocal()) { // Local catch-up connection. currentChannel = f.getChannel(); if (!framing::invoke(*this, *f.getBody()).wasHandled()) - connection->received(f); } else { // Shadow or updated catch-up connection. @@ -235,7 +220,7 @@ struct GiveReadCreditOnExit { int credit; GiveReadCreditOnExit(Connection& connection_, int credit_) : connection(connection_), credit(credit_) {} - ~GiveReadCreditOnExit() { connection.giveReadCredit(credit); } + ~GiveReadCreditOnExit() { if (credit) connection.giveReadCredit(credit); } }; void Connection::deliverDoOutput(uint32_t limit) { @@ -307,57 +292,76 @@ void Connection::abort() { } // ConnectionCodec::decode receives read buffers from directly-connected clients. -size_t Connection::decode(const char* buffer, size_t size) { - - if (catchUp) { // Handle catch-up locally. - Buffer buf(const_cast<char*>(buffer), size); +size_t Connection::decode(const char* data, size_t size) { + GiveReadCreditOnExit grc(*this, 1); // Give a read credit by default. + const char* ptr = data; + const char* end = data + size; + if (catchUp) { // Handle catch-up locally. + Buffer buf(const_cast<char*>(ptr), size); + ptr += size; while (localDecoder.decode(buf)) received(localDecoder.getFrame()); - return buf.getPosition(); } else { // Multicast local connections. - assert(isLocal()); - const char* remainingData = buffer; - size_t remainingSize = size; - - if (expectProtocolHeader) { - //If this is an outgoing link, we will receive a protocol - //header which needs to be decoded first - framing::ProtocolInitiation pi; - Buffer buf(const_cast<char*>(buffer), size); - if (pi.decode(buf)) { - //TODO: check the version is correct - QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")"); - expectProtocolHeader = false; - remainingData = buffer + pi.encodedSize(); - remainingSize = size - pi.encodedSize(); - } else { - QPID_LOG(debug, "Not enough data for protocol header on outgoing clustered link"); - giveReadCredit(1); // We're not going to mcast so give read credit now. - return 0; - } + assert(isLocalClient()); + assert(connection.get()); + if (!checkProtocolHeader(ptr, size)) // Updates ptr + return 0; // Incomplete header + + if (!connection->isOpen()) + processInitialFrames(ptr, end-ptr); // Updates ptr + + if (connection->isOpen() && end - ptr > 0) { + // We're multi-casting, we will give read credit on delivery. + grc.credit = 0; + cluster.getMulticast().mcastBuffer(ptr, end - ptr, self); + ptr = end; } - - // During connection negotiation wait for each multicast to be - // processed before sending the next, to ensure that the - // security layer is activated before we attempt to decode - // encrypted frames. - { - sys::Mutex::ScopedLock l(connectionNegotiationMonitor); - if ( inConnectionNegotiation ) { - assert(!mcastSentButNotReceived); - mcastSentButNotReceived = true; - } - } - cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self); - { - sys::Mutex::ScopedLock l(connectionNegotiationMonitor); - if (inConnectionNegotiation) - while (mcastSentButNotReceived) - connectionNegotiationMonitor.wait(); - assert(!mcastSentButNotReceived); + } + return ptr - data; +} + +// Decode the protocol header if needed. Updates data and size +// returns true if the header is complete or already read. +bool Connection::checkProtocolHeader(const char*& data, size_t size) { + if (expectProtocolHeader) { + //If this is an outgoing link, we will receive a protocol + //header which needs to be decoded first + framing::ProtocolInitiation pi; + Buffer buf(const_cast<char*&>(data), size); + if (pi.decode(buf)) { + //TODO: check the version is correct + QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")"); + expectProtocolHeader = false; + data += pi.encodedSize(); + } else { + return false; } - return size; + } + return true; +} + +void Connection::processInitialFrames(const char*& ptr, size_t size) { + // Process the initial negotiation locally and store it so + // it can be replayed on other brokers in announce() + Buffer buf(const_cast<char*>(ptr), size); + framing::AMQFrame frame; + while (!connection->isOpen() && frame.decode(buf)) + received(frame); + initialFrames.append(ptr, buf.getPosition()); + ptr += buf.getPosition(); + if (connection->isOpen()) { // initial negotiation complete + cluster.getMulticast().mcastControl( + ClusterConnectionAnnounceBody( + ProtocolVersion(), + connectionCtor.mgmtId, + connectionCtor.external.ssf, + connectionCtor.external.authid, + connectionCtor.external.nodict, + connection->getUserId(), + initialFrames), + getId()); + initialFrames.clear(); } } @@ -574,21 +578,14 @@ void Connection::queue(const std::string& encoded) { } void Connection::sessionError(uint16_t , const std::string& msg) { - // If we are negotiating the connection when it fails just close the connectoin. - // If it fails after that then we have to flag the error to the cluster. - if (inConnectionNegotiation) - cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self); - else + // Ignore errors before isOpen(), we're not multicasting yet. + if (connection->isOpen()) cluster.flagError(*this, ERROR_TYPE_SESSION, msg); - } void Connection::connectionError(const std::string& msg) { - // If we are negotiating the connection when it fails just close the connectoin. - // If it fails after that then we have to flag the error to the cluster. - if (inConnectionNegotiation) - cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self); - else + // Ignore errors before isOpen(), we're not multicasting yet. + if (connection->isOpen()) cluster.flagError(*this, ERROR_TYPE_CONNECTION, msg); } @@ -630,30 +627,5 @@ void Connection::managementAgents(const std::string& data) { QPID_LOG(debug, cluster << " updated management agents"); } - -void Connection::mcastUserId ( std::string & id ) { - // Only the directly connected broker will mcast the secure user id, and only - // for client connections (not update connections) - if (isLocalClient()) - cluster.getMulticast().mcastControl( - ClusterConnectionSecureUserIdBody(ProtocolVersion(), string(id)), getId() ); - { - // This call signals the end of the connection negotiation phase. - sys::Mutex::ScopedLock l(connectionNegotiationMonitor); - inConnectionNegotiation = false; - mcastSentButNotReceived = false; - connectionNegotiationMonitor.notify(); - } -} - -// All connections, shadow or not, get this call. -void Connection::secureUserId(const std::string& id) { - // Only set the user ID on shadow connections, and only if id is not the empty string. - if ( isShadow() && !id.empty() ) - connection->setUserId ( id ); -} - - - }} // Namespace qpid::cluster |