summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/cluster.cmake2
-rw-r--r--cpp/src/cluster.mk2
-rw-r--r--cpp/src/qpid/broker/Connection.cpp7
-rw-r--r--cpp/src/qpid/broker/Connection.h7
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.cpp18
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.h20
-rw-r--r--cpp/src/qpid/broker/SaslAuthenticator.cpp24
-rw-r--r--cpp/src/qpid/broker/SaslAuthenticator.h19
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp11
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp77
-rw-r--r--cpp/src/qpid/cluster/Connection.h16
-rw-r--r--cpp/src/qpid/cluster/ConnectionCodec.h2
-rw-r--r--cpp/src/qpid/cluster/SecureConnectionFactory.cpp73
-rw-r--r--cpp/src/qpid/cluster/SecureConnectionFactory.h58
-rw-r--r--cpp/src/tests/ForkedBroker.cpp1
-rw-r--r--cpp/src/tests/Makefile.am8
-rw-r--r--cpp/src/tests/cluster.cmake4
-rw-r--r--cpp/src/tests/cluster_authentication_soak.cpp244
-rw-r--r--cpp/src/tests/cluster_test.cpp1
-rw-r--r--cpp/xml/cluster.xml6
20 files changed, 568 insertions, 32 deletions
diff --git a/cpp/src/cluster.cmake b/cpp/src/cluster.cmake
index 8f886e7f3f..d18fa479bb 100644
--- a/cpp/src/cluster.cmake
+++ b/cpp/src/cluster.cmake
@@ -131,6 +131,8 @@ if (BUILD_CLUSTER)
qpid/cluster/MemberSet.h
qpid/cluster/MemberSet.cpp
qpid/cluster/types.h
+ qpid/cluster/SecureConnectionFactory.h
+ qpid/cluster/SecureConnectionFactory.cpp
qpid/cluster/StoreStatus.h
qpid/cluster/StoreStatus.cpp
)
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index 8e95747c4d..2a648e968c 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -90,6 +90,8 @@ cluster_la_SOURCES = \
qpid/cluster/MemberSet.h \
qpid/cluster/MemberSet.cpp \
qpid/cluster/types.h \
+ qpid/cluster/SecureConnectionFactory.h \
+ qpid/cluster/SecureConnectionFactory.cpp \
qpid/cluster/StoreStatus.h \
qpid/cluster/StoreStatus.cpp
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 2bb68b9f2d..51615e5b5f 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -39,6 +39,8 @@
#include <iostream>
#include <assert.h>
+
+
using namespace qpid::sys;
using namespace qpid::framing;
using qpid::ptr_map_ptr;
@@ -77,7 +79,7 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
const qpid::sys::SecuritySettings& external, bool isLink_, uint64_t objectId, bool shadow_) :
ConnectionState(out_, broker_),
securitySettings(external),
- adapter(*this, isLink_),
+ adapter(*this, isLink_, shadow_),
isLink(isLink_),
mgmtClosing(false),
mgmtId(mgmtId_),
@@ -384,4 +386,7 @@ void Connection::restartTimeout()
timeoutTimer->touch();
}
+
+
+
}}
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index 30a763411f..0639bcbb42 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -63,6 +63,9 @@ class LinkRegistry;
class SecureConnection;
struct ConnectionTimeoutTask;
+typedef boost::function<void ( std::string& )> userIdCallback;
+
+
class Connection : public sys::ConnectionInputHandler,
public ConnectionState,
public RefCounted
@@ -143,6 +146,10 @@ class Connection : public sys::ConnectionInputHandler,
return securitySettings;
}
+ void setUserIdCallback ( UserIdCallback fn ) {
+ adapter.setUserIdCallback ( fn );
+ }
+
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp
index 50a5aff2c9..b2d4210473 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -83,11 +83,11 @@ void ConnectionHandler::setSecureConnection(SecureConnection* secured)
handler->secured = secured;
}
-ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient) : handler(new Handler(connection, isClient)) {}
+ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient, bool isShadow) : handler(new Handler(connection, isClient, isShadow)) {}
-ConnectionHandler::Handler::Handler(Connection& c, bool isClient) :
+ConnectionHandler::Handler::Handler(Connection& c, bool isClient, bool isShadow) :
proxy(c.getOutput()),
- connection(c), serverMode(!isClient), acl(0), secured(0)
+ connection(c), serverMode(!isClient), acl(0), secured(0), userIdCallback(0)
{
if (serverMode) {
@@ -98,7 +98,7 @@ ConnectionHandler::Handler::Handler(Connection& c, bool isClient) :
properties.setString(QPID_FED_TAG, connection.getBroker().getFederationTag());
- authenticator = SaslAuthenticator::createAuthenticator(c);
+ authenticator = SaslAuthenticator::createAuthenticator(c, isShadow);
authenticator->getMechanisms(mechanisms);
Array locales(0x95);
@@ -181,6 +181,14 @@ void ConnectionHandler::Handler::tuneOk(uint16_t /*channelmax*/,
connection.setHeartbeatInterval(heartbeat);
}
+void ConnectionHandler::Handler::callUserIdCallbacks ( ) {
+ string s;
+ if ( false == authenticator->getUsername(s) )
+ s = "none";
+ if ( userIdCallback )
+ userIdCallback ( s );
+}
+
void ConnectionHandler::Handler::open(const string& /*virtualHost*/,
const framing::Array& /*capabilities*/, bool /*insist*/)
{
@@ -195,6 +203,8 @@ void ConnectionHandler::Handler::open(const string& /*virtualHost*/,
std::auto_ptr<SecurityLayer> sl = authenticator->getSecurityLayer(connection.getFrameMax());
if (sl.get()) secured->activateSecurityLayer(sl);
}
+
+ callUserIdCallbacks ( );
}
diff --git a/cpp/src/qpid/broker/ConnectionHandler.h b/cpp/src/qpid/broker/ConnectionHandler.h
index d74f65da36..0372942188 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.h
+++ b/cpp/src/qpid/broker/ConnectionHandler.h
@@ -40,6 +40,9 @@ namespace broker {
class Connection;
class SecureConnection;
+typedef boost::function<void ( std::string& )> UserIdCallback;
+
+
class ConnectionHandler : public framing::FrameHandler
{
struct Handler : public framing::AMQP_AllOperations::ConnectionHandler
@@ -51,7 +54,7 @@ class ConnectionHandler : public framing::FrameHandler
AclModule* acl;
SecureConnection* secured;
- Handler(Connection& connection, bool isClient);
+ Handler(Connection& connection, bool isClient, bool isShadow=false);
~Handler();
void startOk(const qpid::framing::FieldTable& clientProperties,
const std::string& mechanism, const std::string& response,
@@ -64,6 +67,14 @@ class ConnectionHandler : public framing::FrameHandler
void close(uint16_t replyCode, const std::string& replyText);
void closeOk();
+ UserIdCallback userIdCallback;
+ void setUserIdCallback ( UserIdCallback fn ) {
+ userIdCallback = fn;
+ };
+
+
+ void callUserIdCallbacks ( );
+
void start(const qpid::framing::FieldTable& serverProperties,
const framing::Array& mechanisms,
@@ -81,12 +92,17 @@ class ConnectionHandler : public framing::FrameHandler
void redirect(const std::string& host, const framing::Array& knownHosts);
};
std::auto_ptr<Handler> handler;
+
+
public:
- ConnectionHandler(Connection& connection, bool isClient);
+ ConnectionHandler(Connection& connection, bool isClient, bool isShadow=false );
void close(framing::connection::CloseCode code, const std::string& text);
void heartbeat();
void handle(framing::AMQFrame& frame);
void setSecureConnection(SecureConnection* secured);
+ void setUserIdCallback ( UserIdCallback fn ) {
+ handler->setUserIdCallback ( fn );
+ }
};
diff --git a/cpp/src/qpid/broker/SaslAuthenticator.cpp b/cpp/src/qpid/broker/SaslAuthenticator.cpp
index 0f72f9643d..c55f3edb38 100644
--- a/cpp/src/qpid/broker/SaslAuthenticator.cpp
+++ b/cpp/src/qpid/broker/SaslAuthenticator.cpp
@@ -41,10 +41,12 @@ using qpid::sys::SecuritySettings;
using boost::format;
using boost::str;
+
namespace qpid {
namespace broker {
+
class NullAuthenticator : public SaslAuthenticator
{
Connection& connection;
@@ -62,6 +64,8 @@ public:
#if HAVE_SASL
+
+
class CyrusAuthenticator : public SaslAuthenticator
{
sasl_conn_t *sasl_conn;
@@ -84,8 +88,7 @@ public:
std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize);
};
-bool SaslAuthenticator::available(void)
-{
+bool SaslAuthenticator::available(void) {
return true;
}
@@ -109,8 +112,7 @@ void SaslAuthenticator::fini(void)
typedef NullAuthenticator CyrusAuthenticator;
-bool SaslAuthenticator::available(void)
-{
+bool SaslAuthenticator::available(void) {
return false;
}
@@ -126,18 +128,20 @@ void SaslAuthenticator::fini(void)
#endif
-std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c)
+std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c, bool isShadow )
{
- static bool needWarning = true;
if (c.getBroker().getOptions().auth) {
- return std::auto_ptr<SaslAuthenticator>(new CyrusAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
+ if ( isShadow )
+ return std::auto_ptr<SaslAuthenticator>(new NullAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
+ else
+ return std::auto_ptr<SaslAuthenticator>(new CyrusAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
} else {
QPID_LOG(debug, "SASL: No Authentication Performed");
- needWarning = false;
return std::auto_ptr<SaslAuthenticator>(new NullAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
}
}
+
NullAuthenticator::NullAuthenticator(Connection& c, bool e) : connection(c), client(c.getOutput()),
realm(c.getBroker().getOptions().realm), encrypt(e) {}
NullAuthenticator::~NullAuthenticator() {}
@@ -200,7 +204,6 @@ std::auto_ptr<SecurityLayer> NullAuthenticator::getSecurityLayer(uint16_t)
#if HAVE_SASL
-
CyrusAuthenticator::CyrusAuthenticator(Connection& c, bool _encrypt) :
sasl_conn(0), connection(c), client(c.getOutput()), encrypt(_encrypt)
{
@@ -386,7 +389,7 @@ void CyrusAuthenticator::processAuthenticationStep(int code, const char *challen
// authentication failure, when one is available
throw ConnectionForcedException("Authenticated username unavailable");
}
- QPID_LOG(info, "SASL: Authentication succeeded for: " << uid);
+ QPID_LOG(info, connection.getMgmtId() << " SASL: Authentication succeeded for: " << uid);
connection.setUserId(uid);
@@ -432,7 +435,6 @@ std::auto_ptr<SecurityLayer> CyrusAuthenticator::getSecurityLayer(uint16_t maxFr
uint ssf = *(reinterpret_cast<const unsigned*>(value));
std::auto_ptr<SecurityLayer> securityLayer;
if (ssf) {
- QPID_LOG(info, "Installing security layer, SSF: "<< ssf);
securityLayer = std::auto_ptr<SecurityLayer>(new CyrusSecurityLayer(sasl_conn, maxFrameSize));
}
return securityLayer;
diff --git a/cpp/src/qpid/broker/SaslAuthenticator.h b/cpp/src/qpid/broker/SaslAuthenticator.h
index 8ddaeb19a4..f4ad24b3bd 100644
--- a/cpp/src/qpid/broker/SaslAuthenticator.h
+++ b/cpp/src/qpid/broker/SaslAuthenticator.h
@@ -21,17 +21,27 @@
#ifndef _SaslAuthenticator_
#define _SaslAuthenticator_
+
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/Exception.h"
#include "qpid/sys/SecurityLayer.h"
#include <memory>
+#include <vector>
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
namespace qpid {
namespace broker {
class Connection;
+// Calls your fn with the user ID string, just
+// after the security negotiation is complete.
+// Add your callback to the list with addUserIdCallback().
+typedef boost::function<void ( std::string& )> UserIdCallback;
+
+
class SaslAuthenticator
{
public:
@@ -40,16 +50,23 @@ public:
virtual void start(const std::string& mechanism, const std::string& response) = 0;
virtual void step(const std::string& response) = 0;
virtual void getUid(std::string&) {}
+ virtual bool getUsername(std::string&) { return false; };
virtual void getError(std::string&) {}
virtual std::auto_ptr<qpid::sys::SecurityLayer> getSecurityLayer(uint16_t maxFrameSize) = 0;
+ virtual void setUserIdCallback ( UserIdCallback ) { }
static bool available(void);
// Initialize the SASL mechanism; throw if it fails.
static void init(const std::string& saslName);
static void fini(void);
- static std::auto_ptr<SaslAuthenticator> createAuthenticator(Connection& connection);
+ static std::auto_ptr<SaslAuthenticator> createAuthenticator(Connection& connection, bool isShadow);
+
+ virtual void callUserIdCallbacks() { }
+
+private:
+ UserIdCallback userIdCallback;
};
}}
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 955487ee03..75c8d328cf 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -21,6 +21,8 @@
#include "qpid/cluster/ConnectionCodec.h"
#include "qpid/cluster/ClusterSettings.h"
+#include "qpid/cluster/SecureConnectionFactory.h"
+
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/ConnectionCodec.h"
#include "qpid/cluster/UpdateClient.h"
@@ -75,6 +77,8 @@ struct ClusterOptions : public Options {
}
};
+typedef boost::shared_ptr<sys::ConnectionCodec::Factory> CodecFactoryPtr;
+
struct ClusterPlugin : public Plugin {
ClusterSettings settings;
@@ -94,9 +98,10 @@ struct ClusterPlugin : public Plugin {
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
cluster = new Cluster(settings, *broker);
- broker->setConnectionFactory(
- boost::shared_ptr<sys::ConnectionCodec::Factory>(
- new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
+ CodecFactoryPtr simpleFactory(new broker::ConnectionFactory(*broker));
+ CodecFactoryPtr clusterFactory(new ConnectionCodec::Factory(simpleFactory, *cluster));
+ CodecFactoryPtr secureFactory(new SecureConnectionFactory(clusterFactory));
+ broker->setConnectionFactory(secureFactory);
}
void disallowManagementMethods(ManagementAgent* agent) {
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 30828d7bd9..118be27bb5 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -39,6 +39,7 @@
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionAnnounceBody.h"
+#include "qpid/framing/ClusterConnectionSecureUserIdBody.h"
#include "qpid/framing/ConnectionCloseBody.h"
#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
@@ -46,6 +47,9 @@
#include <boost/current_function.hpp>
+
+typedef boost::function<void ( std::string& )> UserIdCallback;
+
// TODO aconway 2008-11-03:
//
// Refactor code for receiving an update into a separate UpdateConnection
@@ -59,6 +63,7 @@ namespace cluster {
using namespace framing;
using namespace framing::cluster;
+
qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
Connection::NullFrameHandler Connection::nullFrameHandler;
@@ -82,8 +87,11 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
connectionCtor(&output, cluster.getBroker(), mgmtId, external, false, 0, true),
expectProtocolHeader(false),
mcastFrameHandler(cluster.getMulticast(), self),
- updateIn(c.getUpdateReceiver())
-{}
+ updateIn(c.getUpdateReceiver()),
+ secureConnection(0),
+ mcastSentButNotReceived(false),
+ inConnectionNegotiation(true)
+{ }
// Local connection
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
@@ -98,7 +106,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
isCatchUp), // isCatchUp => shadow
expectProtocolHeader(isLink),
mcastFrameHandler(cluster.getMulticast(), self),
- updateIn(c.getUpdateReceiver())
+ updateIn(c.getUpdateReceiver()),
+ secureConnection(0),
+ mcastSentButNotReceived(false)
{
cluster.addLocalConnection(this);
if (isLocalClient()) {
@@ -120,13 +130,19 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
updateIn.nextShadowMgmtId.clear();
init();
}
+
+}
+
+void Connection::setSecureConnection(broker::SecureConnection* sc) {
+ secureConnection = sc;
}
void Connection::init() {
connection = connectionCtor.construct();
QPID_LOG(debug, cluster << " initialized connection: " << *this
<< " ssf=" << connection->getExternalSecuritySettings().ssf);
- if (isLocalClient()) {
+ if (isLocalClient()) {
+ if (secureConnection) connection->setSecureConnection(secureConnection);
// Actively send cluster-order frames from local node
connection->setClusterOrderOutput(mcastFrameHandler);
}
@@ -138,9 +154,19 @@ void Connection::init() {
}
if (!isCatchUp())
connection->setErrorListener(this);
+ UserIdCallback fn = boost::bind ( &Connection::mcastUserId, this, _1 );
+ connection->setUserIdCallback ( fn );
}
void Connection::giveReadCredit(int credit) {
+ {
+ sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
+ if (inConnectionNegotiation) {
+ mcastSentButNotReceived = false;
+ connectionNegotiationMonitor.notify();
+ }
+ }
+
if (cluster.getSettings().readMax && credit)
output.giveReadCredit(credit);
}
@@ -278,8 +304,9 @@ void Connection::abort() {
cluster.erase(self);
}
-// ConnectoinCodec::decode receives read buffers from directly-connected clients.
+// ConnectionCodec::decode receives read buffers from directly-connected clients.
size_t Connection::decode(const char* buffer, size_t size) {
+
if (catchUp) { // Handle catch-up locally.
Buffer buf(const_cast<char*>(buffer), size);
while (localDecoder.decode(buf))
@@ -289,6 +316,15 @@ size_t Connection::decode(const char* buffer, size_t size) {
assert(isLocal());
const char* remainingData = buffer;
size_t remainingSize = size;
+
+ { // scope for scoped lock.
+ sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
+ if ( inConnectionNegotiation ) {
+ assert(!mcastSentButNotReceived);
+ mcastSentButNotReceived = true;
+ }
+ }
+
if (expectProtocolHeader) {
//If this is an outgoing link, we will receive a protocol
//header which needs to be decoded first
@@ -307,6 +343,13 @@ size_t Connection::decode(const char* buffer, size_t size) {
}
}
cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self);
+
+ { // scope for scoped lock.
+ sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
+ if ( inConnectionNegotiation )
+ while (inConnectionNegotiation && mcastSentButNotReceived)
+ connectionNegotiationMonitor.wait();
+ }
}
return size;
}
@@ -570,5 +613,29 @@ void Connection::managementAgents(const std::string& data) {
QPID_LOG(debug, cluster << " updated management agents");
}
+
+// Only the direct, non-shadow gets this call.
+void Connection::mcastUserId ( std::string & id ) {
+ cluster.getMulticast().mcastControl( ClusterConnectionSecureUserIdBody(ProtocolVersion(), string(id)), getId() );
+
+ {
+ sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
+ inConnectionNegotiation = false;
+ connectionNegotiationMonitor.notify();
+ }
+}
+
+// All connections, shadow or not, get this call.
+void Connection::secureUserId(const std::string& id) {
+ if ( isShadow() ) {
+ // If the user ID is "none", it is not legitimate. Take no action.
+ if ( strcmp ( id.c_str(), "none" ) ) {
+ connection->setUserId ( id );
+ }
+ }
+}
+
+
+
}} // Namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 000d00f7d9..4f69bf7cf4 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -29,6 +29,7 @@
#include "UpdateReceiver.h"
#include "qpid/broker/Connection.h"
+#include "qpid/broker/SecureConnection.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/amqp_0_10/Connection.h"
#include "qpid/sys/AtomicValue.h"
@@ -64,7 +65,7 @@ class Connection :
{
public:
-
+
/** Local connection. */
Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, MemberId, bool catchUp, bool isLink,
const qpid::sys::SecuritySettings& external);
@@ -164,6 +165,7 @@ class Connection :
void giveReadCredit(int credit);
void announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict);
+ void secureUserId(const std::string&);
void abort();
void deliverClose();
@@ -176,6 +178,13 @@ class Connection :
//uint32_t getSsf() const { return connectionCtor.external.ssf; }
+ void setSecureConnection ( broker::SecureConnection * sc );
+
+ // This is a callback, registered with the broker connection.
+ // It gives me the user ID, if one is negotiated through Sasl.
+ void mcastUserId ( std::string & );
+
+
private:
struct NullFrameHandler : public framing::FrameHandler {
void handle(framing::AMQFrame&) {}
@@ -237,8 +246,13 @@ class Connection :
bool expectProtocolHeader;
McastFrameHandler mcastFrameHandler;
UpdateReceiver& updateIn;
+ qpid::broker::SecureConnection* secureConnection;
static qpid::sys::AtomicValue<uint64_t> catchUpId;
+
+ mutable sys::Monitor connectionNegotiationMonitor;
+ bool mcastSentButNotReceived;
+ bool inConnectionNegotiation;
friend std::ostream& operator<<(std::ostream&, const Connection&);
};
diff --git a/cpp/src/qpid/cluster/ConnectionCodec.h b/cpp/src/qpid/cluster/ConnectionCodec.h
index 4b919ed351..17a08904d9 100644
--- a/cpp/src/qpid/cluster/ConnectionCodec.h
+++ b/cpp/src/qpid/cluster/ConnectionCodec.h
@@ -70,7 +70,7 @@ class ConnectionCodec : public sys::ConnectionCodec {
void closed();
bool isClosed() const;
framing::ProtocolVersion getVersion() const;
-
+ void setSecureConnection(broker::SecureConnection* sc) { interceptor->setSecureConnection(sc); }
private:
amqp_0_10::Connection codec;
diff --git a/cpp/src/qpid/cluster/SecureConnectionFactory.cpp b/cpp/src/qpid/cluster/SecureConnectionFactory.cpp
new file mode 100644
index 0000000000..6ddef66226
--- /dev/null
+++ b/cpp/src/qpid/cluster/SecureConnectionFactory.cpp
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/cluster/SecureConnectionFactory.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/cluster/ConnectionCodec.h"
+#include "qpid/broker/SecureConnection.h"
+#include "qpid/sys/SecuritySettings.h"
+#include "qpid/log/Statement.h"
+#include <memory>
+
+
+namespace qpid {
+namespace cluster {
+
+using framing::ProtocolVersion;
+using qpid::sys::SecuritySettings;
+using qpid::broker::SecureConnection;
+
+typedef std::auto_ptr<qpid::broker::SecureConnection> SecureConnectionPtr;
+typedef std::auto_ptr<qpid::sys::ConnectionCodec> CodecPtr;
+
+SecureConnectionFactory::SecureConnectionFactory(CodecFactoryPtr f) : codecFactory(f) {
+}
+
+sys::ConnectionCodec*
+SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
+ const SecuritySettings& external) {
+ CodecPtr codec(codecFactory->create(v, out, id, external));
+ ConnectionCodec* clusterCodec = dynamic_cast<qpid::cluster::ConnectionCodec*>(codec.get());
+ if (clusterCodec) {
+ SecureConnectionPtr sc(new SecureConnection());
+ clusterCodec->setSecureConnection(sc.get());
+ sc->setCodec(codec);
+ return sc.release();
+ }
+ return 0;
+}
+
+sys::ConnectionCodec*
+SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id,
+ const SecuritySettings& external) {
+ // used to create connections from one broker to another
+ CodecPtr codec(codecFactory->create(out, id, external));
+ ConnectionCodec* clusterCodec = dynamic_cast<qpid::cluster::ConnectionCodec*>(codec.get());
+ if (clusterCodec) {
+ SecureConnectionPtr sc(new SecureConnection());
+ clusterCodec->setSecureConnection(sc.get());
+ sc->setCodec(codec);
+ return sc.release();
+ }
+ return 0;
+}
+
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/SecureConnectionFactory.h b/cpp/src/qpid/cluster/SecureConnectionFactory.h
new file mode 100644
index 0000000000..24d1fcfee5
--- /dev/null
+++ b/cpp/src/qpid/cluster/SecureConnectionFactory.h
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef QPID_CLUSTER_SecureconnectionFactory
+#define QPID_CLUSTER_SecureconnectionFactory
+
+#include "qpid/sys/ConnectionCodec.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+
+namespace broker {
+ class Broker;
+}
+
+namespace cluster {
+
+class SecureConnectionFactory : public qpid::sys::ConnectionCodec::Factory
+{
+ public:
+ typedef boost::shared_ptr<qpid::sys::ConnectionCodec::Factory> CodecFactoryPtr;
+ SecureConnectionFactory(CodecFactoryPtr f);
+
+ qpid::sys::ConnectionCodec* create(
+ framing::ProtocolVersion, qpid::sys::OutputControl&, const std::string& id,
+ const qpid::sys::SecuritySettings&
+ );
+
+ /** Return "preferred" codec for outbound connections. */
+ qpid::sys::ConnectionCodec* create(
+ qpid::sys::OutputControl&, const std::string& id, const qpid::sys::SecuritySettings&
+ );
+
+ private:
+ CodecFactoryPtr codecFactory;
+};
+
+}} // namespace qpid::cluster
+
+
+#endif // QPID_CLUSTER_SecureconnectionFactory
diff --git a/cpp/src/tests/ForkedBroker.cpp b/cpp/src/tests/ForkedBroker.cpp
index 7c81d303fc..529774df98 100644
--- a/cpp/src/tests/ForkedBroker.cpp
+++ b/cpp/src/tests/ForkedBroker.cpp
@@ -139,6 +139,7 @@ void ForkedBroker::init(const Args& userArgs) {
std::transform(args.begin(), args.end(), argv.begin(), boost::bind(&std::string::c_str, _1));
argv.push_back(0);
QPID_LOG(debug, "ForkedBroker exec " << prog << ": " << args);
+
execv(prog, const_cast<char* const*>(&argv[0]));
QPID_LOG(critical, "execv failed to start broker: prog=\"" << prog << "\"; args=\"" << args << "\"; errno=" << errno << " (" << std::strerror(errno) << ")");
::exit(1);
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 6133fc2e49..02b006665e 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -245,6 +245,11 @@ failover_soak_INCLUDES=$(PUBLIC_INCLUDES)
failover_soak_SOURCES=failover_soak.cpp ForkedBroker.h ForkedBroker.cpp
failover_soak_LDADD=$(lib_client) $(lib_broker)
+check_PROGRAMS+=cluster_authentication_soak
+cluster_authentication_soak_INCLUDES=$(PUBLIC_INCLUDES)
+cluster_authentication_soak_SOURCES=cluster_authentication_soak.cpp ForkedBroker.h ForkedBroker.cpp
+cluster_authentication_soak_LDADD=$(lib_client) $(lib_broker)
+
check_PROGRAMS+=declare_queues
declare_queues_INCLUDES=$(PUBLIC_INCLUDES)
declare_queues_SOURCES=declare_queues.cpp
@@ -355,7 +360,7 @@ CLEANFILES+=valgrind.out *.log *.vglog* dummy_test qpidd.port $(unit_wrappers)
# Not run under valgrind, too slow
LONG_TESTS+=start_broker fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test stop_broker \
- run_failover_soak reliable_replication_test \
+ run_failover_soak run_cluster_authentication_soak reliable_replication_test \
federated_cluster_test_with_node_failure
EXTRA_DIST+= \
@@ -364,6 +369,7 @@ EXTRA_DIST+= \
multiq_perftest \
topic_perftest \
run_failover_soak \
+ run_cluster_authentication_soak \
reliable_replication_test \
federated_cluster_test_with_node_failure
diff --git a/cpp/src/tests/cluster.cmake b/cpp/src/tests/cluster.cmake
index 9084bfb85b..5f7a811007 100644
--- a/cpp/src/tests/cluster.cmake
+++ b/cpp/src/tests/cluster.cmake
@@ -21,8 +21,8 @@
# Cluster tests cmake fragment, to be included in CMakeLists.txt
#
-add_executable (failover_soak failover_soak.cpp ForkedBroker.cpp ${platform_test_additions})
-target_link_libraries (failover_soak qpidclient)
+add_executable (failover_soak failover_soak.cpp cluster_authentication_soak cluster_authentication_soak.cpp ForkedBroker.cpp ${platform_test_additions})
+target_link_libraries (failover_soak cluster_authentication_soak qpidclient)
remember_location(failover_soak)
set (cluster_test_SOURCES
diff --git a/cpp/src/tests/cluster_authentication_soak.cpp b/cpp/src/tests/cluster_authentication_soak.cpp
new file mode 100644
index 0000000000..985c3aa52a
--- /dev/null
+++ b/cpp/src/tests/cluster_authentication_soak.cpp
@@ -0,0 +1,244 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <signal.h>
+#include <fcntl.h>
+
+#include <sys/wait.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/time.h>
+
+#include <string>
+#include <iostream>
+#include <sstream>
+#include <vector>
+
+#include <boost/assign.hpp>
+
+#include "qpid/framing/Uuid.h"
+
+#include <ForkedBroker.h>
+#include <qpid/client/Connection.h>
+
+
+
+
+
+using namespace std;
+using boost::assign::list_of;
+using namespace qpid::framing;
+using namespace qpid::client;
+
+
+namespace qpid {
+namespace tests {
+
+vector<pid_t> brokerPids;
+
+typedef vector<ForkedBroker *> brokerVector;
+
+
+
+
+
+int newbiePort = 0;
+
+
+
+
+void
+startBroker ( brokerVector & brokers ,
+ int brokerNumber ) {
+ stringstream portSS, prefix;
+ prefix << "soak-" << brokerNumber;
+ std::vector<std::string> argv;
+
+ argv.push_back ("../qpidd");
+ argv.push_back ("--no-module-dir");
+ argv.push_back ("--load-module=../.libs/cluster.so");
+ argv.push_back ("--cluster-name=micks_test_cluster");
+ argv.push_back ("--cluster-username=guest");
+ argv.push_back ("--cluster-password=guest");
+ argv.push_back ("--cluster-mechanism=ANONYMOUS");
+ argv.push_back ("TMP_DATA_DIR");
+ argv.push_back ("--auth=yes");
+ argv.push_back ("--mgmt-enable=yes");
+ argv.push_back ("--log-prefix");
+ argv.push_back (prefix.str());
+ argv.push_back ("--log-to-file");
+ argv.push_back (prefix.str()+".log");
+
+ ForkedBroker * newbie = new ForkedBroker (argv);
+ newbiePort = newbie->getPort();
+ brokers.push_back ( newbie );
+}
+
+
+
+
+bool
+runPerftest ( ) {
+ stringstream portSs;
+ portSs << newbiePort;
+
+ char const * path = "./perftest";
+
+ vector<char const *> argv;
+ argv.push_back ( "./perftest" );
+ argv.push_back ( "-p" );
+ argv.push_back ( portSs.str().c_str() );
+ argv.push_back ( "--username" );
+ argv.push_back ( "guest" );
+ argv.push_back ( "--password" );
+ argv.push_back ( "guest" );
+ argv.push_back ( "--mechanism" );
+ argv.push_back ( "DIGEST-MD5" );
+ argv.push_back ( "--count" );
+ argv.push_back ( "20000" );
+ argv.push_back ( 0 );
+
+ pid_t pid = fork();
+
+ if ( ! pid ) {
+ int i=open("/dev/null",O_RDWR);
+ dup2 ( i, fileno(stdout) );
+ dup2 ( i, fileno(stderr) );
+
+ execv ( path, const_cast<char * const *>(&argv[0]) );
+ // The exec failed: we are still in parent process.
+ perror ( "error running perftest: " );
+ return false;
+ }
+ else {
+ struct timeval startTime,
+ currentTime,
+ duration;
+
+ gettimeofday ( & startTime, 0 );
+
+ while ( 1 ) {
+ sleep ( 5 );
+ int status;
+ int returned_pid = waitpid ( pid, &status, WNOHANG );
+ if ( returned_pid == pid ) {
+ int exit_status = WEXITSTATUS(status);
+ if ( exit_status ) {
+ cerr << "Perftest failed. exit_status was: " << exit_status;
+ return false;
+ }
+ else {
+ return true; // perftest succeeded.
+ }
+ }
+ else { // perftest has not yet completed.
+ gettimeofday ( & currentTime, 0 );
+ timersub ( & currentTime, & startTime, & duration );
+ if ( duration.tv_sec > 60 ) {
+ kill ( pid, 9 );
+ cerr << "Perftest pid " << pid << " hanging: killed.\n";
+ return false;
+ }
+ }
+ }
+
+ }
+}
+
+
+
+bool
+allBrokersAreAlive ( brokerVector & brokers ) {
+ for ( unsigned int i = 0; i < brokers.size(); ++ i ) {
+ pid_t pid = brokers[i]->getPID();
+ int status;
+ int value;
+ if ( (value = waitpid ( pid, &status, WNOHANG ) ) ) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+
+
+
+void
+killAllBrokers ( brokerVector & brokers ) {
+ for ( unsigned int i = 0; i < brokers.size(); ++ i )
+ brokers[i]->kill ( 9 );
+}
+
+
+
+
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
+
+
+int
+main ( int argc, char ** argv )
+{
+ int n_iterations = argc > 0 ? atoi(argv[1]) : 1;
+ int n_brokers = 3;
+ brokerVector brokers;
+
+ for ( int i = 0; i < n_brokers; ++ i ) {
+ startBroker ( brokers, i );
+ }
+
+ sleep ( 3 );
+
+ /* Run all perftest iterations, and only then check for brokers
+ * still being up. If you just want a quick check for the failure
+ * mode in which a single iteration would kill all brokers except
+ * the client-connected one, just run it with the iterations arg
+ * set to 1.
+ */
+ for ( int iteration = 0; iteration < n_iterations; ++ iteration ) {
+ if ( ! runPerftest ( ) ) {
+ cerr << "Perftest " << iteration << " failed.\n";
+ return 1;
+ }
+ if ( ! ( iteration % 10 ) ) {
+ cerr << "perftest " << iteration << " complete. -------------- \n";
+ }
+ }
+ cerr << "\nperftest " << n_iterations << " iterations complete. -------------- \n\n";
+
+ if ( ! allBrokersAreAlive ( brokers ) ) {
+ cerr << "not all brokers are alive.\n";
+ return 2;
+ }
+
+ killAllBrokers ( brokers );
+ return 0;
+}
+
+
+
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index d07c0ecdb5..8c18e578df 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -54,6 +54,7 @@
#include <algorithm>
#include <iterator>
+
using namespace std;
using namespace qpid;
using namespace qpid::cluster;
diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml
index da0285f393..29157dc148 100644
--- a/cpp/xml/cluster.xml
+++ b/cpp/xml/cluster.xml
@@ -263,5 +263,11 @@
<control name="management-agents" code="0x37">
<field name="data" type="vbin32"/>
</control>
+
+ <!-- Announce the user ID on a secure connection -->
+ <control name="secureUserId" code="0x38">
+ <field name="secure-user-id" type="str16"/>
+ </control>
+
</class>
</amqp>