diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 77 |
1 files changed, 72 insertions, 5 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 30828d7bd9..118be27bb5 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -39,6 +39,7 @@ #include "qpid/framing/DeliveryProperties.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionAnnounceBody.h" +#include "qpid/framing/ClusterConnectionSecureUserIdBody.h" #include "qpid/framing/ConnectionCloseBody.h" #include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" @@ -46,6 +47,9 @@ #include <boost/current_function.hpp> + +typedef boost::function<void ( std::string& )> UserIdCallback; + // TODO aconway 2008-11-03: // // Refactor code for receiving an update into a separate UpdateConnection @@ -59,6 +63,7 @@ namespace cluster { using namespace framing; using namespace framing::cluster; + qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL); Connection::NullFrameHandler Connection::nullFrameHandler; @@ -82,8 +87,11 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, connectionCtor(&output, cluster.getBroker(), mgmtId, external, false, 0, true), expectProtocolHeader(false), mcastFrameHandler(cluster.getMulticast(), self), - updateIn(c.getUpdateReceiver()) -{} + updateIn(c.getUpdateReceiver()), + secureConnection(0), + mcastSentButNotReceived(false), + inConnectionNegotiation(true) +{ } // Local connection Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, @@ -98,7 +106,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, isCatchUp), // isCatchUp => shadow expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self), - updateIn(c.getUpdateReceiver()) + updateIn(c.getUpdateReceiver()), + secureConnection(0), + mcastSentButNotReceived(false) { cluster.addLocalConnection(this); if (isLocalClient()) { @@ -120,13 +130,19 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, updateIn.nextShadowMgmtId.clear(); init(); } + +} + +void Connection::setSecureConnection(broker::SecureConnection* sc) { + secureConnection = sc; } void Connection::init() { connection = connectionCtor.construct(); QPID_LOG(debug, cluster << " initialized connection: " << *this << " ssf=" << connection->getExternalSecuritySettings().ssf); - if (isLocalClient()) { + if (isLocalClient()) { + if (secureConnection) connection->setSecureConnection(secureConnection); // Actively send cluster-order frames from local node connection->setClusterOrderOutput(mcastFrameHandler); } @@ -138,9 +154,19 @@ void Connection::init() { } if (!isCatchUp()) connection->setErrorListener(this); + UserIdCallback fn = boost::bind ( &Connection::mcastUserId, this, _1 ); + connection->setUserIdCallback ( fn ); } void Connection::giveReadCredit(int credit) { + { + sys::Mutex::ScopedLock l(connectionNegotiationMonitor); + if (inConnectionNegotiation) { + mcastSentButNotReceived = false; + connectionNegotiationMonitor.notify(); + } + } + if (cluster.getSettings().readMax && credit) output.giveReadCredit(credit); } @@ -278,8 +304,9 @@ void Connection::abort() { cluster.erase(self); } -// ConnectoinCodec::decode receives read buffers from directly-connected clients. +// ConnectionCodec::decode receives read buffers from directly-connected clients. size_t Connection::decode(const char* buffer, size_t size) { + if (catchUp) { // Handle catch-up locally. Buffer buf(const_cast<char*>(buffer), size); while (localDecoder.decode(buf)) @@ -289,6 +316,15 @@ size_t Connection::decode(const char* buffer, size_t size) { assert(isLocal()); const char* remainingData = buffer; size_t remainingSize = size; + + { // scope for scoped lock. + sys::Mutex::ScopedLock l(connectionNegotiationMonitor); + if ( inConnectionNegotiation ) { + assert(!mcastSentButNotReceived); + mcastSentButNotReceived = true; + } + } + if (expectProtocolHeader) { //If this is an outgoing link, we will receive a protocol //header which needs to be decoded first @@ -307,6 +343,13 @@ size_t Connection::decode(const char* buffer, size_t size) { } } cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self); + + { // scope for scoped lock. + sys::Mutex::ScopedLock l(connectionNegotiationMonitor); + if ( inConnectionNegotiation ) + while (inConnectionNegotiation && mcastSentButNotReceived) + connectionNegotiationMonitor.wait(); + } } return size; } @@ -570,5 +613,29 @@ void Connection::managementAgents(const std::string& data) { QPID_LOG(debug, cluster << " updated management agents"); } + +// Only the direct, non-shadow gets this call. +void Connection::mcastUserId ( std::string & id ) { + cluster.getMulticast().mcastControl( ClusterConnectionSecureUserIdBody(ProtocolVersion(), string(id)), getId() ); + + { + sys::Mutex::ScopedLock l(connectionNegotiationMonitor); + inConnectionNegotiation = false; + connectionNegotiationMonitor.notify(); + } +} + +// All connections, shadow or not, get this call. +void Connection::secureUserId(const std::string& id) { + if ( isShadow() ) { + // If the user ID is "none", it is not legitimate. Take no action. + if ( strcmp ( id.c_str(), "none" ) ) { + connection->setUserId ( id ); + } + } +} + + + }} // Namespace qpid::cluster |