diff options
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.h | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionHandler.cpp | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionHandler.h | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SaslAuthenticator.h | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 206 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.h | 20 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Multicaster.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/tests/cluster_test.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/xml/cluster.xml | 10 |
10 files changed, 112 insertions, 179 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 51615e5b5f..ac574fc1a3 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -386,7 +386,6 @@ void Connection::restartTimeout() timeoutTimer->touch(); } - - +bool Connection::isOpen() { return adapter.isOpen(); } }} diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index 0639bcbb42..ad9f786179 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -63,9 +63,6 @@ class LinkRegistry; class SecureConnection; struct ConnectionTimeoutTask; -typedef boost::function<void ( std::string& )> userIdCallback; - - class Connection : public sys::ConnectionInputHandler, public ConnectionState, public RefCounted @@ -146,9 +143,8 @@ class Connection : public sys::ConnectionInputHandler, return securitySettings; } - void setUserIdCallback ( UserIdCallback fn ) { - adapter.setUserIdCallback ( fn ); - } + /** @return true if the initial connection negotiation is complete. */ + bool isOpen(); private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index 225735deb6..c349bc7ac7 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -87,7 +87,8 @@ ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient, bool ConnectionHandler::Handler::Handler(Connection& c, bool isClient, bool isShadow) : proxy(c.getOutput()), - connection(c), serverMode(!isClient), acl(0), secured(0), userIdCallback(0) + connection(c), serverMode(!isClient), acl(0), secured(0), + isOpen(false) { if (serverMode) { @@ -195,14 +196,7 @@ void ConnectionHandler::Handler::open(const string& /*virtualHost*/, if (sl.get()) secured->activateSecurityLayer(sl); } - if ( userIdCallback ) { - string s; - // Not checking the return value of getUsername, if there is - // no username then we want to call the userIdCallback anyway - // with an empty string. - authenticator->getUsername(s); - userIdCallback(s); - } + isOpen = true; proxy.openOk(array); } @@ -272,6 +266,7 @@ void ConnectionHandler::Handler::openOk(const framing::Array& knownHosts) Url url((*i)->get<std::string>()); connection.getKnownHosts().push_back(url); } + isOpen = true; } void ConnectionHandler::Handler::redirect(const string& /*host*/, const framing::Array& /*knownHosts*/) diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.h b/qpid/cpp/src/qpid/broker/ConnectionHandler.h index ecc8868e87..6d55cab647 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.h +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.h @@ -40,9 +40,6 @@ namespace broker { class Connection; class SecureConnection; -typedef boost::function<void ( std::string& )> UserIdCallback; - - class ConnectionHandler : public framing::FrameHandler { struct Handler : public framing::AMQP_AllOperations::ConnectionHandler @@ -53,6 +50,7 @@ class ConnectionHandler : public framing::FrameHandler std::auto_ptr<SaslAuthenticator> authenticator; AclModule* acl; SecureConnection* secured; + bool isOpen; Handler(Connection& connection, bool isClient, bool isShadow=false); ~Handler(); @@ -67,10 +65,6 @@ class ConnectionHandler : public framing::FrameHandler void close(uint16_t replyCode, const std::string& replyText); void closeOk(); - UserIdCallback userIdCallback; - void setUserIdCallback ( UserIdCallback fn ) { userIdCallback = fn; }; - - void start(const qpid::framing::FieldTable& serverProperties, const framing::Array& mechanisms, const framing::Array& locales); @@ -95,9 +89,7 @@ class ConnectionHandler : public framing::FrameHandler void heartbeat(); void handle(framing::AMQFrame& frame); void setSecureConnection(SecureConnection* secured); - void setUserIdCallback ( UserIdCallback fn ) { - handler->setUserIdCallback ( fn ); - } + bool isOpen() { return handler->isOpen; } }; diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.h b/qpid/cpp/src/qpid/broker/SaslAuthenticator.h index b4b946f7ce..cfbe1a0cd1 100644 --- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.h +++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.h @@ -36,12 +36,6 @@ namespace broker { class Connection; -// Calls your fn with the user ID string, just -// after the security negotiation is complete. -// Add your callback to the list with addUserIdCallback(). -typedef boost::function<void ( std::string& )> UserIdCallback; - - class SaslAuthenticator { public: @@ -54,7 +48,6 @@ public: virtual void getError(std::string&) {} virtual std::auto_ptr<qpid::sys::SecurityLayer> getSecurityLayer(uint16_t maxFrameSize) = 0; - virtual void setUserIdCallback ( UserIdCallback ) { } static bool available(void); // Initialize the SASL mechanism; throw if it fails. @@ -64,9 +57,6 @@ public: static std::auto_ptr<SaslAuthenticator> createAuthenticator(Connection& connection, bool isShadow); virtual void callUserIdCallbacks() { } - -private: - UserIdCallback userIdCallback; }; }} diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 9a8cab24a6..08e31c184a 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -39,7 +39,6 @@ #include "qpid/framing/DeliveryProperties.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionAnnounceBody.h" -#include "qpid/framing/ClusterConnectionSecureUserIdBody.h" #include "qpid/framing/ConnectionCloseBody.h" #include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" @@ -48,15 +47,6 @@ #include <boost/current_function.hpp> -typedef boost::function<void ( std::string& )> UserIdCallback; - -// TODO aconway 2008-11-03: -// -// Refactor code for receiving an update into a separate UpdateConnection -// class. -// - - namespace qpid { namespace cluster { @@ -88,10 +78,8 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, expectProtocolHeader(false), mcastFrameHandler(cluster.getMulticast(), self), updateIn(c.getUpdateReceiver()), - secureConnection(0), - mcastSentButNotReceived(false), - inConnectionNegotiation(true) -{ } + secureConnection(0) +{} // Local connection Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, @@ -107,9 +95,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self), updateIn(c.getUpdateReceiver()), - secureConnection(0), - mcastSentButNotReceived(false), - inConnectionNegotiation(true) + secureConnection(0) { cluster.addLocalConnection(this); if (isLocalClient()) { @@ -117,11 +103,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, // and initialized when the announce is received. QPID_LOG(info, "new client connection " << *this); giveReadCredit(cluster.getSettings().readMax); // Flow control - cluster.getMulticast().mcastControl( - ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId, - connectionCtor.external.ssf, - connectionCtor.external.authid, - connectionCtor.external.nodict), getId()); + init(); } else { // Catch-up shadow connections initialized using nextShadow id. @@ -135,7 +117,8 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, } void Connection::setSecureConnection(broker::SecureConnection* sc) { - secureConnection = sc; + secureConnection = sc; + if (connection.get()) connection->setSecureConnection(sc); } void Connection::init() { @@ -155,30 +138,33 @@ void Connection::init() { } if (!isCatchUp()) connection->setErrorListener(this); - UserIdCallback fn = boost::bind ( &Connection::mcastUserId, this, _1 ); - connection->setUserIdCallback ( fn ); } // Called when we have consumed a read buffer to give credit to the // connection layer to continue reading. void Connection::giveReadCredit(int credit) { - { - sys::Mutex::ScopedLock l(connectionNegotiationMonitor); - if (inConnectionNegotiation) { - mcastSentButNotReceived = false; - connectionNegotiationMonitor.notify(); - } - } if (cluster.getSettings().readMax && credit) output.giveReadCredit(credit); } -void Connection::announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict) { +void Connection::announce( + const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict, + const std::string& username, const std::string& initialFrames) +{ QPID_ASSERT(mgmtId == connectionCtor.mgmtId); QPID_ASSERT(ssf == connectionCtor.external.ssf); QPID_ASSERT(authid == connectionCtor.external.authid); QPID_ASSERT(nodict == connectionCtor.external.nodict); - init(); + // Local connections are already initialized. + if (isShadow()) { + init(); + // Play initial frames into the connection. + Buffer buf(const_cast<char*>(initialFrames.data()), initialFrames.size()); + AMQFrame frame; + while (frame.decode(buf)) + connection->received(frame); + connection->setUserId(username); + } } Connection::~Connection() { @@ -201,7 +187,6 @@ void Connection::received(framing::AMQFrame& f) { if (isLocal()) { // Local catch-up connection. currentChannel = f.getChannel(); if (!framing::invoke(*this, *f.getBody()).wasHandled()) - connection->received(f); } else { // Shadow or updated catch-up connection. @@ -235,7 +220,7 @@ struct GiveReadCreditOnExit { int credit; GiveReadCreditOnExit(Connection& connection_, int credit_) : connection(connection_), credit(credit_) {} - ~GiveReadCreditOnExit() { connection.giveReadCredit(credit); } + ~GiveReadCreditOnExit() { if (credit) connection.giveReadCredit(credit); } }; void Connection::deliverDoOutput(uint32_t limit) { @@ -307,57 +292,76 @@ void Connection::abort() { } // ConnectionCodec::decode receives read buffers from directly-connected clients. -size_t Connection::decode(const char* buffer, size_t size) { - - if (catchUp) { // Handle catch-up locally. - Buffer buf(const_cast<char*>(buffer), size); +size_t Connection::decode(const char* data, size_t size) { + GiveReadCreditOnExit grc(*this, 1); // Give a read credit by default. + const char* ptr = data; + const char* end = data + size; + if (catchUp) { // Handle catch-up locally. + Buffer buf(const_cast<char*>(ptr), size); + ptr += size; while (localDecoder.decode(buf)) received(localDecoder.getFrame()); - return buf.getPosition(); } else { // Multicast local connections. - assert(isLocal()); - const char* remainingData = buffer; - size_t remainingSize = size; - - if (expectProtocolHeader) { - //If this is an outgoing link, we will receive a protocol - //header which needs to be decoded first - framing::ProtocolInitiation pi; - Buffer buf(const_cast<char*>(buffer), size); - if (pi.decode(buf)) { - //TODO: check the version is correct - QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")"); - expectProtocolHeader = false; - remainingData = buffer + pi.encodedSize(); - remainingSize = size - pi.encodedSize(); - } else { - QPID_LOG(debug, "Not enough data for protocol header on outgoing clustered link"); - giveReadCredit(1); // We're not going to mcast so give read credit now. - return 0; - } + assert(isLocalClient()); + assert(connection.get()); + if (!checkProtocolHeader(ptr, size)) // Updates ptr + return 0; // Incomplete header + + if (!connection->isOpen()) + processInitialFrames(ptr, end-ptr); // Updates ptr + + if (connection->isOpen() && end - ptr > 0) { + // We're multi-casting, we will give read credit on delivery. + grc.credit = 0; + cluster.getMulticast().mcastBuffer(ptr, end - ptr, self); + ptr = end; } - - // During connection negotiation wait for each multicast to be - // processed before sending the next, to ensure that the - // security layer is activated before we attempt to decode - // encrypted frames. - { - sys::Mutex::ScopedLock l(connectionNegotiationMonitor); - if ( inConnectionNegotiation ) { - assert(!mcastSentButNotReceived); - mcastSentButNotReceived = true; - } - } - cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self); - { - sys::Mutex::ScopedLock l(connectionNegotiationMonitor); - if (inConnectionNegotiation) - while (mcastSentButNotReceived) - connectionNegotiationMonitor.wait(); - assert(!mcastSentButNotReceived); + } + return ptr - data; +} + +// Decode the protocol header if needed. Updates data and size +// returns true if the header is complete or already read. +bool Connection::checkProtocolHeader(const char*& data, size_t size) { + if (expectProtocolHeader) { + //If this is an outgoing link, we will receive a protocol + //header which needs to be decoded first + framing::ProtocolInitiation pi; + Buffer buf(const_cast<char*&>(data), size); + if (pi.decode(buf)) { + //TODO: check the version is correct + QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")"); + expectProtocolHeader = false; + data += pi.encodedSize(); + } else { + return false; } - return size; + } + return true; +} + +void Connection::processInitialFrames(const char*& ptr, size_t size) { + // Process the initial negotiation locally and store it so + // it can be replayed on other brokers in announce() + Buffer buf(const_cast<char*>(ptr), size); + framing::AMQFrame frame; + while (!connection->isOpen() && frame.decode(buf)) + received(frame); + initialFrames.append(ptr, buf.getPosition()); + ptr += buf.getPosition(); + if (connection->isOpen()) { // initial negotiation complete + cluster.getMulticast().mcastControl( + ClusterConnectionAnnounceBody( + ProtocolVersion(), + connectionCtor.mgmtId, + connectionCtor.external.ssf, + connectionCtor.external.authid, + connectionCtor.external.nodict, + connection->getUserId(), + initialFrames), + getId()); + initialFrames.clear(); } } @@ -574,21 +578,14 @@ void Connection::queue(const std::string& encoded) { } void Connection::sessionError(uint16_t , const std::string& msg) { - // If we are negotiating the connection when it fails just close the connectoin. - // If it fails after that then we have to flag the error to the cluster. - if (inConnectionNegotiation) - cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self); - else + // Ignore errors before isOpen(), we're not multicasting yet. + if (connection->isOpen()) cluster.flagError(*this, ERROR_TYPE_SESSION, msg); - } void Connection::connectionError(const std::string& msg) { - // If we are negotiating the connection when it fails just close the connectoin. - // If it fails after that then we have to flag the error to the cluster. - if (inConnectionNegotiation) - cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self); - else + // Ignore errors before isOpen(), we're not multicasting yet. + if (connection->isOpen()) cluster.flagError(*this, ERROR_TYPE_CONNECTION, msg); } @@ -630,30 +627,5 @@ void Connection::managementAgents(const std::string& data) { QPID_LOG(debug, cluster << " updated management agents"); } - -void Connection::mcastUserId ( std::string & id ) { - // Only the directly connected broker will mcast the secure user id, and only - // for client connections (not update connections) - if (isLocalClient()) - cluster.getMulticast().mcastControl( - ClusterConnectionSecureUserIdBody(ProtocolVersion(), string(id)), getId() ); - { - // This call signals the end of the connection negotiation phase. - sys::Mutex::ScopedLock l(connectionNegotiationMonitor); - inConnectionNegotiation = false; - mcastSentButNotReceived = false; - connectionNegotiationMonitor.notify(); - } -} - -// All connections, shadow or not, get this call. -void Connection::secureUserId(const std::string& id) { - // Only set the user ID on shadow connections, and only if id is not the empty string. - if ( isShadow() && !id.empty() ) - connection->setUserId ( id ); -} - - - }} // Namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 4f69bf7cf4..70c4d0e2a3 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -164,8 +164,9 @@ class Connection : void exchange(const std::string& encoded); void giveReadCredit(int credit); - void announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict); - void secureUserId(const std::string&); + void announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid, + bool nodict, const std::string& username, + const std::string& initFrames); void abort(); void deliverClose(); @@ -175,16 +176,8 @@ class Connection : void managementSchema(const std::string& data); void managementAgents(const std::string& data); void managementSetupState(uint64_t objectNum, uint16_t bootSequence); - - //uint32_t getSsf() const { return connectionCtor.external.ssf; } - void setSecureConnection ( broker::SecureConnection * sc ); - // This is a callback, registered with the broker connection. - // It gives me the user ID, if one is negotiated through Sasl. - void mcastUserId ( std::string & ); - - private: struct NullFrameHandler : public framing::FrameHandler { void handle(framing::AMQFrame&) {} @@ -228,6 +221,8 @@ class Connection : bool checkUnsupported(const framing::AMQBody& body); void deliverDoOutput(uint32_t limit); + bool checkProtocolHeader(const char*& data, size_t size); + void processInitialFrames(const char*& data, size_t size); boost::shared_ptr<broker::Queue> findQueue(const std::string& qname); broker::SessionState& sessionState(); broker::SemanticState& semanticState(); @@ -247,13 +242,10 @@ class Connection : McastFrameHandler mcastFrameHandler; UpdateReceiver& updateIn; qpid::broker::SecureConnection* secureConnection; + std::string initialFrames; static qpid::sys::AtomicValue<uint64_t> catchUpId; - mutable sys::Monitor connectionNegotiationMonitor; - bool mcastSentButNotReceived; - bool inConnectionNegotiation; - friend std::ostream& operator<<(std::ostream&, const Connection&); }; diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/Multicaster.cpp index d57ff76941..8916de9628 100644 --- a/qpid/cpp/src/qpid/cluster/Multicaster.cpp +++ b/qpid/cpp/src/qpid/cluster/Multicaster.cpp @@ -61,7 +61,6 @@ void Multicaster::mcast(const Event& e) { QPID_LOG(trace, "MCAST " << e); if (bypass) { // direct, don't queue iovec iov = e.toIovec(); - // FIXME aconway 2010-03-10: should do limited retry. while (!cpg.mcast(&iov, 1)) ; } diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index d5f2c457e5..0565ecc34c 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -510,7 +510,7 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { Client c1(cluster[1], "c1"); BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "abcd"); - BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection).size()); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size()); } QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { @@ -518,13 +518,13 @@ QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); - set<int> kb0 = knownBrokerPorts(c0.connection); + set<int> kb0 = knownBrokerPorts(c0.connection, 1); BOOST_CHECK_EQUAL(kb0.size(), 1u); BOOST_CHECK_EQUAL(kb0, makeSet(cluster)); cluster.add(); Client c1(cluster[1], "c1"); - set<int> kb1 = knownBrokerPorts(c1.connection); + set<int> kb1 = knownBrokerPorts(c1.connection, 2); kb0 = knownBrokerPorts(c0.connection, 2); BOOST_CHECK_EQUAL(kb1.size(), 2u); BOOST_CHECK_EQUAL(kb1, makeSet(cluster)); @@ -532,7 +532,7 @@ QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { cluster.add(); Client c2(cluster[2], "c2"); - set<int> kb2 = knownBrokerPorts(c2.connection); + set<int> kb2 = knownBrokerPorts(c2.connection, 3); kb1 = knownBrokerPorts(c1.connection, 3); kb0 = knownBrokerPorts(c0.connection, 3); BOOST_CHECK_EQUAL(kb2.size(), 3u); diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 29157dc148..30cd159dd3 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -127,6 +127,10 @@ <field name="authid" type="str16"/> <!-- exclude certain sasl mechs, used with ssl and sasl-external --> <field name="nodict" type="bit"/> + <!-- User name as negotiated by SASL --> + <field name="username" type="str32"/> + <!-- Frames forming the initial connection negotiation. --> + <field name="initial-frames" type="str32"/> </control> <!-- Marks the cluster-wide point when a connection is considered closed. --> @@ -263,11 +267,5 @@ <control name="management-agents" code="0x37"> <field name="data" type="vbin32"/> </control> - - <!-- Announce the user ID on a secure connection --> - <control name="secureUserId" code="0x38"> - <field name="secure-user-id" type="str16"/> - </control> - </class> </amqp> |