summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-01-06 19:50:59 +0000
committerGordon Sim <gsim@apache.org>2009-01-06 19:50:59 +0000
commit1fc265f9181e1b27491e517f95b327096e55f1fc (patch)
treef7a2774f4fbe887f537970438f5bae0bd78f56bd
parent51409ed32a294a4691e775be15b6a3cfe9690a7b (diff)
downloadqpid-python-1fc265f9181e1b27491e517f95b327096e55f1fc.tar.gz
* Cyrus SASL intgeration for c++ client
* SASL security layer support for c++ client and broker git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@732082 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/Makefile.am17
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h3
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.cpp18
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.h3
-rw-r--r--qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp62
-rw-r--r--qpid/cpp/src/qpid/broker/SaslAuthenticator.h2
-rw-r--r--qpid/cpp/src/qpid/broker/SecureConnection.cpp87
-rw-r--r--qpid/cpp/src/qpid/broker/SecureConnection.h60
-rw-r--r--qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp65
-rw-r--r--qpid/cpp/src/qpid/broker/SecureConnectionFactory.h48
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionHandler.cpp56
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionHandler.h7
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionImpl.cpp8
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionSettings.cpp9
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionSettings.h17
-rw-r--r--qpid/cpp/src/qpid/client/Connector.cpp197
-rw-r--r--qpid/cpp/src/qpid/client/Connector.h8
-rw-r--r--qpid/cpp/src/qpid/client/RdmaConnector.cpp182
-rw-r--r--qpid/cpp/src/qpid/client/Sasl.h52
-rw-r--r--qpid/cpp/src/qpid/client/SaslFactory.cpp345
-rw-r--r--qpid/cpp/src/qpid/client/SaslFactory.h48
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp23
-rw-r--r--qpid/cpp/src/qpid/sys/Codec.h52
-rw-r--r--qpid/cpp/src/qpid/sys/ConnectionCodec.h16
-rw-r--r--qpid/cpp/src/qpid/sys/SecurityLayer.h42
-rw-r--r--qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp12
-rw-r--r--qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp119
-rw-r--r--qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.h66
-rw-r--r--qpid/cpp/src/tests/.valgrind.supp7
-rw-r--r--qpid/cpp/src/tests/ConnectionOptions.h5
32 files changed, 1381 insertions, 263 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 114de01d44..7aefa7f481 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -353,11 +353,14 @@ libqpidcommon_la_SOURCES = \
qpid/sys/Runnable.cpp \
qpid/sys/Shlib.cpp
-libqpidbroker_la_LIBADD = libqpidcommon.la -luuid
if HAVE_SASL
-libqpidbroker_la_LIBADD += -lsasl2
+libqpidcommon_la_SOURCES += qpid/sys/cyrus/CyrusSecurityLayer.h
+libqpidcommon_la_SOURCES += qpid/sys/cyrus/CyrusSecurityLayer.cpp
+libqpidcommon_la_LIBADD += -lsasl2
endif
+libqpidbroker_la_LIBADD = libqpidcommon.la -luuid
+
libqpidbroker_la_SOURCES = \
$(mgen_broker_cpp) \
$(posix_broker_src) \
@@ -403,6 +406,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/RecoveredEnqueue.cpp \
qpid/broker/RecoveredDequeue.cpp \
qpid/broker/SaslAuthenticator.cpp \
+ qpid/broker/SecureConnection.cpp \
+ qpid/broker/SecureConnectionFactory.cpp \
qpid/broker/SemanticState.h \
qpid/broker/SemanticState.cpp \
qpid/broker/SessionAdapter.cpp \
@@ -426,6 +431,7 @@ libqpidbroker_la_SOURCES = \
qpid/management/ManagementExchange.cpp \
qpid/sys/TCPIOPlugin.cpp
+
libqpidclient_la_LIBADD = libqpidcommon.la -luuid
libqpidclient_la_SOURCES = \
@@ -452,6 +458,7 @@ libqpidclient_la_SOURCES = \
qpid/client/MessageReplayTracker.cpp \
qpid/client/QueueOptions.cpp \
qpid/client/Results.cpp \
+ qpid/client/SaslFactory.cpp \
qpid/client/SessionBase_0_10.cpp \
qpid/client/SessionBase_0_10.h \
qpid/client/SessionBase_0_10Access.h \
@@ -552,6 +559,8 @@ nobase_include_HEADERS = \
qpid/broker/RecoveryManager.h \
qpid/broker/RecoveryManagerImpl.h \
qpid/broker/SaslAuthenticator.h \
+ qpid/broker/SecureConnection.h \
+ qpid/broker/SecureConnectionFactory.h \
qpid/broker/SessionAdapter.h \
qpid/broker/SessionManager.h \
qpid/broker/System.h \
@@ -591,6 +600,8 @@ nobase_include_HEADERS = \
qpid/client/MessageListener.h \
qpid/client/MessageReplayTracker.h \
qpid/client/Results.h \
+ qpid/client/Sasl.h \
+ qpid/client/SaslFactory.h \
qpid/client/SessionBase_0_10.h \
qpid/client/Session.h \
qpid/client/SessionImpl.h \
@@ -668,6 +679,7 @@ nobase_include_HEADERS = \
qpid/sys/AtomicValue_gcc.h \
qpid/sys/AtomicValue_mutex.h \
qpid/sys/BlockingQueue.h \
+ qpid/sys/Codec.h \
qpid/sys/CopyOnWriteArray.h \
qpid/sys/Condition.h \
qpid/sys/ConnectionCodec.h \
@@ -692,6 +704,7 @@ nobase_include_HEADERS = \
qpid/sys/Runnable.h \
qpid/sys/Fork.h \
qpid/sys/ScopedIncrement.h \
+ qpid/sys/SecurityLayer.h \
qpid/sys/Semaphore.h \
qpid/sys/SystemInfo.h \
qpid/sys/Shlib.h \
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 64be104b98..37750f8352 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -27,6 +27,7 @@
#include "NullMessageStore.h"
#include "RecoveryManagerImpl.h"
#include "SaslAuthenticator.h"
+#include "SecureConnectionFactory.h"
#include "TopicExchange.h"
#include "Link.h"
@@ -135,7 +136,7 @@ Broker::Broker(const Broker::Options& conf) :
acl(0),
dataDir(conf.noDataDir ? std::string() : conf.dataDir),
links(this),
- factory(new ConnectionFactory(*this)),
+ factory(new SecureConnectionFactory(*this)),
dtxManager(timer),
sessionManager(
qpid::SessionState::Configuration(
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index f0b9980861..eb54ddfd56 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -267,5 +267,10 @@ Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&, stri
return status;
}
+void Connection::setSecureConnection(SecureConnection* s)
+{
+ adapter.setSecureConnection(s);
+}
+
}}
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index 350ed2c07f..acd9f94d9b 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -57,6 +57,7 @@ namespace qpid {
namespace broker {
class LinkRegistry;
+class SecureConnection;
class Connection : public sys::ConnectionInputHandler,
public ConnectionState,
@@ -105,7 +106,7 @@ class Connection : public sys::ConnectionInputHandler,
}
void sendClose();
-
+ void setSecureConnection(SecureConnection* secured);
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index 7386ce7229..6f99b60cd8 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -22,17 +22,20 @@
#include "ConnectionHandler.h"
#include "Connection.h"
+#include "SecureConnection.h"
+#include "qpid/Url.h"
#include "qpid/framing/ClientInvoker.h"
#include "qpid/framing/ServerInvoker.h"
#include "qpid/framing/enum.h"
#include "qpid/log/Statement.h"
-#include "qpid/Url.h"
+#include "qpid/sys/SecurityLayer.h"
#include "AclModule.h"
#include "qmf/org/apache/qpid/broker/EventClientConnectFail.h"
using namespace qpid;
using namespace qpid::broker;
using namespace qpid::framing;
+using qpid::sys::SecurityLayer;
namespace _qmf = qmf::org::apache::qpid::broker;
namespace
@@ -70,11 +73,16 @@ void ConnectionHandler::handle(framing::AMQFrame& frame)
}
}
+void ConnectionHandler::setSecureConnection(SecureConnection* secured)
+{
+ handler->secured = secured;
+}
+
ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient) : handler(new Handler(connection, isClient)) {}
ConnectionHandler::Handler::Handler(Connection& c, bool isClient) :
client(c.getOutput()), server(c.getOutput()),
- connection(c), serverMode(!isClient), acl(0)
+ connection(c), serverMode(!isClient), acl(0), secured(0)
{
if (serverMode) {
@@ -160,6 +168,12 @@ void ConnectionHandler::Handler::open(const string& /*virtualHost*/,
for (std::vector<Url>::iterator i = urls.begin(); i < urls.end(); ++i)
array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str())));
client.openOk(array);
+
+ //install security layer if one has been negotiated:
+ if (secured) {
+ std::auto_ptr<SecurityLayer> sl = authenticator->getSecurityLayer(connection.getFrameMax());
+ if (sl.get()) secured->activateSecurityLayer(sl);
+ }
}
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.h b/qpid/cpp/src/qpid/broker/ConnectionHandler.h
index d3d5965dfc..6fd252b120 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.h
@@ -40,6 +40,7 @@ namespace qpid {
namespace broker {
class Connection;
+class SecureConnection;
class ConnectionHandler : public framing::FrameHandler
{
@@ -52,6 +53,7 @@ class ConnectionHandler : public framing::FrameHandler
bool serverMode;
std::auto_ptr<SaslAuthenticator> authenticator;
AclModule* acl;
+ SecureConnection* secured;
Handler(Connection& connection, bool isClient);
~Handler();
@@ -87,6 +89,7 @@ class ConnectionHandler : public framing::FrameHandler
ConnectionHandler(Connection& connection, bool isClient);
void close(framing::connection::CloseCode code, const std::string& text);
void handle(framing::AMQFrame& frame);
+ void setSecureConnection(SecureConnection* secured);
};
diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
index 370de8a1d1..9fce1fbbd5 100644
--- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
+++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
@@ -30,9 +30,12 @@
#if HAVE_SASL
#include <sasl/sasl.h>
+#include "qpid/sys/cyrus/CyrusSecurityLayer.h"
+using qpid::sys::cyrus::CyrusSecurityLayer;
#endif
using namespace qpid::framing;
+using qpid::sys::SecurityLayer;
using boost::format;
using boost::str;
@@ -46,11 +49,12 @@ class NullAuthenticator : public SaslAuthenticator
framing::AMQP_ClientProxy::Connection client;
std::string realm;
public:
- NullAuthenticator(Connection& connection);
+ NullAuthenticator(Connection& connection, bool dummy=false/*dummy arg to match CyrusAuthenticator*/);
~NullAuthenticator();
void getMechanisms(framing::Array& mechanisms);
void start(const std::string& mechanism, const std::string& response);
void step(const std::string&) {}
+ std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize);
};
#if HAVE_SASL
@@ -60,11 +64,12 @@ class CyrusAuthenticator : public SaslAuthenticator
sasl_conn_t *sasl_conn;
Connection& connection;
framing::AMQP_ClientProxy::Connection client;
+ const bool encrypt;
void processAuthenticationStep(int code, const char *challenge, unsigned int challenge_len);
public:
- CyrusAuthenticator(Connection& connection);
+ CyrusAuthenticator(Connection& connection, bool encrypt);
~CyrusAuthenticator();
void init();
void getMechanisms(framing::Array& mechanisms);
@@ -72,6 +77,7 @@ public:
void step(const std::string& response);
void getUid(std::string& uid);
void getError(std::string& error);
+ std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize);
};
bool SaslAuthenticator::available(void)
@@ -120,7 +126,7 @@ std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connecti
{
static bool needWarning = true;
if (c.getBroker().getOptions().auth) {
- return std::auto_ptr<SaslAuthenticator>(new CyrusAuthenticator(c));
+ return std::auto_ptr<SaslAuthenticator>(new CyrusAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
} else {
QPID_LOG(warning, "SASL: No Authentication Performed");
needWarning = false;
@@ -128,7 +134,7 @@ std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connecti
}
}
-NullAuthenticator::NullAuthenticator(Connection& c) : connection(c), client(c.getOutput()),
+ NullAuthenticator::NullAuthenticator(Connection& c, bool /*dummy*/) : connection(c), client(c.getOutput()),
realm(c.getBroker().getOptions().realm) {}
NullAuthenticator::~NullAuthenticator() {}
@@ -158,9 +164,18 @@ void NullAuthenticator::start(const string& mechanism, const string& response)
}
+std::auto_ptr<SecurityLayer> NullAuthenticator::getSecurityLayer(uint16_t)
+{
+ std::auto_ptr<SecurityLayer> securityLayer;
+ return securityLayer;
+}
+
+
#if HAVE_SASL
-CyrusAuthenticator::CyrusAuthenticator(Connection& c) : sasl_conn(0), connection(c), client(c.getOutput())
+
+CyrusAuthenticator::CyrusAuthenticator(Connection& c, bool _encrypt) :
+ sasl_conn(0), connection(c), client(c.getOutput()), encrypt(_encrypt)
{
init();
}
@@ -196,6 +211,25 @@ void CyrusAuthenticator::init()
// server error, when one is available
throw ConnectionForcedException("Unable to perform authentication");
}
+
+ sasl_security_properties_t secprops;
+
+ //TODO: should the actual SSF values be configurable here?
+ secprops.min_ssf = encrypt ? 10: 0;
+ secprops.max_ssf = 256;
+ secprops.maxbufsize = 65535;
+
+ QPID_LOG(debug, "min_ssf: " << secprops.min_ssf << ", max_ssf: " << secprops.max_ssf);
+
+ secprops.property_names = 0;
+ secprops.property_values = 0;
+ secprops.security_flags = 0; /* or SASL_SEC_NOANONYMOUS etc as appropriate */
+
+ int result = sasl_setprop(sasl_conn, SASL_SEC_PROPS, &secprops);
+ if (result != SASL_OK) {
+ throw framing::InternalErrorException(QPID_MSG("SASL error: " << result));
+ }
+
}
CyrusAuthenticator::~CyrusAuthenticator()
@@ -332,6 +366,24 @@ void CyrusAuthenticator::processAuthenticationStep(int code, const char *challen
}
}
}
+
+std::auto_ptr<SecurityLayer> CyrusAuthenticator::getSecurityLayer(uint16_t maxFrameSize)
+{
+
+ const void* value(0);
+ int result = sasl_getprop(sasl_conn, SASL_SSF, &value);
+ if (result != SASL_OK) {
+ throw framing::InternalErrorException(QPID_MSG("SASL error: " << sasl_errdetail(sasl_conn)));
+ }
+ uint ssf = *(reinterpret_cast<const unsigned*>(value));
+ std::auto_ptr<SecurityLayer> securityLayer;
+ if (ssf) {
+ QPID_LOG(info, "Installing security layer, SSF: "<< ssf);
+ securityLayer = std::auto_ptr<SecurityLayer>(new CyrusSecurityLayer(sasl_conn, maxFrameSize));
+ }
+ return securityLayer;
+}
+
#endif
}}
diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.h b/qpid/cpp/src/qpid/broker/SaslAuthenticator.h
index 2598b6d177..8ddaeb19a4 100644
--- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.h
+++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.h
@@ -24,6 +24,7 @@
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/Exception.h"
+#include "qpid/sys/SecurityLayer.h"
#include <memory>
namespace qpid {
@@ -40,6 +41,7 @@ public:
virtual void step(const std::string& response) = 0;
virtual void getUid(std::string&) {}
virtual void getError(std::string&) {}
+ virtual std::auto_ptr<qpid::sys::SecurityLayer> getSecurityLayer(uint16_t maxFrameSize) = 0;
static bool available(void);
diff --git a/qpid/cpp/src/qpid/broker/SecureConnection.cpp b/qpid/cpp/src/qpid/broker/SecureConnection.cpp
new file mode 100644
index 0000000000..4a9946e176
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/SecureConnection.cpp
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SecureConnection.h"
+#include "qpid/sys/SecurityLayer.h"
+#include "qpid/framing/reply_exceptions.h"
+
+namespace qpid {
+namespace broker {
+
+using qpid::sys::SecurityLayer;
+
+SecureConnection::SecureConnection() : secured(false) {}
+
+size_t SecureConnection::decode(const char* buffer, size_t size)
+{
+ if (!secured && securityLayer.get()) {
+ //security layer comes into effect on first read after its
+ //activated
+ secured = true;
+ }
+ if (secured) {
+ return securityLayer->decode(buffer, size);
+ } else {
+ return codec->decode(buffer, size);
+ }
+}
+
+size_t SecureConnection::encode(const char* buffer, size_t size)
+{
+ if (secured) {
+ return securityLayer->encode(buffer, size);
+ } else {
+ return codec->encode(buffer, size);
+ }
+}
+
+bool SecureConnection::canEncode()
+{
+ if (secured) return securityLayer->canEncode();
+ else return codec->canEncode();
+}
+
+void SecureConnection::closed()
+{
+ codec->closed();
+}
+
+bool SecureConnection::isClosed() const
+{
+ return codec->isClosed();
+}
+
+framing::ProtocolVersion SecureConnection::getVersion() const
+{
+ return codec->getVersion();
+}
+
+void SecureConnection:: setCodec(std::auto_ptr<ConnectionCodec> c)
+{
+ codec = c;
+}
+
+void SecureConnection::activateSecurityLayer(std::auto_ptr<SecurityLayer> sl)
+{
+ securityLayer = sl;
+ securityLayer->init(codec.get());
+}
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SecureConnection.h b/qpid/cpp/src/qpid/broker/SecureConnection.h
new file mode 100644
index 0000000000..4a0cc50e34
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/SecureConnection.h
@@ -0,0 +1,60 @@
+#ifndef QPID_BROKER_SECURECONNECTION_H
+#define QPID_BROKER_SECURECONNECTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/ConnectionCodec.h"
+#include <memory>
+
+namespace qpid {
+
+namespace sys {
+class SecurityLayer;
+}
+
+namespace broker {
+
+/**
+ * A ConnectionCodec 'wrapper' that allows a connection to be
+ * 'secured' e.g. encrypted based on settings negotiatiated at the
+ * time of establishment.
+ */
+class SecureConnection : public qpid::sys::ConnectionCodec
+{
+ public:
+ SecureConnection();
+ size_t decode(const char* buffer, size_t size);
+ size_t encode(const char* buffer, size_t size);
+ bool canEncode();
+ void closed();
+ bool isClosed() const;
+ framing::ProtocolVersion getVersion() const;
+ void setCodec(std::auto_ptr<ConnectionCodec>);
+ void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
+ private:
+ std::auto_ptr<ConnectionCodec> codec;
+ std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
+ bool secured;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_SECURECONNECTION_H*/
diff --git a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
new file mode 100644
index 0000000000..38fd96bcba
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SecureConnectionFactory.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/amqp_0_10/Connection.h"
+#include "qpid/broker/Connection.h"
+#include "qpid/broker/SecureConnection.h"
+
+namespace qpid {
+namespace broker {
+
+using framing::ProtocolVersion;
+typedef std::auto_ptr<amqp_0_10::Connection> CodecPtr;
+typedef std::auto_ptr<SecureConnection> SecureConnectionPtr;
+typedef std::auto_ptr<Connection> ConnectionPtr;
+typedef std::auto_ptr<sys::ConnectionInputHandler> InputPtr;
+
+SecureConnectionFactory::SecureConnectionFactory(Broker& b) : broker(b) {}
+
+sys::ConnectionCodec*
+SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
+ if (v == ProtocolVersion(0, 10)) {
+ SecureConnectionPtr sc(new SecureConnection());
+ CodecPtr c(new amqp_0_10::Connection(out, id, false));
+ ConnectionPtr i(new broker::Connection(c.get(), broker, id, false));
+ i->setSecureConnection(sc.get());
+ c->setInputHandler(InputPtr(i.release()));
+ sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c));
+ return sc.release();
+ }
+ return 0;
+}
+
+sys::ConnectionCodec*
+SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id) {
+ // used to create connections from one broker to another
+ SecureConnectionPtr sc(new SecureConnection());
+ CodecPtr c(new amqp_0_10::Connection(out, id, true));
+ ConnectionPtr i(new broker::Connection(c.get(), broker, id, true));
+ i->setSecureConnection(sc.get());
+ c->setInputHandler(InputPtr(i.release()));
+ sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c));
+ return sc.release();
+}
+
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h
new file mode 100644
index 0000000000..048fb250d6
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _SecureConnectionFactory_
+#define _SecureConnectionFactory_
+
+#include "qpid/sys/ConnectionCodec.h"
+
+namespace qpid {
+namespace broker {
+class Broker;
+
+class SecureConnectionFactory : public sys::ConnectionCodec::Factory
+{
+ public:
+ SecureConnectionFactory(Broker& b);
+
+ sys::ConnectionCodec*
+ create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id);
+
+ sys::ConnectionCodec*
+ create(sys::OutputControl&, const std::string& id);
+
+ private:
+ Broker& broker;
+};
+
+}}
+
+
+#endif
diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
index db5d006a17..2a070ebcff 100644
--- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -21,16 +21,18 @@
#include "ConnectionHandler.h"
-#include "qpid/log/Statement.h"
+#include "SaslFactory.h"
#include "qpid/framing/amqp_framing.h"
#include "qpid/framing/all_method_bodies.h"
#include "qpid/framing/ClientInvoker.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Helpers.h"
+#include "qpid/log/Statement.h"
using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::framing::connection;
+using qpid::sys::SecurityLayer;
namespace {
const std::string OK("OK");
@@ -146,18 +148,50 @@ void ConnectionHandler::fail(const std::string& message)
setState(FAILED);
}
-void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& /*mechanisms*/, const Array& /*locales*/)
+namespace {
+std::string SPACE(" ");
+}
+
+void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& mechanisms, const Array& /*locales*/)
{
checkState(NOT_STARTED, INVALID_STATE_START);
setState(NEGOTIATING);
- //TODO: verify that desired mechanism and locale are supported
- string response = ((char)0) + username + ((char)0) + password;
- proxy.startOk(properties, mechanism, response, locale);
+ sasl = SaslFactory::getInstance().create(*this);
+
+ std::string mechlist;
+ bool chosenMechanismSupported = mechanism.empty();
+ for (Array::const_iterator i = mechanisms.begin(); i != mechanisms.end(); ++i) {
+ if (!mechanism.empty() && mechanism == (*i)->get<std::string>()) {
+ chosenMechanismSupported = true;
+ mechlist = (*i)->get<std::string>() + SPACE + mechlist;
+ } else {
+ if (i != mechanisms.begin()) mechlist += SPACE;
+ mechlist += (*i)->get<std::string>();
+ }
+ }
+
+ if (!chosenMechanismSupported) {
+ fail("Selected mechanism not supported: " + mechanism);
+ }
+
+ if (sasl.get()) {
+ string response = sasl->start(mechanism.empty() ? mechlist : mechanism);
+ proxy.startOk(properties, sasl->getMechanism(), response, locale);
+ } else {
+ //TODO: verify that desired mechanism and locale are supported
+ string response = ((char)0) + username + ((char)0) + password;
+ proxy.startOk(properties, mechanism, response, locale);
+ }
}
-void ConnectionHandler::secure(const std::string& /*challenge*/)
+void ConnectionHandler::secure(const std::string& challenge)
{
- throw NotImplementedException("Challenge-response cycle not yet implemented in client");
+ if (sasl.get()) {
+ string response = sasl->step(challenge);
+ proxy.secureOk(response);
+ } else {
+ throw NotImplementedException("Challenge-response cycle not yet implemented in client");
+ }
}
void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t maxFrameSizeProposed,
@@ -179,6 +213,9 @@ void ConnectionHandler::openOk ( const Array& knownBrokers )
framing::Array::ValueVector::const_iterator i;
for ( i = knownBrokers.begin(); i != knownBrokers.end(); ++i )
knownBrokersUrls.push_back(Url((*i)->get<std::string>()));
+ if (sasl.get()) {
+ securityLayer = sasl->getSecurityLayer(maxFrameSize);
+ }
setState(OPEN);
QPID_LOG(debug, "Known-brokers for connection: " << log::formatList(knownBrokersUrls));
}
@@ -224,3 +261,8 @@ bool ConnectionHandler::isClosed() const
}
bool ConnectionHandler::isClosing() const { return getState() == CLOSING; }
+
+std::auto_ptr<qpid::sys::SecurityLayer> ConnectionHandler::getSecurityLayer()
+{
+ return securityLayer;
+}
diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.h b/qpid/cpp/src/qpid/client/ConnectionHandler.h
index 12323684a5..ec9278626f 100644
--- a/qpid/cpp/src/qpid/client/ConnectionHandler.h
+++ b/qpid/cpp/src/qpid/client/ConnectionHandler.h
@@ -23,6 +23,7 @@
#include "ChainableFrameHandler.h"
#include "ConnectionSettings.h"
+#include "Sasl.h"
#include "StateManager.h"
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/framing/AMQP_HighestVersion.h"
@@ -33,7 +34,9 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/InputHandler.h"
+#include "qpid/sys/SecurityLayer.h"
#include "qpid/Url.h"
+#include <memory>
namespace qpid {
namespace client {
@@ -64,6 +67,8 @@ class ConnectionHandler : private StateManager,
framing::ProtocolVersion version;
framing::Array capabilities;
framing::FieldTable properties;
+ std::auto_ptr<Sasl> sasl;
+ std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
void checkState(STATES s, const std::string& msg);
@@ -103,6 +108,8 @@ public:
bool isClosed() const;
bool isClosing() const;
+ std::auto_ptr<qpid::sys::SecurityLayer> getSecurityLayer();
+
CloseListener onClose;
ErrorListener onError;
diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
index 0d7ffa0288..aa9eeb7489 100644
--- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -110,6 +110,14 @@ void ConnectionImpl::open()
connector->connect(host, port);
connector->init();
handler.waitForOpen();
+ //enable security layer if one has been negotiated:
+ std::auto_ptr<SecurityLayer> securityLayer = handler.getSecurityLayer();
+ if (securityLayer.get()) {
+ QPID_LOG(debug, "Activating security layer");
+ connector->activateSecurityLayer(securityLayer);
+ } else {
+ QPID_LOG(debug, "No security layer in place");
+ }
failover.reset(new FailoverListener(shared_from_this(), handler.knownBrokersUrls));
}
diff --git a/qpid/cpp/src/qpid/client/ConnectionSettings.cpp b/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
index f5fc62dad2..5851917da6 100644
--- a/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
@@ -22,6 +22,7 @@
#include "qpid/log/Logger.h"
#include "qpid/sys/Socket.h"
+#include "qpid/Version.h"
namespace qpid {
namespace client {
@@ -30,15 +31,15 @@ ConnectionSettings::ConnectionSettings() :
protocol("tcp"),
host("localhost"),
port(TcpAddress::DEFAULT_PORT),
- username("guest"),
- password("guest"),
- mechanism("PLAIN"),
locale("en_US"),
heartbeat(0),
maxChannels(32767),
maxFrameSize(65535),
bounds(2),
- tcpNoDelay(false)
+ tcpNoDelay(false),
+ service(qpid::saslName),
+ minSsf(0),
+ maxSsf(256)
{}
ConnectionSettings::~ConnectionSettings() {}
diff --git a/qpid/cpp/src/qpid/client/ConnectionSettings.h b/qpid/cpp/src/qpid/client/ConnectionSettings.h
index 1b994a6da3..c7725e19f0 100644
--- a/qpid/cpp/src/qpid/client/ConnectionSettings.h
+++ b/qpid/cpp/src/qpid/client/ConnectionSettings.h
@@ -71,7 +71,8 @@ struct ConnectionSettings {
std::string virtualhost;
/**
- * The username to use when authenticating the connection.
+ * The username to use when authenticating the connection. If not
+ * specified the current users login is used if available.
*/
std::string username;
/**
@@ -111,6 +112,20 @@ struct ConnectionSettings {
* If true, TCP_NODELAY will be set for the connection.
*/
bool tcpNoDelay;
+ /**
+ * SASL service name
+ */
+ std::string service;
+ /**
+ * Minimum acceptable strength of any SASL negotiated security
+ * layer. 0 means no security layer required.
+ */
+ uint minSsf;
+ /**
+ * Maximum acceptable strength of any SASL negotiated security
+ * layer. 0 means no security layer allowed.
+ */
+ uint maxSsf;
};
}} // namespace qpid::client
diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp
index bef98863a1..0e11b920e1 100644
--- a/qpid/cpp/src/qpid/client/Connector.cpp
+++ b/qpid/cpp/src/qpid/client/Connector.cpp
@@ -24,15 +24,18 @@
#include "ConnectionImpl.h"
#include "ConnectionSettings.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/Codec.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Poller.h"
+#include "qpid/sys/SecurityLayer.h"
#include "qpid/Msg.h"
#include <iostream>
#include <map>
+#include <deque>
#include <boost/bind.hpp>
#include <boost/format.hpp>
#include <boost/weak_ptr.hpp>
@@ -74,39 +77,19 @@ void Connector::registerFactory(const std::string& proto, Factory* connectorFact
theProtocolRegistry()[proto] = connectorFactory;
}
-class TCPConnector : public Connector, private sys::Runnable
+class TCPConnector : public Connector, public sys::Codec, private sys::Runnable
{
+ typedef std::deque<framing::AMQFrame> Frames;
struct Buff;
- /** Batch up frames for writing to aio. */
- class Writer : public framing::FrameHandler {
- typedef sys::AsynchIOBufferBase BufferBase;
- typedef std::vector<framing::AMQFrame> Frames;
-
- const uint16_t maxFrameSize;
- sys::Mutex lock;
- sys::AsynchIO* aio;
- BufferBase* buffer;
- Frames frames;
- size_t lastEof; // Position after last EOF in frames
- framing::Buffer encode;
- size_t framesEncoded;
- std::string identifier;
- Bounds* bounds;
-
- void writeOne();
- void newBuffer();
+ const uint16_t maxFrameSize;
- public:
-
- Writer(uint16_t maxFrameSize, Bounds*);
- ~Writer();
- void init(std::string id, sys::AsynchIO*);
- void handle(framing::AMQFrame&);
- void write(sys::AsynchIO&);
- };
+ sys::Mutex lock;
+ Frames frames; // Outgoing frame queue
+ size_t lastEof; // Position after last EOF in frames
+ uint64_t currentSize;
+ Bounds* bounds;
- const uint16_t maxFrameSize;
framing::ProtocolVersion version;
bool initiated;
@@ -119,14 +102,14 @@ class TCPConnector : public Connector, private sys::Runnable
framing::InitiationHandler* initialiser;
framing::OutputHandler* output;
- Writer writer;
-
sys::Thread receiver;
sys::Socket socket;
sys::AsynchIO* aio;
+ std::string identifier;
boost::shared_ptr<sys::Poller> poller;
+ std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
~TCPConnector();
@@ -139,8 +122,6 @@ class TCPConnector : public Connector, private sys::Runnable
void writeDataBlock(const framing::AMQDataBlock& data);
void eof(qpid::sys::AsynchIO&);
- std::string identifier;
-
boost::weak_ptr<ConnectionImpl> impl;
void connect(const std::string& host, int port);
@@ -153,6 +134,12 @@ class TCPConnector : public Connector, private sys::Runnable
sys::ShutdownHandler* getShutdownHandler() const;
framing::OutputHandler* getOutputHandler();
const std::string& getIdentifier() const;
+ void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
+
+ size_t decode(const char* buffer, size_t size);
+ size_t encode(const char* buffer, size_t size);
+ bool canEncode();
+
public:
TCPConnector(framing::ProtocolVersion pVersion,
@@ -177,12 +164,14 @@ TCPConnector::TCPConnector(ProtocolVersion ver,
const ConnectionSettings& settings,
ConnectionImpl* cimpl)
: maxFrameSize(settings.maxFrameSize),
+ lastEof(0),
+ currentSize(0),
+ bounds(cimpl),
version(ver),
initiated(false),
closed(true),
joined(true),
shutdownHandler(0),
- writer(maxFrameSize, cimpl),
aio(0),
impl(cimpl->shared_from_this())
{
@@ -214,7 +203,6 @@ void TCPConnector::connect(const std::string& host, int port){
0, // closed
0, // nobuffs
boost::bind(&TCPConnector::writebuff, this, _1));
- writer.init(identifier, aio);
}
void TCPConnector::init(){
@@ -266,7 +254,21 @@ const std::string& TCPConnector::getIdentifier() const {
}
void TCPConnector::send(AMQFrame& frame) {
- writer.handle(frame);
+ bool notifyWrite = false;
+ {
+ Mutex::ScopedLock l(lock);
+ frames.push_back(frame);
+ //only ask to write if this is the end of a frameset or if we
+ //already have a buffers worth of data
+ currentSize += frame.encodedSize();
+ if (frame.getEof()) {
+ lastEof = frames.size();
+ notifyWrite = true;
+ } else {
+ notifyWrite = (currentSize >= maxFrameSize);
+ }
+ }
+ if (notifyWrite) aio->notifyPendingWrite();
}
void TCPConnector::handleClosed() {
@@ -279,70 +281,70 @@ struct TCPConnector::Buff : public AsynchIO::BufferBase {
~Buff() { delete [] bytes;}
};
-TCPConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b)
+void TCPConnector::writebuff(AsynchIO& /*aio*/)
{
-}
-
-TCPConnector::Writer::~Writer() { delete buffer; }
+ Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
+ if (codec->canEncode()) {
+ std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer());
+ if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize));
+
+ size_t encoded = codec->encode(buffer->bytes, buffer->byteCount);
-void TCPConnector::Writer::init(std::string id, sys::AsynchIO* a) {
- Mutex::ScopedLock l(lock);
- identifier = id;
- aio = a;
- newBuffer();
-}
-void TCPConnector::Writer::handle(framing::AMQFrame& frame) {
- Mutex::ScopedLock l(lock);
- frames.push_back(frame);
- //only try to write if this is the end of a frameset or if we
- //already have a buffers worth of data
- if (frame.getEof() || (bounds && bounds->getCurrentSize() >= maxFrameSize)) {
- lastEof = frames.size();
- aio->notifyPendingWrite();
+ buffer->dataStart = 0;
+ buffer->dataCount = encoded;
+ aio->queueWrite(buffer.release());
}
- QPID_LOG(trace, "SENT " << identifier << ": " << frame);
-}
-
-void TCPConnector::Writer::writeOne() {
- assert(buffer);
- framesEncoded = 0;
-
- buffer->dataStart = 0;
- buffer->dataCount = encode.getPosition();
- aio->queueWrite(buffer);
- newBuffer();
}
-void TCPConnector::Writer::newBuffer() {
- buffer = aio->getQueuedBuffer();
- if (!buffer) buffer = new Buff(maxFrameSize);
- encode = framing::Buffer(buffer->bytes, buffer->byteCount);
- framesEncoded = 0;
+// Called in IO thread.
+bool TCPConnector::canEncode()
+{
+ Mutex::ScopedLock l(lock);
+ //have at least one full frameset or a whole buffers worth of data
+ return lastEof || currentSize >= maxFrameSize;
}
// Called in IO thread.
-void TCPConnector::Writer::write(sys::AsynchIO&) {
- Mutex::ScopedLock l(lock);
- assert(buffer);
+size_t TCPConnector::encode(const char* buffer, size_t size)
+{
+ framing::Buffer out(const_cast<char*>(buffer), size);
size_t bytesWritten(0);
- for (size_t i = 0; i < lastEof; ++i) {
- AMQFrame& frame = frames[i];
- uint32_t size = frame.encodedSize();
- if (size > encode.available()) writeOne();
- assert(size <= encode.available());
- frame.encode(encode);
- ++framesEncoded;
- bytesWritten += size;
+ {
+ Mutex::ScopedLock l(lock);
+ while (!frames.empty() && out.available() >= frames.front().encodedSize() ) {
+ frames.front().encode(out);
+ QPID_LOG(trace, "SENT " << identifier << ": " << frames.front());
+ frames.pop_front();
+ if (lastEof) --lastEof;
+ }
+ bytesWritten = size - out.available();
+ currentSize -= bytesWritten;
}
- frames.erase(frames.begin(), frames.begin()+lastEof);
- lastEof = 0;
if (bounds) bounds->reduce(bytesWritten);
- if (encode.getPosition() > 0) writeOne();
+ return bytesWritten;
}
-bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
- framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff)
+{
+ Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
+ int32_t decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
+ // TODO: unreading needs to go away, and when we can cope
+ // with multiple sub-buffers in the general buffer scheme, it will
+ if (decoded < buff->dataCount) {
+ // Adjust buffer for used bytes and then "unread them"
+ buff->dataStart += decoded;
+ buff->dataCount -= decoded;
+ aio.unread(buff);
+ } else {
+ // Give whole buffer back to aio subsystem
+ aio.queueReadBuffer(buff);
+ }
+ return true;
+}
+size_t TCPConnector::decode(const char* buffer, size_t size)
+{
+ framing::Buffer in(const_cast<char*>(buffer), size);
if (!initiated) {
framing::ProtocolInitiation protocolInit;
if (protocolInit.decode(in)) {
@@ -356,22 +358,7 @@ bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
QPID_LOG(trace, "RECV " << identifier << ": " << frame);
input->received(frame);
}
- // TODO: unreading needs to go away, and when we can cope
- // with multiple sub-buffers in the general buffer scheme, it will
- if (in.available() != 0) {
- // Adjust buffer for used bytes and then "unread them"
- buff->dataStart += buff->dataCount-in.available();
- buff->dataCount = in.available();
- aio.unread(buff);
- } else {
- // Give whole buffer back to aio subsystem
- aio.queueReadBuffer(buff);
- }
- return true;
-}
-
-void TCPConnector::writebuff(AsynchIO& aio_) {
- writer.write(aio_);
+ return size - in.available();
}
void TCPConnector::writeDataBlock(const AMQDataBlock& data) {
@@ -388,7 +375,7 @@ void TCPConnector::eof(AsynchIO&) {
// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing
// will never be called
-void TCPConnector::run(){
+void TCPConnector::run() {
// Keep the connection impl in memory until run() completes.
boost::shared_ptr<ConnectionImpl> protect = impl.lock();
assert(protect);
@@ -409,5 +396,11 @@ void TCPConnector::run(){
}
}
+void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
+{
+ securityLayer = sl;
+ securityLayer->init(this);
+}
+
}} // namespace qpid::client
diff --git a/qpid/cpp/src/qpid/client/Connector.h b/qpid/cpp/src/qpid/client/Connector.h
index 5c37d95300..e23fb8875b 100644
--- a/qpid/cpp/src/qpid/client/Connector.h
+++ b/qpid/cpp/src/qpid/client/Connector.h
@@ -40,6 +40,11 @@
#include <boost/shared_ptr.hpp>
namespace qpid {
+
+namespace sys {
+class SecurityLayer;
+}
+
namespace client {
struct ConnectionSettings;
@@ -65,6 +70,9 @@ class Connector : public framing::OutputHandler
virtual sys::ShutdownHandler* getShutdownHandler() const = 0;
virtual framing::OutputHandler* getOutputHandler() = 0;
virtual const std::string& getIdentifier() const = 0;
+
+ virtual void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>) {}
+
};
}}
diff --git a/qpid/cpp/src/qpid/client/RdmaConnector.cpp b/qpid/cpp/src/qpid/client/RdmaConnector.cpp
index 98fe762f31..3cc8961eea 100644
--- a/qpid/cpp/src/qpid/client/RdmaConnector.cpp
+++ b/qpid/cpp/src/qpid/client/RdmaConnector.cpp
@@ -29,6 +29,7 @@
#include "qpid/sys/rdma/RdmaIO.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Poller.h"
+#include "qpid/sys/SecurityLayer.h"
#include "qpid/Msg.h"
#include <iostream>
@@ -47,39 +48,21 @@ using namespace qpid::framing;
using boost::format;
using boost::str;
-class RdmaConnector : public Connector, private sys::Runnable
+ class RdmaConnector : public Connector, public sys::Codec, private sys::Runnable
{
struct Buff;
- /** Batch up frames for writing to aio. */
- class Writer : public framing::FrameHandler {
- typedef Rdma::Buffer BufferBase;
- typedef std::deque<framing::AMQFrame> Frames;
-
- const uint16_t maxFrameSize;
- sys::Mutex lock;
- Rdma::AsynchIO* aio;
- BufferBase* buffer;
- Frames frames;
- size_t lastEof; // Position after last EOF in frames
- framing::Buffer encode;
- size_t framesEncoded;
- std::string identifier;
- Bounds* bounds;
-
- void writeOne();
- void newBuffer();
+ typedef Rdma::Buffer BufferBase;
+ typedef std::deque<framing::AMQFrame> Frames;
- public:
-
- Writer(uint16_t maxFrameSize, Bounds*);
- ~Writer();
- void init(std::string id, Rdma::AsynchIO*);
- void handle(framing::AMQFrame&);
- void write(Rdma::AsynchIO&);
- };
-
const uint16_t maxFrameSize;
+ sys::Mutex lock;
+ Frames frames;
+ size_t lastEof; // Position after last EOF in frames
+ uint64_t currentSize;
+ Bounds* bounds;
+
+
framing::ProtocolVersion version;
bool initiated;
@@ -92,12 +75,11 @@ class RdmaConnector : public Connector, private sys::Runnable
framing::InitiationHandler* initialiser;
framing::OutputHandler* output;
- Writer writer;
-
sys::Thread receiver;
Rdma::AsynchIO* aio;
sys::Poller::shared_ptr poller;
+ std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
~RdmaConnector();
@@ -129,6 +111,11 @@ class RdmaConnector : public Connector, private sys::Runnable
sys::ShutdownHandler* getShutdownHandler() const;
framing::OutputHandler* getOutputHandler();
const std::string& getIdentifier() const;
+ void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
+
+ size_t decode(const char* buffer, size_t size);
+ size_t encode(const char* buffer, size_t size);
+ bool canEncode();
public:
RdmaConnector(framing::ProtocolVersion pVersion,
@@ -155,12 +142,14 @@ RdmaConnector::RdmaConnector(ProtocolVersion ver,
const ConnectionSettings& settings,
ConnectionImpl* cimpl)
: maxFrameSize(settings.maxFrameSize),
+ lastEof(0),
+ currentSize(0),
+ bounds(cimpl),
version(ver),
initiated(false),
polling(false),
joined(true),
shutdownHandler(0),
- writer(maxFrameSize, cimpl),
aio(0),
impl(cimpl)
{
@@ -216,7 +205,6 @@ void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intru
aio->start(poller);
identifier = str(format("[%1% %2%]") % ci->getLocalName() % ci->getPeerName());
- writer.init(identifier, aio);
ProtocolInitiation init(version);
writeDataBlock(init);
}
@@ -279,7 +267,21 @@ const std::string& RdmaConnector::getIdentifier() const {
}
void RdmaConnector::send(AMQFrame& frame) {
- writer.handle(frame);
+ bool notifyWrite = false;
+ {
+ Mutex::ScopedLock l(lock);
+ frames.push_back(frame);
+ //only ask to write if this is the end of a frameset or if we
+ //already have a buffers worth of data
+ currentSize += frame.encodedSize();
+ if (frame.getEof()) {
+ lastEof = frames.size();
+ notifyWrite = true;
+ } else {
+ notifyWrite = (currentSize >= maxFrameSize);
+ }
+ }
+ if (notifyWrite) aio->notifyPendingWrite();
}
void RdmaConnector::handleClosed() {
@@ -287,88 +289,54 @@ void RdmaConnector::handleClosed() {
shutdownHandler->shutdown();
}
-RdmaConnector::Writer::Writer(uint16_t s, Bounds* b) :
- maxFrameSize(s),
- aio(0),
- buffer(0),
- lastEof(0),
- bounds(b)
-{
-}
-
-RdmaConnector::Writer::~Writer() {
- if (aio)
- aio->returnBuffer(buffer);
-}
-
-void RdmaConnector::Writer::init(std::string id, Rdma::AsynchIO* a) {
- Mutex::ScopedLock l(lock);
- identifier = id;
- aio = a;
- assert(aio->bufferAvailable());
- newBuffer();
-}
-void RdmaConnector::Writer::handle(framing::AMQFrame& frame) {
- Mutex::ScopedLock l(lock);
- frames.push_back(frame);
- // Don't bother to send anything unless we're at the end of a frameset (assembly in 0-10 terminology)
- if (frame.getEof()) {
- lastEof = frames.size();
- QPID_LOG(debug, "Requesting write: lastEof=" << lastEof);
- aio->notifyPendingWrite();
+// Called in IO thread. (write idle routine)
+// This is NOT only called in response to previously calling notifyPendingWrite
+void RdmaConnector::writebuff(Rdma::AsynchIO&) {
+ Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
+ if (codec->canEncode()) {
+ std::auto_ptr<BufferBase> buffer = std::auto_ptr<BufferBase>(aio->getBuffer());
+ size_t encoded = codec->encode(buffer->bytes, buffer->byteCount);
+
+ buffer->dataStart = 0;
+ buffer->dataCount = encoded;
+ aio->queueWrite(buffer.release());
}
- QPID_LOG(trace, "SENT " << identifier << ": " << frame);
}
-void RdmaConnector::Writer::writeOne() {
- assert(buffer);
- QPID_LOG(trace, "Write buffer " << encode.getPosition()
- << " bytes " << framesEncoded << " frames ");
- framesEncoded = 0;
-
- buffer->dataStart = 0;
- buffer->dataCount = encode.getPosition();
- aio->queueWrite(buffer);
- newBuffer();
-}
-
-void RdmaConnector::Writer::newBuffer() {
- buffer = aio->getBuffer();
- encode = framing::Buffer(buffer->bytes, buffer->byteCount);
- framesEncoded = 0;
+bool RdmaConnector::canEncode()
+{
+ Mutex::ScopedLock l(lock);
+ //have at least one full frameset or a whole buffers worth of data
+ return aio->writable() && aio->bufferAvailable() && (lastEof || currentSize >= maxFrameSize);
}
-// Called in IO thread. (write idle routine)
-// This is NOT only called in response to previously calling notifyPendingWrite
-void RdmaConnector::Writer::write(Rdma::AsynchIO&) {
- Mutex::ScopedLock l(lock);
- assert(buffer);
- // If nothing to do return immediately
- if (lastEof==0)
- return;
- size_t bytesWritten = 0;
- while (aio->writable() && aio->bufferAvailable() && !frames.empty()) {
- const AMQFrame* frame = &frames.front();
- uint32_t size = frame->encodedSize();
- while (size <= encode.available()) {
- frame->encode(encode);
+size_t RdmaConnector::encode(const char* buffer, size_t size)
+{
+ framing::Buffer out(const_cast<char*>(buffer), size);
+ size_t bytesWritten(0);
+ {
+ Mutex::ScopedLock l(lock);
+ while (!frames.empty() && out.available() >= frames.front().encodedSize() ) {
+ frames.front().encode(out);
+ QPID_LOG(trace, "SENT " << identifier << ": " << frames.front());
frames.pop_front();
- ++framesEncoded;
- bytesWritten += size;
- if (frames.empty())
- break;
- frame = &frames.front();
- size = frame->encodedSize();
+ if (lastEof) --lastEof;
}
- lastEof -= framesEncoded;
- writeOne();
+ bytesWritten = size - out.available();
+ currentSize -= bytesWritten;
}
if (bounds) bounds->reduce(bytesWritten);
+ return bytesWritten;
}
void RdmaConnector::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
- framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+ Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
+ codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
+}
+size_t RdmaConnector::decode(const char* buffer, size_t size)
+{
+ framing::Buffer in(const_cast<char*>(buffer), size);
if (!initiated) {
framing::ProtocolInitiation protocolInit;
if (protocolInit.decode(in)) {
@@ -382,10 +350,7 @@ void RdmaConnector::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
QPID_LOG(trace, "RECV " << identifier << ": " << frame);
input->received(frame);
}
-}
-
-void RdmaConnector::writebuff(Rdma::AsynchIO& aio_) {
- writer.write(aio_);
+ return size - in.available();
}
void RdmaConnector::writeDataBlock(const AMQDataBlock& data) {
@@ -424,5 +389,10 @@ void RdmaConnector::run(){
}
}
+void RdmaConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
+{
+ securityLayer = sl;
+ securityLayer->init(this);
+}
}} // namespace qpid::client
diff --git a/qpid/cpp/src/qpid/client/Sasl.h b/qpid/cpp/src/qpid/client/Sasl.h
new file mode 100644
index 0000000000..e7a911ebce
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/Sasl.h
@@ -0,0 +1,52 @@
+#ifndef QPID_CLIENT_SASL_H
+#define QPID_CLIENT_SASL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <memory>
+#include <string>
+
+namespace qpid {
+
+namespace sys {
+class SecurityLayer;
+}
+
+namespace client {
+
+class ConnectionSettings;
+
+/**
+ * Interface to SASL support
+ */
+class Sasl
+{
+ public:
+ virtual std::string start(const std::string& mechanisms) = 0;
+ virtual std::string step(const std::string& challenge) = 0;
+ virtual std::string getMechanism() = 0;
+ virtual std::auto_ptr<qpid::sys::SecurityLayer> getSecurityLayer(uint16_t maxFrameSize) = 0;
+ virtual ~Sasl() {}
+};
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_SASL_H*/
diff --git a/qpid/cpp/src/qpid/client/SaslFactory.cpp b/qpid/cpp/src/qpid/client/SaslFactory.cpp
new file mode 100644
index 0000000000..d6edc6501d
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/SaslFactory.cpp
@@ -0,0 +1,345 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SaslFactory.h"
+#include "ConnectionSettings.h"
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#ifndef HAVE_SASL
+
+namespace qpid {
+namespace client {
+
+//Null implementation
+
+SaslFactory::SaslFactory() {}
+
+SaslFactory::~SaslFactory() {}
+
+SaslFactory& SaslFactory::getInstance()
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (!instance.get()) {
+ instance = std::auto_ptr<SaslFactory>(new SaslFactory());
+ }
+ return *instance;
+}
+
+std::auto_ptr<Sasl> SaslFactory::create(const ConnectionSettings&)
+{
+ return std::auto_ptr<Sasl>();
+}
+
+qpid::sys::Mutex SaslFactory::lock;
+std::auto_ptr<SaslFactory> SaslFactory::instance;
+
+}} // namespace qpid::client
+
+#else
+
+#include "qpid/Exception.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/sys/SecurityLayer.h"
+#include "qpid/sys/cyrus/CyrusSecurityLayer.h"
+#include "qpid/log/Statement.h"
+#include <sasl/sasl.h>
+#include <strings.h>
+
+namespace qpid {
+namespace client {
+
+using qpid::sys::SecurityLayer;
+using qpid::sys::cyrus::CyrusSecurityLayer;
+using qpid::framing::InternalErrorException;
+
+const size_t MAX_LOGIN_LENGTH = 50;
+
+class CyrusSasl : public Sasl
+{
+ public:
+ CyrusSasl(const ConnectionSettings&);
+ ~CyrusSasl();
+ std::string start(const std::string& mechanisms);
+ std::string step(const std::string& challenge);
+ std::string getMechanism();
+ std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize);
+ private:
+ sasl_conn_t* conn;
+ sasl_callback_t callbacks[5];//realm, user, authname, password, end-of-list
+ ConnectionSettings settings;
+ std::string input;
+ std::string mechanism;
+ char login[MAX_LOGIN_LENGTH];
+
+ void interact(sasl_interact_t* client_interact);
+};
+
+//sasl callback functions
+int getLogin(void *context, int id, const char **result, unsigned *len);
+int getUserFromSettings(void *context, int id, const char **result, unsigned *len);
+int getPasswordFromSettings(sasl_conn_t *conn, void *context, int id, sasl_secret_t **psecret);
+typedef int CallbackProc();
+
+qpid::sys::Mutex SaslFactory::lock;
+std::auto_ptr<SaslFactory> SaslFactory::instance;
+
+SaslFactory::SaslFactory()
+{
+ sasl_callback_t* callbacks = 0;
+ int result = sasl_client_init(callbacks);
+ if (result != SASL_OK) {
+ throw InternalErrorException(QPID_MSG("Sasl error: " << sasl_errstring(result, 0, 0)));
+ }
+}
+
+SaslFactory::~SaslFactory()
+{
+ sasl_done();
+}
+
+SaslFactory& SaslFactory::getInstance()
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (!instance.get()) {
+ instance = std::auto_ptr<SaslFactory>(new SaslFactory());
+ }
+ return *instance;
+}
+
+std::auto_ptr<Sasl> SaslFactory::create(const ConnectionSettings& settings)
+{
+ std::auto_ptr<Sasl> sasl(new CyrusSasl(settings));
+ return sasl;
+}
+
+CyrusSasl::CyrusSasl(const ConnectionSettings& s) : conn(0), settings(s)
+{
+ size_t i = 0;
+
+ callbacks[i].id = SASL_CB_GETREALM;
+ callbacks[i].proc = 0;
+ callbacks[i++].context = 0;
+
+ if (settings.username.empty()) {
+ callbacks[i].id = SASL_CB_USER;
+ callbacks[i].proc = (CallbackProc*) &getLogin;
+ callbacks[i++].context = &login;
+
+ callbacks[i].id = SASL_CB_AUTHNAME;
+ callbacks[i].proc = (CallbackProc*) &getLogin;
+ callbacks[i++].context = &login;
+ } else {
+ callbacks[i].id = SASL_CB_USER;
+ callbacks[i].proc = (CallbackProc*) &getUserFromSettings;
+ callbacks[i++].context = &settings;
+
+ callbacks[i].id = SASL_CB_AUTHNAME;
+ callbacks[i].proc = (CallbackProc*) &getUserFromSettings;
+ callbacks[i++].context = &settings;
+ }
+
+ callbacks[i].id = SASL_CB_PASS;
+ callbacks[i].proc = (CallbackProc*) &getPasswordFromSettings;
+ callbacks[i++].context = &settings;
+
+ callbacks[i].id = SASL_CB_LIST_END;
+ callbacks[i].proc = 0;
+ callbacks[i++].context = 0;
+}
+
+CyrusSasl::~CyrusSasl()
+{
+ if (conn) {
+ sasl_dispose(&conn);
+ }
+}
+
+namespace {
+ const std::string SSL("ssl");
+}
+
+std::string CyrusSasl::start(const std::string& mechanisms)
+{
+ QPID_LOG(debug, "CyrusSasl::start(" << mechanisms << ")");
+ int result = sasl_client_new(settings.service.c_str(),
+ settings.host.c_str(),
+ 0, 0, /* Local and remote IP address strings */
+ callbacks,
+ 0, /* security flags */
+ &conn);
+
+ if (result != SASL_OK) throw InternalErrorException(QPID_MSG("Sasl error: " << sasl_errdetail(conn)));
+
+ sasl_security_properties_t secprops;
+
+ secprops.min_ssf = settings.minSsf;
+ secprops.max_ssf = settings.maxSsf;
+ secprops.maxbufsize = 65535;
+
+ QPID_LOG(debug, "min_ssf: " << secprops.min_ssf << ", max_ssf: " << secprops.max_ssf);
+
+ secprops.property_names = 0;
+ secprops.property_values = 0;
+ secprops.security_flags = 0;//TODO: provide means for application to configure these
+
+ result = sasl_setprop(conn, SASL_SEC_PROPS, &secprops);
+ if (result != SASL_OK) {
+ throw framing::InternalErrorException(QPID_MSG("SASL error: " << sasl_errdetail(conn)));
+ }
+
+
+ sasl_interact_t* client_interact = 0;
+ const char *out = 0;
+ unsigned outlen = 0;
+ const char *chosenMechanism = 0;
+
+ do {
+ result = sasl_client_start(conn,
+ mechanisms.c_str(),
+ &client_interact,
+ &out,
+ &outlen,
+ &chosenMechanism);
+
+ if (result == SASL_INTERACT) {
+ interact(client_interact);
+ }
+ } while (result == SASL_INTERACT);
+
+ if (result != SASL_CONTINUE && result != SASL_OK) {
+ throw InternalErrorException(QPID_MSG("Sasl error: " << sasl_errdetail(conn)));
+ }
+
+ mechanism = std::string(chosenMechanism);
+ QPID_LOG(debug, "CyrusSasl::start(" << mechanisms << "): selected "
+ << mechanism << " response: '" << std::string(out, outlen) << "'");
+ return std::string(out, outlen);
+}
+
+std::string CyrusSasl::step(const std::string& challenge)
+{
+ sasl_interact_t* client_interact = 0;
+ const char *out = 0;
+ unsigned outlen = 0;
+ int result = 0;
+ do {
+ result = sasl_client_step(conn, /* our context */
+ challenge.data(), /* the data from the server */
+ challenge.size(), /* it's length */
+ &client_interact, /* this should be
+ unallocated and NULL */
+ &out, /* filled in on success */
+ &outlen); /* filled in on success */
+
+ if (result == SASL_INTERACT) {
+ interact(client_interact);
+ }
+ } while (result == SASL_INTERACT);
+
+ std::string response;
+ if (result == SASL_CONTINUE || result == SASL_OK) response = std::string(out, outlen);
+ else if (result != SASL_OK) {
+ throw InternalErrorException(QPID_MSG("Sasl error: " << sasl_errdetail(conn)));
+ }
+ QPID_LOG(debug, "CyrusSasl::step(" << challenge << "): " << response);
+ return response;
+}
+
+std::string CyrusSasl::getMechanism()
+{
+ return mechanism;
+}
+
+void CyrusSasl::interact(sasl_interact_t* client_interact)
+{
+ std::cout << "[" << client_interact->id << "] " << client_interact->challenge << " " << client_interact->prompt;
+ if (client_interact->defresult) std::cout << " (" << client_interact->defresult << ")";
+ std::cout << std::endl;
+ if (std::cin >> input) {
+ client_interact->result = input.data();
+ client_interact->len = input.size();
+ }
+}
+
+std::auto_ptr<SecurityLayer> CyrusSasl::getSecurityLayer(uint16_t maxFrameSize)
+{
+ const void* value(0);
+ int result = sasl_getprop(conn, SASL_SSF, &value);
+ if (result != SASL_OK) {
+ throw framing::InternalErrorException(QPID_MSG("SASL error: " << sasl_errdetail(conn)));
+ }
+ uint ssf = *(reinterpret_cast<const unsigned*>(value));
+ std::auto_ptr<SecurityLayer> securityLayer;
+ if (ssf) {
+ QPID_LOG(info, "Installing security layer, SSF: "<< ssf);
+ securityLayer = std::auto_ptr<SecurityLayer>(new CyrusSecurityLayer(conn, maxFrameSize));
+ }
+ return securityLayer;
+}
+
+int getLogin(void* context, int /*id*/, const char** result, unsigned* /*len*/)
+{
+ if (context) {
+ char* login = (char*) context;
+ int status = getlogin_r(login, MAX_LOGIN_LENGTH);
+ if (status == 0) {
+ *result = login;
+ QPID_LOG(debug, "getLogin(): " << (*result));
+ } else {
+ strcpy(login, "guest");
+ QPID_LOG(error, "getlogin_r() failed with " << status << "; defaulting to " << login);
+ }
+ return SASL_OK;
+ } else {
+ return SASL_FAIL;
+ }
+}
+
+int getUserFromSettings(void* context, int /*id*/, const char** result, unsigned* /*len*/)
+{
+ if (context) {
+ *result = ((ConnectionSettings*) context)->username.c_str();
+ QPID_LOG(debug, "getUserFromSettings(): " << (*result));
+ return SASL_OK;
+ } else {
+ return SASL_FAIL;
+ }
+}
+
+int getPasswordFromSettings(sasl_conn_t* /*conn*/, void* context, int /*id*/, sasl_secret_t** psecret)
+{
+ if (context) {
+ size_t length = ((ConnectionSettings*) context)->password.size();
+ sasl_secret_t* secret = (sasl_secret_t*) malloc(sizeof(sasl_secret_t) + length);
+ secret->len = length;
+ memcpy(secret->data, ((ConnectionSettings*) context)->password.data(), length);
+ *psecret = secret;
+ return SASL_OK;
+ } else {
+ return SASL_FAIL;
+ }
+}
+
+}} // namespace qpid::client
+
+#endif
diff --git a/qpid/cpp/src/qpid/client/SaslFactory.h b/qpid/cpp/src/qpid/client/SaslFactory.h
new file mode 100644
index 0000000000..60a1d60ff3
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/SaslFactory.h
@@ -0,0 +1,48 @@
+#ifndef QPID_CLIENT_SASLFACTORY_H
+#define QPID_CLIENT_SASLFACTORY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Sasl.h"
+#include "qpid/sys/Mutex.h"
+#include <memory>
+
+namespace qpid {
+namespace client {
+
+/**
+ * Factory for instances of the Sasl interface through which Sasl
+ * support is provided to a ConnectionHandler.
+ */
+class SaslFactory
+{
+ public:
+ std::auto_ptr<Sasl> create(const ConnectionSettings&);
+ static SaslFactory& getInstance();
+ ~SaslFactory();
+ private:
+ SaslFactory();
+ static qpid::sys::Mutex lock;
+ static std::auto_ptr<SaslFactory> instance;
+};
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_SASLFACTORY_H*/
diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
index 83b6329889..35b75c1fe8 100644
--- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
+++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
@@ -184,16 +184,21 @@ void AsynchIOHandler::idle(AsynchIO&){
return;
}
if (codec == 0) return;
- if (codec->canEncode()) {
- // Try and get a queued buffer if not then construct new one
- AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
- if (!buff) buff = new Buff;
- size_t encoded=codec->encode(buff->bytes, buff->byteCount);
- buff->dataCount = encoded;
- aio->queueWrite(buff);
- }
- if (codec->isClosed())
+ try {
+ if (codec->canEncode()) {
+ // Try and get a queued buffer if not then construct new one
+ AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
+ if (!buff) buff = new Buff;
+ size_t encoded=codec->encode(buff->bytes, buff->byteCount);
+ buff->dataCount = encoded;
+ aio->queueWrite(buff);
+ }
+ if (codec->isClosed())
+ aio->queueWriteClose();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, e.what());
aio->queueWriteClose();
+ }
}
}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/Codec.h b/qpid/cpp/src/qpid/sys/Codec.h
new file mode 100644
index 0000000000..f9645f554e
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/Codec.h
@@ -0,0 +1,52 @@
+#ifndef QPID_SYS_CODEC_H
+#define QPID_SYS_CODEC_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <cstddef>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Generic codec interface
+ */
+class Codec
+{
+ public:
+ virtual ~Codec() {}
+
+ /** Decode from buffer, return number of bytes decoded.
+ * @return may be less than size if there was incomplete
+ * data at the end of the buffer.
+ */
+ virtual size_t decode(const char* buffer, size_t size) = 0;
+
+
+ /** Encode into buffer, return number of bytes encoded */
+ virtual size_t encode(const char* buffer, size_t size) = 0;
+
+ /** Return true if we have data to encode */
+ virtual bool canEncode() = 0;
+};
+}} // namespace qpid::sys
+
+#endif /*!QPID_SYS_CODEC_H*/
diff --git a/qpid/cpp/src/qpid/sys/ConnectionCodec.h b/qpid/cpp/src/qpid/sys/ConnectionCodec.h
index b1b047d2cc..7f5e2f831c 100644
--- a/qpid/cpp/src/qpid/sys/ConnectionCodec.h
+++ b/qpid/cpp/src/qpid/sys/ConnectionCodec.h
@@ -21,6 +21,7 @@
* under the License.
*
*/
+#include "Codec.h"
#include "qpid/framing/ProtocolVersion.h"
namespace qpid {
@@ -34,23 +35,10 @@ class OutputControl;
* Interface of coder/decoder for a connection of a specific protocol
* version.
*/
-class ConnectionCodec {
+class ConnectionCodec : public Codec {
public:
virtual ~ConnectionCodec() {}
- /** Decode from buffer, return number of bytes decoded.
- * @return may be less than size if there was incomplete
- * data at the end of the buffer.
- */
- virtual size_t decode(const char* buffer, size_t size) = 0;
-
-
- /** Encode into buffer, return number of bytes encoded */
- virtual size_t encode(const char* buffer, size_t size) = 0;
-
- /** Return true if we have data to encode */
- virtual bool canEncode() = 0;
-
/** Network connection was closed from other end. */
virtual void closed() = 0;
diff --git a/qpid/cpp/src/qpid/sys/SecurityLayer.h b/qpid/cpp/src/qpid/sys/SecurityLayer.h
new file mode 100644
index 0000000000..6ad29eea80
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/SecurityLayer.h
@@ -0,0 +1,42 @@
+#ifndef QPID_SYS_SECURITYLAYER_H
+#define QPID_SYS_SECURITYLAYER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Codec.h"
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Defines interface to a SASL negotiated Security Layer (for
+ * encryption/integrity)
+ */
+class SecurityLayer : public Codec
+{
+ public:
+ virtual void init(Codec*) = 0;
+ virtual ~SecurityLayer() {}
+};
+
+}} // namespace qpid::sys
+
+#endif /*!QPID_SYS_SECURITYLAYER_H*/
diff --git a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
index be091f86d8..c6e45b8fa4 100644
--- a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -65,14 +65,10 @@ static class TCPIOPlugin : public Plugin {
// Only provide to a Broker
if (broker) {
const broker::Broker::Options& opts = broker->getOptions();
- if (opts.requireEncrypted) {
- QPID_LOG(info, "Not accepting unencrypted connections on TCP");
- } else {
- ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog,
- opts.tcpNoDelay));
- QPID_LOG(notice, "Listening on TCP port " << protocol->getPort());
- broker->registerProtocolFactory("tcp", protocol);
- }
+ ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog,
+ opts.tcpNoDelay));
+ QPID_LOG(notice, "Listening on TCP port " << protocol->getPort());
+ broker->registerProtocolFactory("tcp", protocol);
}
}
} tcpPlugin;
diff --git a/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp b/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp
new file mode 100644
index 0000000000..e906d22cae
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp
@@ -0,0 +1,119 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "CyrusSecurityLayer.h"
+#include <algorithm>
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace sys {
+namespace cyrus {
+
+CyrusSecurityLayer::CyrusSecurityLayer(sasl_conn_t* c, uint16_t maxFrameSize) :
+ conn(c), decrypted(0), decryptedSize(0), encrypted(0), encryptedSize(0), codec(0), maxInputSize(0), decodeBuffer(maxFrameSize)
+{
+ const void* value(0);
+ int result = sasl_getprop(conn, SASL_MAXOUTBUF, &value);
+ if (result != SASL_OK) {
+ throw framing::InternalErrorException(QPID_MSG("SASL encode error: " << sasl_errdetail(conn)));
+ }
+ maxInputSize = *(reinterpret_cast<const unsigned*>(value));
+}
+
+size_t CyrusSecurityLayer::decode(const char* input, size_t size)
+{
+ size_t inStart = 0;
+ do {
+ size_t inSize = std::min(size - inStart, maxInputSize);
+ int result = sasl_decode(conn, input + inStart, inSize, &decrypted, &decryptedSize);
+ if (result != SASL_OK) {
+ throw framing::InternalErrorException(QPID_MSG("SASL encode error: " << sasl_errdetail(conn)));
+ }
+ inStart += inSize;
+ size_t copied = 0;
+ do {
+ size_t count = std::min(decryptedSize - copied, decodeBuffer.size - decodeBuffer.position);
+ ::memcpy(decodeBuffer.data + decodeBuffer.position, decrypted + copied, count);
+ copied += count;
+ decodeBuffer.position += count;
+ size_t decodedSize = codec->decode(decodeBuffer.data, decodeBuffer.position);
+ if (decodedSize < decodeBuffer.position) {
+ ::memmove(decodeBuffer.data, decodeBuffer.data + decodedSize, decodeBuffer.position - decodedSize);
+ }
+ decodeBuffer.position -= decodedSize;
+ } while (copied < decryptedSize);
+ } while (inStart < size);
+ return size;
+}
+
+size_t CyrusSecurityLayer::encode(const char* buffer, size_t size)
+{
+ size_t processed = 0;//records how many bytes have been written to buffer
+ do {
+ if (!encrypted) {
+ DataBuffer encodeBuffer(maxInputSize);//make sure maxInputSize > maxFrameSize
+ size_t encoded = codec->encode(encodeBuffer.data, encodeBuffer.size);
+ if (!encoded) break;//nothing more to do
+ int result = sasl_encode(conn, encodeBuffer.data, encoded, &encrypted, &encryptedSize);
+ if (result != SASL_OK) {
+ throw framing::InternalErrorException(QPID_MSG("SASL encode error: " << sasl_errdetail(conn)));
+ }
+ }
+ size_t remaining = size - processed;
+ if (remaining < encryptedSize) {
+ //can't fit all encrypted data in the buffer we've
+ //been given, copy in what we can and hold on to the
+ //rest until the next call
+ ::memcpy(const_cast<char*>(buffer + processed), encrypted, remaining);
+ processed += remaining;
+ encrypted += remaining;
+ encryptedSize -= remaining;
+ } else {
+ ::memcpy(const_cast<char*>(buffer + processed), encrypted, encryptedSize);
+ processed += encryptedSize;
+ encrypted = 0;
+ encryptedSize = 0;
+ }
+ } while (processed < size);
+ return processed;
+}
+
+bool CyrusSecurityLayer::canEncode()
+{
+ return encrypted || codec->canEncode();
+}
+
+void CyrusSecurityLayer::init(qpid::sys::Codec* c)
+{
+ codec = c;
+}
+
+CyrusSecurityLayer::DataBuffer::DataBuffer(size_t s) : position(0), size(s)
+{
+ data = new char[size];
+}
+
+CyrusSecurityLayer::DataBuffer::~DataBuffer()
+{
+ delete[] data;
+}
+
+}}} // namespace qpid::sys::cyrus
diff --git a/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.h b/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.h
new file mode 100644
index 0000000000..3c00d496a9
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.h
@@ -0,0 +1,66 @@
+#ifndef QPID_SYS_CYRUS_CYRUSSECURITYLAYER_H
+#define QPID_SYS_CYRUS_CYRUSSECURITYLAYER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/sys/SecurityLayer.h"
+#include <sasl/sasl.h>
+
+namespace qpid {
+namespace sys {
+namespace cyrus {
+
+
+/**
+ * Implementation of SASL security layer using cyrus-sasl library
+ */
+class CyrusSecurityLayer : public qpid::sys::SecurityLayer
+{
+ public:
+ CyrusSecurityLayer(sasl_conn_t*, uint16_t maxFrameSize);
+ size_t decode(const char* buffer, size_t size);
+ size_t encode(const char* buffer, size_t size);
+ bool canEncode();
+ void init(qpid::sys::Codec*);
+ private:
+ struct DataBuffer
+ {
+ char* data;
+ size_t position;
+ const size_t size;
+ DataBuffer(size_t);
+ ~DataBuffer();
+ };
+
+ sasl_conn_t* conn;
+ const char* decrypted;
+ unsigned decryptedSize;
+ const char* encrypted;
+ unsigned encryptedSize;
+ qpid::sys::Codec* codec;
+ size_t maxInputSize;
+ DataBuffer decodeBuffer;
+};
+}}} // namespace qpid::sys::cyrus
+
+#endif /*!QPID_SYS_CYRUS_CYRUSSECURITYLAYER_H*/
diff --git a/qpid/cpp/src/tests/.valgrind.supp b/qpid/cpp/src/tests/.valgrind.supp
index 7ac34fde5d..7ae2bd9845 100644
--- a/qpid/cpp/src/tests/.valgrind.supp
+++ b/qpid/cpp/src/tests/.valgrind.supp
@@ -201,3 +201,10 @@
fun:_ZN5boost6detail3tss3setEPv
}
+{
+ Shows up on RHEL5: believed benign
+ Memcheck:Cond
+ fun:__strcpy_chk
+ fun:_sasl_load_plugins
+ fun:sasl_client_init
+}
diff --git a/qpid/cpp/src/tests/ConnectionOptions.h b/qpid/cpp/src/tests/ConnectionOptions.h
index 0130842668..30fe5ad9b1 100644
--- a/qpid/cpp/src/tests/ConnectionOptions.h
+++ b/qpid/cpp/src/tests/ConnectionOptions.h
@@ -47,7 +47,10 @@ struct ConnectionOptions : public qpid::Options,
("max-frame-size", optValue(maxFrameSize, "N"), "the maximum frame size to request.")
("bounds-multiplier", optValue(bounds, "N"),
"bound size of write queue (as a multiple of the max frame size).")
- ("tcp-nodelay", optValue(tcpNoDelay), "Turn on tcp-nodelay");
+ ("tcp-nodelay", optValue(tcpNoDelay), "Turn on tcp-nodelay")
+ ("service", optValue(service, "SERVICE-NAME"), "SASL service name.")
+ ("min-ssf", optValue(minSsf, "N"), "Minimum acceptable strength for SASL security layer")
+ ("max-ssf", optValue(maxSsf, "N"), "Maximum acceptable strength for SASL security layer");
}
};