diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/cluster.cmake | 2 | ||||
-rw-r--r-- | cpp/src/cluster.mk | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionHandler.cpp | 18 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionHandler.h | 20 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SaslAuthenticator.cpp | 24 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SaslAuthenticator.h | 19 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 77 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 16 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionCodec.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/SecureConnectionFactory.cpp | 73 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/SecureConnectionFactory.h | 58 | ||||
-rw-r--r-- | cpp/src/tests/ForkedBroker.cpp | 1 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 8 | ||||
-rw-r--r-- | cpp/src/tests/cluster.cmake | 4 | ||||
-rw-r--r-- | cpp/src/tests/cluster_authentication_soak.cpp | 244 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 1 | ||||
-rw-r--r-- | cpp/xml/cluster.xml | 6 |
20 files changed, 568 insertions, 32 deletions
diff --git a/cpp/src/cluster.cmake b/cpp/src/cluster.cmake index 8f886e7f3f..d18fa479bb 100644 --- a/cpp/src/cluster.cmake +++ b/cpp/src/cluster.cmake @@ -131,6 +131,8 @@ if (BUILD_CLUSTER) qpid/cluster/MemberSet.h qpid/cluster/MemberSet.cpp qpid/cluster/types.h + qpid/cluster/SecureConnectionFactory.h + qpid/cluster/SecureConnectionFactory.cpp qpid/cluster/StoreStatus.h qpid/cluster/StoreStatus.cpp ) diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index 8e95747c4d..2a648e968c 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -90,6 +90,8 @@ cluster_la_SOURCES = \ qpid/cluster/MemberSet.h \ qpid/cluster/MemberSet.cpp \ qpid/cluster/types.h \ + qpid/cluster/SecureConnectionFactory.h \ + qpid/cluster/SecureConnectionFactory.cpp \ qpid/cluster/StoreStatus.h \ qpid/cluster/StoreStatus.cpp diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 2bb68b9f2d..51615e5b5f 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -39,6 +39,8 @@ #include <iostream> #include <assert.h> + + using namespace qpid::sys; using namespace qpid::framing; using qpid::ptr_map_ptr; @@ -77,7 +79,7 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std const qpid::sys::SecuritySettings& external, bool isLink_, uint64_t objectId, bool shadow_) : ConnectionState(out_, broker_), securitySettings(external), - adapter(*this, isLink_), + adapter(*this, isLink_, shadow_), isLink(isLink_), mgmtClosing(false), mgmtId(mgmtId_), @@ -384,4 +386,7 @@ void Connection::restartTimeout() timeoutTimer->touch(); } + + + }} diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 30a763411f..0639bcbb42 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -63,6 +63,9 @@ class LinkRegistry; class SecureConnection; struct ConnectionTimeoutTask; +typedef boost::function<void ( std::string& )> userIdCallback; + + class Connection : public sys::ConnectionInputHandler, public ConnectionState, public RefCounted @@ -143,6 +146,10 @@ class Connection : public sys::ConnectionInputHandler, return securitySettings; } + void setUserIdCallback ( UserIdCallback fn ) { + adapter.setUserIdCallback ( fn ); + } + private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index 50a5aff2c9..b2d4210473 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -83,11 +83,11 @@ void ConnectionHandler::setSecureConnection(SecureConnection* secured) handler->secured = secured; } -ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient) : handler(new Handler(connection, isClient)) {} +ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient, bool isShadow) : handler(new Handler(connection, isClient, isShadow)) {} -ConnectionHandler::Handler::Handler(Connection& c, bool isClient) : +ConnectionHandler::Handler::Handler(Connection& c, bool isClient, bool isShadow) : proxy(c.getOutput()), - connection(c), serverMode(!isClient), acl(0), secured(0) + connection(c), serverMode(!isClient), acl(0), secured(0), userIdCallback(0) { if (serverMode) { @@ -98,7 +98,7 @@ ConnectionHandler::Handler::Handler(Connection& c, bool isClient) : properties.setString(QPID_FED_TAG, connection.getBroker().getFederationTag()); - authenticator = SaslAuthenticator::createAuthenticator(c); + authenticator = SaslAuthenticator::createAuthenticator(c, isShadow); authenticator->getMechanisms(mechanisms); Array locales(0x95); @@ -181,6 +181,14 @@ void ConnectionHandler::Handler::tuneOk(uint16_t /*channelmax*/, connection.setHeartbeatInterval(heartbeat); } +void ConnectionHandler::Handler::callUserIdCallbacks ( ) { + string s; + if ( false == authenticator->getUsername(s) ) + s = "none"; + if ( userIdCallback ) + userIdCallback ( s ); +} + void ConnectionHandler::Handler::open(const string& /*virtualHost*/, const framing::Array& /*capabilities*/, bool /*insist*/) { @@ -195,6 +203,8 @@ void ConnectionHandler::Handler::open(const string& /*virtualHost*/, std::auto_ptr<SecurityLayer> sl = authenticator->getSecurityLayer(connection.getFrameMax()); if (sl.get()) secured->activateSecurityLayer(sl); } + + callUserIdCallbacks ( ); } diff --git a/cpp/src/qpid/broker/ConnectionHandler.h b/cpp/src/qpid/broker/ConnectionHandler.h index d74f65da36..0372942188 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.h +++ b/cpp/src/qpid/broker/ConnectionHandler.h @@ -40,6 +40,9 @@ namespace broker { class Connection; class SecureConnection; +typedef boost::function<void ( std::string& )> UserIdCallback; + + class ConnectionHandler : public framing::FrameHandler { struct Handler : public framing::AMQP_AllOperations::ConnectionHandler @@ -51,7 +54,7 @@ class ConnectionHandler : public framing::FrameHandler AclModule* acl; SecureConnection* secured; - Handler(Connection& connection, bool isClient); + Handler(Connection& connection, bool isClient, bool isShadow=false); ~Handler(); void startOk(const qpid::framing::FieldTable& clientProperties, const std::string& mechanism, const std::string& response, @@ -64,6 +67,14 @@ class ConnectionHandler : public framing::FrameHandler void close(uint16_t replyCode, const std::string& replyText); void closeOk(); + UserIdCallback userIdCallback; + void setUserIdCallback ( UserIdCallback fn ) { + userIdCallback = fn; + }; + + + void callUserIdCallbacks ( ); + void start(const qpid::framing::FieldTable& serverProperties, const framing::Array& mechanisms, @@ -81,12 +92,17 @@ class ConnectionHandler : public framing::FrameHandler void redirect(const std::string& host, const framing::Array& knownHosts); }; std::auto_ptr<Handler> handler; + + public: - ConnectionHandler(Connection& connection, bool isClient); + ConnectionHandler(Connection& connection, bool isClient, bool isShadow=false ); void close(framing::connection::CloseCode code, const std::string& text); void heartbeat(); void handle(framing::AMQFrame& frame); void setSecureConnection(SecureConnection* secured); + void setUserIdCallback ( UserIdCallback fn ) { + handler->setUserIdCallback ( fn ); + } }; diff --git a/cpp/src/qpid/broker/SaslAuthenticator.cpp b/cpp/src/qpid/broker/SaslAuthenticator.cpp index 0f72f9643d..c55f3edb38 100644 --- a/cpp/src/qpid/broker/SaslAuthenticator.cpp +++ b/cpp/src/qpid/broker/SaslAuthenticator.cpp @@ -41,10 +41,12 @@ using qpid::sys::SecuritySettings; using boost::format; using boost::str; + namespace qpid { namespace broker { + class NullAuthenticator : public SaslAuthenticator { Connection& connection; @@ -62,6 +64,8 @@ public: #if HAVE_SASL + + class CyrusAuthenticator : public SaslAuthenticator { sasl_conn_t *sasl_conn; @@ -84,8 +88,7 @@ public: std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize); }; -bool SaslAuthenticator::available(void) -{ +bool SaslAuthenticator::available(void) { return true; } @@ -109,8 +112,7 @@ void SaslAuthenticator::fini(void) typedef NullAuthenticator CyrusAuthenticator; -bool SaslAuthenticator::available(void) -{ +bool SaslAuthenticator::available(void) { return false; } @@ -126,18 +128,20 @@ void SaslAuthenticator::fini(void) #endif -std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c) +std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c, bool isShadow ) { - static bool needWarning = true; if (c.getBroker().getOptions().auth) { - return std::auto_ptr<SaslAuthenticator>(new CyrusAuthenticator(c, c.getBroker().getOptions().requireEncrypted)); + if ( isShadow ) + return std::auto_ptr<SaslAuthenticator>(new NullAuthenticator(c, c.getBroker().getOptions().requireEncrypted)); + else + return std::auto_ptr<SaslAuthenticator>(new CyrusAuthenticator(c, c.getBroker().getOptions().requireEncrypted)); } else { QPID_LOG(debug, "SASL: No Authentication Performed"); - needWarning = false; return std::auto_ptr<SaslAuthenticator>(new NullAuthenticator(c, c.getBroker().getOptions().requireEncrypted)); } } + NullAuthenticator::NullAuthenticator(Connection& c, bool e) : connection(c), client(c.getOutput()), realm(c.getBroker().getOptions().realm), encrypt(e) {} NullAuthenticator::~NullAuthenticator() {} @@ -200,7 +204,6 @@ std::auto_ptr<SecurityLayer> NullAuthenticator::getSecurityLayer(uint16_t) #if HAVE_SASL - CyrusAuthenticator::CyrusAuthenticator(Connection& c, bool _encrypt) : sasl_conn(0), connection(c), client(c.getOutput()), encrypt(_encrypt) { @@ -386,7 +389,7 @@ void CyrusAuthenticator::processAuthenticationStep(int code, const char *challen // authentication failure, when one is available throw ConnectionForcedException("Authenticated username unavailable"); } - QPID_LOG(info, "SASL: Authentication succeeded for: " << uid); + QPID_LOG(info, connection.getMgmtId() << " SASL: Authentication succeeded for: " << uid); connection.setUserId(uid); @@ -432,7 +435,6 @@ std::auto_ptr<SecurityLayer> CyrusAuthenticator::getSecurityLayer(uint16_t maxFr uint ssf = *(reinterpret_cast<const unsigned*>(value)); std::auto_ptr<SecurityLayer> securityLayer; if (ssf) { - QPID_LOG(info, "Installing security layer, SSF: "<< ssf); securityLayer = std::auto_ptr<SecurityLayer>(new CyrusSecurityLayer(sasl_conn, maxFrameSize)); } return securityLayer; diff --git a/cpp/src/qpid/broker/SaslAuthenticator.h b/cpp/src/qpid/broker/SaslAuthenticator.h index 8ddaeb19a4..f4ad24b3bd 100644 --- a/cpp/src/qpid/broker/SaslAuthenticator.h +++ b/cpp/src/qpid/broker/SaslAuthenticator.h @@ -21,17 +21,27 @@ #ifndef _SaslAuthenticator_ #define _SaslAuthenticator_ + #include "qpid/framing/amqp_types.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/Exception.h" #include "qpid/sys/SecurityLayer.h" #include <memory> +#include <vector> +#include <boost/bind.hpp> +#include <boost/function.hpp> namespace qpid { namespace broker { class Connection; +// Calls your fn with the user ID string, just +// after the security negotiation is complete. +// Add your callback to the list with addUserIdCallback(). +typedef boost::function<void ( std::string& )> UserIdCallback; + + class SaslAuthenticator { public: @@ -40,16 +50,23 @@ public: virtual void start(const std::string& mechanism, const std::string& response) = 0; virtual void step(const std::string& response) = 0; virtual void getUid(std::string&) {} + virtual bool getUsername(std::string&) { return false; }; virtual void getError(std::string&) {} virtual std::auto_ptr<qpid::sys::SecurityLayer> getSecurityLayer(uint16_t maxFrameSize) = 0; + virtual void setUserIdCallback ( UserIdCallback ) { } static bool available(void); // Initialize the SASL mechanism; throw if it fails. static void init(const std::string& saslName); static void fini(void); - static std::auto_ptr<SaslAuthenticator> createAuthenticator(Connection& connection); + static std::auto_ptr<SaslAuthenticator> createAuthenticator(Connection& connection, bool isShadow); + + virtual void callUserIdCallbacks() { } + +private: + UserIdCallback userIdCallback; }; }} diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 955487ee03..75c8d328cf 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -21,6 +21,8 @@ #include "qpid/cluster/ConnectionCodec.h" #include "qpid/cluster/ClusterSettings.h" +#include "qpid/cluster/SecureConnectionFactory.h" + #include "qpid/cluster/Cluster.h" #include "qpid/cluster/ConnectionCodec.h" #include "qpid/cluster/UpdateClient.h" @@ -75,6 +77,8 @@ struct ClusterOptions : public Options { } }; +typedef boost::shared_ptr<sys::ConnectionCodec::Factory> CodecFactoryPtr; + struct ClusterPlugin : public Plugin { ClusterSettings settings; @@ -94,9 +98,10 @@ struct ClusterPlugin : public Plugin { Broker* broker = dynamic_cast<Broker*>(&target); if (!broker) return; cluster = new Cluster(settings, *broker); - broker->setConnectionFactory( - boost::shared_ptr<sys::ConnectionCodec::Factory>( - new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); + CodecFactoryPtr simpleFactory(new broker::ConnectionFactory(*broker)); + CodecFactoryPtr clusterFactory(new ConnectionCodec::Factory(simpleFactory, *cluster)); + CodecFactoryPtr secureFactory(new SecureConnectionFactory(clusterFactory)); + broker->setConnectionFactory(secureFactory); } void disallowManagementMethods(ManagementAgent* agent) { diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 30828d7bd9..118be27bb5 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -39,6 +39,7 @@ #include "qpid/framing/DeliveryProperties.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionAnnounceBody.h" +#include "qpid/framing/ClusterConnectionSecureUserIdBody.h" #include "qpid/framing/ConnectionCloseBody.h" #include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" @@ -46,6 +47,9 @@ #include <boost/current_function.hpp> + +typedef boost::function<void ( std::string& )> UserIdCallback; + // TODO aconway 2008-11-03: // // Refactor code for receiving an update into a separate UpdateConnection @@ -59,6 +63,7 @@ namespace cluster { using namespace framing; using namespace framing::cluster; + qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL); Connection::NullFrameHandler Connection::nullFrameHandler; @@ -82,8 +87,11 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, connectionCtor(&output, cluster.getBroker(), mgmtId, external, false, 0, true), expectProtocolHeader(false), mcastFrameHandler(cluster.getMulticast(), self), - updateIn(c.getUpdateReceiver()) -{} + updateIn(c.getUpdateReceiver()), + secureConnection(0), + mcastSentButNotReceived(false), + inConnectionNegotiation(true) +{ } // Local connection Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, @@ -98,7 +106,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, isCatchUp), // isCatchUp => shadow expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self), - updateIn(c.getUpdateReceiver()) + updateIn(c.getUpdateReceiver()), + secureConnection(0), + mcastSentButNotReceived(false) { cluster.addLocalConnection(this); if (isLocalClient()) { @@ -120,13 +130,19 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, updateIn.nextShadowMgmtId.clear(); init(); } + +} + +void Connection::setSecureConnection(broker::SecureConnection* sc) { + secureConnection = sc; } void Connection::init() { connection = connectionCtor.construct(); QPID_LOG(debug, cluster << " initialized connection: " << *this << " ssf=" << connection->getExternalSecuritySettings().ssf); - if (isLocalClient()) { + if (isLocalClient()) { + if (secureConnection) connection->setSecureConnection(secureConnection); // Actively send cluster-order frames from local node connection->setClusterOrderOutput(mcastFrameHandler); } @@ -138,9 +154,19 @@ void Connection::init() { } if (!isCatchUp()) connection->setErrorListener(this); + UserIdCallback fn = boost::bind ( &Connection::mcastUserId, this, _1 ); + connection->setUserIdCallback ( fn ); } void Connection::giveReadCredit(int credit) { + { + sys::Mutex::ScopedLock l(connectionNegotiationMonitor); + if (inConnectionNegotiation) { + mcastSentButNotReceived = false; + connectionNegotiationMonitor.notify(); + } + } + if (cluster.getSettings().readMax && credit) output.giveReadCredit(credit); } @@ -278,8 +304,9 @@ void Connection::abort() { cluster.erase(self); } -// ConnectoinCodec::decode receives read buffers from directly-connected clients. +// ConnectionCodec::decode receives read buffers from directly-connected clients. size_t Connection::decode(const char* buffer, size_t size) { + if (catchUp) { // Handle catch-up locally. Buffer buf(const_cast<char*>(buffer), size); while (localDecoder.decode(buf)) @@ -289,6 +316,15 @@ size_t Connection::decode(const char* buffer, size_t size) { assert(isLocal()); const char* remainingData = buffer; size_t remainingSize = size; + + { // scope for scoped lock. + sys::Mutex::ScopedLock l(connectionNegotiationMonitor); + if ( inConnectionNegotiation ) { + assert(!mcastSentButNotReceived); + mcastSentButNotReceived = true; + } + } + if (expectProtocolHeader) { //If this is an outgoing link, we will receive a protocol //header which needs to be decoded first @@ -307,6 +343,13 @@ size_t Connection::decode(const char* buffer, size_t size) { } } cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self); + + { // scope for scoped lock. + sys::Mutex::ScopedLock l(connectionNegotiationMonitor); + if ( inConnectionNegotiation ) + while (inConnectionNegotiation && mcastSentButNotReceived) + connectionNegotiationMonitor.wait(); + } } return size; } @@ -570,5 +613,29 @@ void Connection::managementAgents(const std::string& data) { QPID_LOG(debug, cluster << " updated management agents"); } + +// Only the direct, non-shadow gets this call. +void Connection::mcastUserId ( std::string & id ) { + cluster.getMulticast().mcastControl( ClusterConnectionSecureUserIdBody(ProtocolVersion(), string(id)), getId() ); + + { + sys::Mutex::ScopedLock l(connectionNegotiationMonitor); + inConnectionNegotiation = false; + connectionNegotiationMonitor.notify(); + } +} + +// All connections, shadow or not, get this call. +void Connection::secureUserId(const std::string& id) { + if ( isShadow() ) { + // If the user ID is "none", it is not legitimate. Take no action. + if ( strcmp ( id.c_str(), "none" ) ) { + connection->setUserId ( id ); + } + } +} + + + }} // Namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 000d00f7d9..4f69bf7cf4 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -29,6 +29,7 @@ #include "UpdateReceiver.h" #include "qpid/broker/Connection.h" +#include "qpid/broker/SecureConnection.h" #include "qpid/broker/SemanticState.h" #include "qpid/amqp_0_10/Connection.h" #include "qpid/sys/AtomicValue.h" @@ -64,7 +65,7 @@ class Connection : { public: - + /** Local connection. */ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, MemberId, bool catchUp, bool isLink, const qpid::sys::SecuritySettings& external); @@ -164,6 +165,7 @@ class Connection : void giveReadCredit(int credit); void announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict); + void secureUserId(const std::string&); void abort(); void deliverClose(); @@ -176,6 +178,13 @@ class Connection : //uint32_t getSsf() const { return connectionCtor.external.ssf; } + void setSecureConnection ( broker::SecureConnection * sc ); + + // This is a callback, registered with the broker connection. + // It gives me the user ID, if one is negotiated through Sasl. + void mcastUserId ( std::string & ); + + private: struct NullFrameHandler : public framing::FrameHandler { void handle(framing::AMQFrame&) {} @@ -237,8 +246,13 @@ class Connection : bool expectProtocolHeader; McastFrameHandler mcastFrameHandler; UpdateReceiver& updateIn; + qpid::broker::SecureConnection* secureConnection; static qpid::sys::AtomicValue<uint64_t> catchUpId; + + mutable sys::Monitor connectionNegotiationMonitor; + bool mcastSentButNotReceived; + bool inConnectionNegotiation; friend std::ostream& operator<<(std::ostream&, const Connection&); }; diff --git a/cpp/src/qpid/cluster/ConnectionCodec.h b/cpp/src/qpid/cluster/ConnectionCodec.h index 4b919ed351..17a08904d9 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.h +++ b/cpp/src/qpid/cluster/ConnectionCodec.h @@ -70,7 +70,7 @@ class ConnectionCodec : public sys::ConnectionCodec { void closed(); bool isClosed() const; framing::ProtocolVersion getVersion() const; - + void setSecureConnection(broker::SecureConnection* sc) { interceptor->setSecureConnection(sc); } private: amqp_0_10::Connection codec; diff --git a/cpp/src/qpid/cluster/SecureConnectionFactory.cpp b/cpp/src/qpid/cluster/SecureConnectionFactory.cpp new file mode 100644 index 0000000000..6ddef66226 --- /dev/null +++ b/cpp/src/qpid/cluster/SecureConnectionFactory.cpp @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/cluster/SecureConnectionFactory.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/cluster/ConnectionCodec.h" +#include "qpid/broker/SecureConnection.h" +#include "qpid/sys/SecuritySettings.h" +#include "qpid/log/Statement.h" +#include <memory> + + +namespace qpid { +namespace cluster { + +using framing::ProtocolVersion; +using qpid::sys::SecuritySettings; +using qpid::broker::SecureConnection; + +typedef std::auto_ptr<qpid::broker::SecureConnection> SecureConnectionPtr; +typedef std::auto_ptr<qpid::sys::ConnectionCodec> CodecPtr; + +SecureConnectionFactory::SecureConnectionFactory(CodecFactoryPtr f) : codecFactory(f) { +} + +sys::ConnectionCodec* +SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id, + const SecuritySettings& external) { + CodecPtr codec(codecFactory->create(v, out, id, external)); + ConnectionCodec* clusterCodec = dynamic_cast<qpid::cluster::ConnectionCodec*>(codec.get()); + if (clusterCodec) { + SecureConnectionPtr sc(new SecureConnection()); + clusterCodec->setSecureConnection(sc.get()); + sc->setCodec(codec); + return sc.release(); + } + return 0; +} + +sys::ConnectionCodec* +SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id, + const SecuritySettings& external) { + // used to create connections from one broker to another + CodecPtr codec(codecFactory->create(out, id, external)); + ConnectionCodec* clusterCodec = dynamic_cast<qpid::cluster::ConnectionCodec*>(codec.get()); + if (clusterCodec) { + SecureConnectionPtr sc(new SecureConnection()); + clusterCodec->setSecureConnection(sc.get()); + sc->setCodec(codec); + return sc.release(); + } + return 0; +} + + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/SecureConnectionFactory.h b/cpp/src/qpid/cluster/SecureConnectionFactory.h new file mode 100644 index 0000000000..24d1fcfee5 --- /dev/null +++ b/cpp/src/qpid/cluster/SecureConnectionFactory.h @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef QPID_CLUSTER_SecureconnectionFactory +#define QPID_CLUSTER_SecureconnectionFactory + +#include "qpid/sys/ConnectionCodec.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { + +namespace broker { + class Broker; +} + +namespace cluster { + +class SecureConnectionFactory : public qpid::sys::ConnectionCodec::Factory +{ + public: + typedef boost::shared_ptr<qpid::sys::ConnectionCodec::Factory> CodecFactoryPtr; + SecureConnectionFactory(CodecFactoryPtr f); + + qpid::sys::ConnectionCodec* create( + framing::ProtocolVersion, qpid::sys::OutputControl&, const std::string& id, + const qpid::sys::SecuritySettings& + ); + + /** Return "preferred" codec for outbound connections. */ + qpid::sys::ConnectionCodec* create( + qpid::sys::OutputControl&, const std::string& id, const qpid::sys::SecuritySettings& + ); + + private: + CodecFactoryPtr codecFactory; +}; + +}} // namespace qpid::cluster + + +#endif // QPID_CLUSTER_SecureconnectionFactory diff --git a/cpp/src/tests/ForkedBroker.cpp b/cpp/src/tests/ForkedBroker.cpp index 7c81d303fc..529774df98 100644 --- a/cpp/src/tests/ForkedBroker.cpp +++ b/cpp/src/tests/ForkedBroker.cpp @@ -139,6 +139,7 @@ void ForkedBroker::init(const Args& userArgs) { std::transform(args.begin(), args.end(), argv.begin(), boost::bind(&std::string::c_str, _1)); argv.push_back(0); QPID_LOG(debug, "ForkedBroker exec " << prog << ": " << args); + execv(prog, const_cast<char* const*>(&argv[0])); QPID_LOG(critical, "execv failed to start broker: prog=\"" << prog << "\"; args=\"" << args << "\"; errno=" << errno << " (" << std::strerror(errno) << ")"); ::exit(1); diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 6133fc2e49..02b006665e 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -245,6 +245,11 @@ failover_soak_INCLUDES=$(PUBLIC_INCLUDES) failover_soak_SOURCES=failover_soak.cpp ForkedBroker.h ForkedBroker.cpp failover_soak_LDADD=$(lib_client) $(lib_broker) +check_PROGRAMS+=cluster_authentication_soak +cluster_authentication_soak_INCLUDES=$(PUBLIC_INCLUDES) +cluster_authentication_soak_SOURCES=cluster_authentication_soak.cpp ForkedBroker.h ForkedBroker.cpp +cluster_authentication_soak_LDADD=$(lib_client) $(lib_broker) + check_PROGRAMS+=declare_queues declare_queues_INCLUDES=$(PUBLIC_INCLUDES) declare_queues_SOURCES=declare_queues.cpp @@ -355,7 +360,7 @@ CLEANFILES+=valgrind.out *.log *.vglog* dummy_test qpidd.port $(unit_wrappers) # Not run under valgrind, too slow LONG_TESTS+=start_broker fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test stop_broker \ - run_failover_soak reliable_replication_test \ + run_failover_soak run_cluster_authentication_soak reliable_replication_test \ federated_cluster_test_with_node_failure EXTRA_DIST+= \ @@ -364,6 +369,7 @@ EXTRA_DIST+= \ multiq_perftest \ topic_perftest \ run_failover_soak \ + run_cluster_authentication_soak \ reliable_replication_test \ federated_cluster_test_with_node_failure diff --git a/cpp/src/tests/cluster.cmake b/cpp/src/tests/cluster.cmake index 9084bfb85b..5f7a811007 100644 --- a/cpp/src/tests/cluster.cmake +++ b/cpp/src/tests/cluster.cmake @@ -21,8 +21,8 @@ # Cluster tests cmake fragment, to be included in CMakeLists.txt # -add_executable (failover_soak failover_soak.cpp ForkedBroker.cpp ${platform_test_additions}) -target_link_libraries (failover_soak qpidclient) +add_executable (failover_soak failover_soak.cpp cluster_authentication_soak cluster_authentication_soak.cpp ForkedBroker.cpp ${platform_test_additions}) +target_link_libraries (failover_soak cluster_authentication_soak qpidclient) remember_location(failover_soak) set (cluster_test_SOURCES diff --git a/cpp/src/tests/cluster_authentication_soak.cpp b/cpp/src/tests/cluster_authentication_soak.cpp new file mode 100644 index 0000000000..985c3aa52a --- /dev/null +++ b/cpp/src/tests/cluster_authentication_soak.cpp @@ -0,0 +1,244 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <signal.h> +#include <fcntl.h> + +#include <sys/wait.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <sys/time.h> + +#include <string> +#include <iostream> +#include <sstream> +#include <vector> + +#include <boost/assign.hpp> + +#include "qpid/framing/Uuid.h" + +#include <ForkedBroker.h> +#include <qpid/client/Connection.h> + + + + + +using namespace std; +using boost::assign::list_of; +using namespace qpid::framing; +using namespace qpid::client; + + +namespace qpid { +namespace tests { + +vector<pid_t> brokerPids; + +typedef vector<ForkedBroker *> brokerVector; + + + + + +int newbiePort = 0; + + + + +void +startBroker ( brokerVector & brokers , + int brokerNumber ) { + stringstream portSS, prefix; + prefix << "soak-" << brokerNumber; + std::vector<std::string> argv; + + argv.push_back ("../qpidd"); + argv.push_back ("--no-module-dir"); + argv.push_back ("--load-module=../.libs/cluster.so"); + argv.push_back ("--cluster-name=micks_test_cluster"); + argv.push_back ("--cluster-username=guest"); + argv.push_back ("--cluster-password=guest"); + argv.push_back ("--cluster-mechanism=ANONYMOUS"); + argv.push_back ("TMP_DATA_DIR"); + argv.push_back ("--auth=yes"); + argv.push_back ("--mgmt-enable=yes"); + argv.push_back ("--log-prefix"); + argv.push_back (prefix.str()); + argv.push_back ("--log-to-file"); + argv.push_back (prefix.str()+".log"); + + ForkedBroker * newbie = new ForkedBroker (argv); + newbiePort = newbie->getPort(); + brokers.push_back ( newbie ); +} + + + + +bool +runPerftest ( ) { + stringstream portSs; + portSs << newbiePort; + + char const * path = "./perftest"; + + vector<char const *> argv; + argv.push_back ( "./perftest" ); + argv.push_back ( "-p" ); + argv.push_back ( portSs.str().c_str() ); + argv.push_back ( "--username" ); + argv.push_back ( "guest" ); + argv.push_back ( "--password" ); + argv.push_back ( "guest" ); + argv.push_back ( "--mechanism" ); + argv.push_back ( "DIGEST-MD5" ); + argv.push_back ( "--count" ); + argv.push_back ( "20000" ); + argv.push_back ( 0 ); + + pid_t pid = fork(); + + if ( ! pid ) { + int i=open("/dev/null",O_RDWR); + dup2 ( i, fileno(stdout) ); + dup2 ( i, fileno(stderr) ); + + execv ( path, const_cast<char * const *>(&argv[0]) ); + // The exec failed: we are still in parent process. + perror ( "error running perftest: " ); + return false; + } + else { + struct timeval startTime, + currentTime, + duration; + + gettimeofday ( & startTime, 0 ); + + while ( 1 ) { + sleep ( 5 ); + int status; + int returned_pid = waitpid ( pid, &status, WNOHANG ); + if ( returned_pid == pid ) { + int exit_status = WEXITSTATUS(status); + if ( exit_status ) { + cerr << "Perftest failed. exit_status was: " << exit_status; + return false; + } + else { + return true; // perftest succeeded. + } + } + else { // perftest has not yet completed. + gettimeofday ( & currentTime, 0 ); + timersub ( & currentTime, & startTime, & duration ); + if ( duration.tv_sec > 60 ) { + kill ( pid, 9 ); + cerr << "Perftest pid " << pid << " hanging: killed.\n"; + return false; + } + } + } + + } +} + + + +bool +allBrokersAreAlive ( brokerVector & brokers ) { + for ( unsigned int i = 0; i < brokers.size(); ++ i ) { + pid_t pid = brokers[i]->getPID(); + int status; + int value; + if ( (value = waitpid ( pid, &status, WNOHANG ) ) ) { + return false; + } + } + + return true; +} + + + + +void +killAllBrokers ( brokerVector & brokers ) { + for ( unsigned int i = 0; i < brokers.size(); ++ i ) + brokers[i]->kill ( 9 ); +} + + + + +}} // namespace qpid::tests + +using namespace qpid::tests; + + + +int +main ( int argc, char ** argv ) +{ + int n_iterations = argc > 0 ? atoi(argv[1]) : 1; + int n_brokers = 3; + brokerVector brokers; + + for ( int i = 0; i < n_brokers; ++ i ) { + startBroker ( brokers, i ); + } + + sleep ( 3 ); + + /* Run all perftest iterations, and only then check for brokers + * still being up. If you just want a quick check for the failure + * mode in which a single iteration would kill all brokers except + * the client-connected one, just run it with the iterations arg + * set to 1. + */ + for ( int iteration = 0; iteration < n_iterations; ++ iteration ) { + if ( ! runPerftest ( ) ) { + cerr << "Perftest " << iteration << " failed.\n"; + return 1; + } + if ( ! ( iteration % 10 ) ) { + cerr << "perftest " << iteration << " complete. -------------- \n"; + } + } + cerr << "\nperftest " << n_iterations << " iterations complete. -------------- \n\n"; + + if ( ! allBrokersAreAlive ( brokers ) ) { + cerr << "not all brokers are alive.\n"; + return 2; + } + + killAllBrokers ( brokers ); + return 0; +} + + + diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index d07c0ecdb5..8c18e578df 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -54,6 +54,7 @@ #include <algorithm> #include <iterator> + using namespace std; using namespace qpid; using namespace qpid::cluster; diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml index da0285f393..29157dc148 100644 --- a/cpp/xml/cluster.xml +++ b/cpp/xml/cluster.xml @@ -263,5 +263,11 @@ <control name="management-agents" code="0x37"> <field name="data" type="vbin32"/> </control> + + <!-- Announce the user ID on a secure connection --> + <control name="secureUserId" code="0x38"> + <field name="secure-user-id" type="str16"/> + </control> + </class> </amqp> |