diff options
author | Gordon Sim <gsim@apache.org> | 2009-01-06 19:50:59 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2009-01-06 19:50:59 +0000 |
commit | 1fc265f9181e1b27491e517f95b327096e55f1fc (patch) | |
tree | f7a2774f4fbe887f537970438f5bae0bd78f56bd | |
parent | 51409ed32a294a4691e775be15b6a3cfe9690a7b (diff) | |
download | qpid-python-1fc265f9181e1b27491e517f95b327096e55f1fc.tar.gz |
* Cyrus SASL intgeration for c++ client
* SASL security layer support for c++ client and broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@732082 13f79535-47bb-0310-9956-ffa450edef68
32 files changed, 1381 insertions, 263 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 114de01d44..7aefa7f481 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -353,11 +353,14 @@ libqpidcommon_la_SOURCES = \ qpid/sys/Runnable.cpp \ qpid/sys/Shlib.cpp -libqpidbroker_la_LIBADD = libqpidcommon.la -luuid if HAVE_SASL -libqpidbroker_la_LIBADD += -lsasl2 +libqpidcommon_la_SOURCES += qpid/sys/cyrus/CyrusSecurityLayer.h +libqpidcommon_la_SOURCES += qpid/sys/cyrus/CyrusSecurityLayer.cpp +libqpidcommon_la_LIBADD += -lsasl2 endif +libqpidbroker_la_LIBADD = libqpidcommon.la -luuid + libqpidbroker_la_SOURCES = \ $(mgen_broker_cpp) \ $(posix_broker_src) \ @@ -403,6 +406,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/RecoveredEnqueue.cpp \ qpid/broker/RecoveredDequeue.cpp \ qpid/broker/SaslAuthenticator.cpp \ + qpid/broker/SecureConnection.cpp \ + qpid/broker/SecureConnectionFactory.cpp \ qpid/broker/SemanticState.h \ qpid/broker/SemanticState.cpp \ qpid/broker/SessionAdapter.cpp \ @@ -426,6 +431,7 @@ libqpidbroker_la_SOURCES = \ qpid/management/ManagementExchange.cpp \ qpid/sys/TCPIOPlugin.cpp + libqpidclient_la_LIBADD = libqpidcommon.la -luuid libqpidclient_la_SOURCES = \ @@ -452,6 +458,7 @@ libqpidclient_la_SOURCES = \ qpid/client/MessageReplayTracker.cpp \ qpid/client/QueueOptions.cpp \ qpid/client/Results.cpp \ + qpid/client/SaslFactory.cpp \ qpid/client/SessionBase_0_10.cpp \ qpid/client/SessionBase_0_10.h \ qpid/client/SessionBase_0_10Access.h \ @@ -552,6 +559,8 @@ nobase_include_HEADERS = \ qpid/broker/RecoveryManager.h \ qpid/broker/RecoveryManagerImpl.h \ qpid/broker/SaslAuthenticator.h \ + qpid/broker/SecureConnection.h \ + qpid/broker/SecureConnectionFactory.h \ qpid/broker/SessionAdapter.h \ qpid/broker/SessionManager.h \ qpid/broker/System.h \ @@ -591,6 +600,8 @@ nobase_include_HEADERS = \ qpid/client/MessageListener.h \ qpid/client/MessageReplayTracker.h \ qpid/client/Results.h \ + qpid/client/Sasl.h \ + qpid/client/SaslFactory.h \ qpid/client/SessionBase_0_10.h \ qpid/client/Session.h \ qpid/client/SessionImpl.h \ @@ -668,6 +679,7 @@ nobase_include_HEADERS = \ qpid/sys/AtomicValue_gcc.h \ qpid/sys/AtomicValue_mutex.h \ qpid/sys/BlockingQueue.h \ + qpid/sys/Codec.h \ qpid/sys/CopyOnWriteArray.h \ qpid/sys/Condition.h \ qpid/sys/ConnectionCodec.h \ @@ -692,6 +704,7 @@ nobase_include_HEADERS = \ qpid/sys/Runnable.h \ qpid/sys/Fork.h \ qpid/sys/ScopedIncrement.h \ + qpid/sys/SecurityLayer.h \ qpid/sys/Semaphore.h \ qpid/sys/SystemInfo.h \ qpid/sys/Shlib.h \ diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 64be104b98..37750f8352 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -27,6 +27,7 @@ #include "NullMessageStore.h" #include "RecoveryManagerImpl.h" #include "SaslAuthenticator.h" +#include "SecureConnectionFactory.h" #include "TopicExchange.h" #include "Link.h" @@ -135,7 +136,7 @@ Broker::Broker(const Broker::Options& conf) : acl(0), dataDir(conf.noDataDir ? std::string() : conf.dataDir), links(this), - factory(new ConnectionFactory(*this)), + factory(new SecureConnectionFactory(*this)), dtxManager(timer), sessionManager( qpid::SessionState::Configuration( diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index f0b9980861..eb54ddfd56 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -267,5 +267,10 @@ Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&, stri return status; } +void Connection::setSecureConnection(SecureConnection* s) +{ + adapter.setSecureConnection(s); +} + }} diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index 350ed2c07f..acd9f94d9b 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -57,6 +57,7 @@ namespace qpid { namespace broker { class LinkRegistry; +class SecureConnection; class Connection : public sys::ConnectionInputHandler, public ConnectionState, @@ -105,7 +106,7 @@ class Connection : public sys::ConnectionInputHandler, } void sendClose(); - + void setSecureConnection(SecureConnection* secured); private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index 7386ce7229..6f99b60cd8 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -22,17 +22,20 @@ #include "ConnectionHandler.h" #include "Connection.h" +#include "SecureConnection.h" +#include "qpid/Url.h" #include "qpid/framing/ClientInvoker.h" #include "qpid/framing/ServerInvoker.h" #include "qpid/framing/enum.h" #include "qpid/log/Statement.h" -#include "qpid/Url.h" +#include "qpid/sys/SecurityLayer.h" #include "AclModule.h" #include "qmf/org/apache/qpid/broker/EventClientConnectFail.h" using namespace qpid; using namespace qpid::broker; using namespace qpid::framing; +using qpid::sys::SecurityLayer; namespace _qmf = qmf::org::apache::qpid::broker; namespace @@ -70,11 +73,16 @@ void ConnectionHandler::handle(framing::AMQFrame& frame) } } +void ConnectionHandler::setSecureConnection(SecureConnection* secured) +{ + handler->secured = secured; +} + ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient) : handler(new Handler(connection, isClient)) {} ConnectionHandler::Handler::Handler(Connection& c, bool isClient) : client(c.getOutput()), server(c.getOutput()), - connection(c), serverMode(!isClient), acl(0) + connection(c), serverMode(!isClient), acl(0), secured(0) { if (serverMode) { @@ -160,6 +168,12 @@ void ConnectionHandler::Handler::open(const string& /*virtualHost*/, for (std::vector<Url>::iterator i = urls.begin(); i < urls.end(); ++i) array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str()))); client.openOk(array); + + //install security layer if one has been negotiated: + if (secured) { + std::auto_ptr<SecurityLayer> sl = authenticator->getSecurityLayer(connection.getFrameMax()); + if (sl.get()) secured->activateSecurityLayer(sl); + } } diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.h b/qpid/cpp/src/qpid/broker/ConnectionHandler.h index d3d5965dfc..6fd252b120 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.h +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.h @@ -40,6 +40,7 @@ namespace qpid { namespace broker { class Connection; +class SecureConnection; class ConnectionHandler : public framing::FrameHandler { @@ -52,6 +53,7 @@ class ConnectionHandler : public framing::FrameHandler bool serverMode; std::auto_ptr<SaslAuthenticator> authenticator; AclModule* acl; + SecureConnection* secured; Handler(Connection& connection, bool isClient); ~Handler(); @@ -87,6 +89,7 @@ class ConnectionHandler : public framing::FrameHandler ConnectionHandler(Connection& connection, bool isClient); void close(framing::connection::CloseCode code, const std::string& text); void handle(framing::AMQFrame& frame); + void setSecureConnection(SecureConnection* secured); }; diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp index 370de8a1d1..9fce1fbbd5 100644 --- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp +++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp @@ -30,9 +30,12 @@ #if HAVE_SASL #include <sasl/sasl.h> +#include "qpid/sys/cyrus/CyrusSecurityLayer.h" +using qpid::sys::cyrus::CyrusSecurityLayer; #endif using namespace qpid::framing; +using qpid::sys::SecurityLayer; using boost::format; using boost::str; @@ -46,11 +49,12 @@ class NullAuthenticator : public SaslAuthenticator framing::AMQP_ClientProxy::Connection client; std::string realm; public: - NullAuthenticator(Connection& connection); + NullAuthenticator(Connection& connection, bool dummy=false/*dummy arg to match CyrusAuthenticator*/); ~NullAuthenticator(); void getMechanisms(framing::Array& mechanisms); void start(const std::string& mechanism, const std::string& response); void step(const std::string&) {} + std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize); }; #if HAVE_SASL @@ -60,11 +64,12 @@ class CyrusAuthenticator : public SaslAuthenticator sasl_conn_t *sasl_conn; Connection& connection; framing::AMQP_ClientProxy::Connection client; + const bool encrypt; void processAuthenticationStep(int code, const char *challenge, unsigned int challenge_len); public: - CyrusAuthenticator(Connection& connection); + CyrusAuthenticator(Connection& connection, bool encrypt); ~CyrusAuthenticator(); void init(); void getMechanisms(framing::Array& mechanisms); @@ -72,6 +77,7 @@ public: void step(const std::string& response); void getUid(std::string& uid); void getError(std::string& error); + std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize); }; bool SaslAuthenticator::available(void) @@ -120,7 +126,7 @@ std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connecti { static bool needWarning = true; if (c.getBroker().getOptions().auth) { - return std::auto_ptr<SaslAuthenticator>(new CyrusAuthenticator(c)); + return std::auto_ptr<SaslAuthenticator>(new CyrusAuthenticator(c, c.getBroker().getOptions().requireEncrypted)); } else { QPID_LOG(warning, "SASL: No Authentication Performed"); needWarning = false; @@ -128,7 +134,7 @@ std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connecti } } -NullAuthenticator::NullAuthenticator(Connection& c) : connection(c), client(c.getOutput()), + NullAuthenticator::NullAuthenticator(Connection& c, bool /*dummy*/) : connection(c), client(c.getOutput()), realm(c.getBroker().getOptions().realm) {} NullAuthenticator::~NullAuthenticator() {} @@ -158,9 +164,18 @@ void NullAuthenticator::start(const string& mechanism, const string& response) } +std::auto_ptr<SecurityLayer> NullAuthenticator::getSecurityLayer(uint16_t) +{ + std::auto_ptr<SecurityLayer> securityLayer; + return securityLayer; +} + + #if HAVE_SASL -CyrusAuthenticator::CyrusAuthenticator(Connection& c) : sasl_conn(0), connection(c), client(c.getOutput()) + +CyrusAuthenticator::CyrusAuthenticator(Connection& c, bool _encrypt) : + sasl_conn(0), connection(c), client(c.getOutput()), encrypt(_encrypt) { init(); } @@ -196,6 +211,25 @@ void CyrusAuthenticator::init() // server error, when one is available throw ConnectionForcedException("Unable to perform authentication"); } + + sasl_security_properties_t secprops; + + //TODO: should the actual SSF values be configurable here? + secprops.min_ssf = encrypt ? 10: 0; + secprops.max_ssf = 256; + secprops.maxbufsize = 65535; + + QPID_LOG(debug, "min_ssf: " << secprops.min_ssf << ", max_ssf: " << secprops.max_ssf); + + secprops.property_names = 0; + secprops.property_values = 0; + secprops.security_flags = 0; /* or SASL_SEC_NOANONYMOUS etc as appropriate */ + + int result = sasl_setprop(sasl_conn, SASL_SEC_PROPS, &secprops); + if (result != SASL_OK) { + throw framing::InternalErrorException(QPID_MSG("SASL error: " << result)); + } + } CyrusAuthenticator::~CyrusAuthenticator() @@ -332,6 +366,24 @@ void CyrusAuthenticator::processAuthenticationStep(int code, const char *challen } } } + +std::auto_ptr<SecurityLayer> CyrusAuthenticator::getSecurityLayer(uint16_t maxFrameSize) +{ + + const void* value(0); + int result = sasl_getprop(sasl_conn, SASL_SSF, &value); + if (result != SASL_OK) { + throw framing::InternalErrorException(QPID_MSG("SASL error: " << sasl_errdetail(sasl_conn))); + } + uint ssf = *(reinterpret_cast<const unsigned*>(value)); + std::auto_ptr<SecurityLayer> securityLayer; + if (ssf) { + QPID_LOG(info, "Installing security layer, SSF: "<< ssf); + securityLayer = std::auto_ptr<SecurityLayer>(new CyrusSecurityLayer(sasl_conn, maxFrameSize)); + } + return securityLayer; +} + #endif }} diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.h b/qpid/cpp/src/qpid/broker/SaslAuthenticator.h index 2598b6d177..8ddaeb19a4 100644 --- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.h +++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.h @@ -24,6 +24,7 @@ #include "qpid/framing/amqp_types.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/Exception.h" +#include "qpid/sys/SecurityLayer.h" #include <memory> namespace qpid { @@ -40,6 +41,7 @@ public: virtual void step(const std::string& response) = 0; virtual void getUid(std::string&) {} virtual void getError(std::string&) {} + virtual std::auto_ptr<qpid::sys::SecurityLayer> getSecurityLayer(uint16_t maxFrameSize) = 0; static bool available(void); diff --git a/qpid/cpp/src/qpid/broker/SecureConnection.cpp b/qpid/cpp/src/qpid/broker/SecureConnection.cpp new file mode 100644 index 0000000000..4a9946e176 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/SecureConnection.cpp @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SecureConnection.h" +#include "qpid/sys/SecurityLayer.h" +#include "qpid/framing/reply_exceptions.h" + +namespace qpid { +namespace broker { + +using qpid::sys::SecurityLayer; + +SecureConnection::SecureConnection() : secured(false) {} + +size_t SecureConnection::decode(const char* buffer, size_t size) +{ + if (!secured && securityLayer.get()) { + //security layer comes into effect on first read after its + //activated + secured = true; + } + if (secured) { + return securityLayer->decode(buffer, size); + } else { + return codec->decode(buffer, size); + } +} + +size_t SecureConnection::encode(const char* buffer, size_t size) +{ + if (secured) { + return securityLayer->encode(buffer, size); + } else { + return codec->encode(buffer, size); + } +} + +bool SecureConnection::canEncode() +{ + if (secured) return securityLayer->canEncode(); + else return codec->canEncode(); +} + +void SecureConnection::closed() +{ + codec->closed(); +} + +bool SecureConnection::isClosed() const +{ + return codec->isClosed(); +} + +framing::ProtocolVersion SecureConnection::getVersion() const +{ + return codec->getVersion(); +} + +void SecureConnection:: setCodec(std::auto_ptr<ConnectionCodec> c) +{ + codec = c; +} + +void SecureConnection::activateSecurityLayer(std::auto_ptr<SecurityLayer> sl) +{ + securityLayer = sl; + securityLayer->init(codec.get()); +} + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SecureConnection.h b/qpid/cpp/src/qpid/broker/SecureConnection.h new file mode 100644 index 0000000000..4a0cc50e34 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/SecureConnection.h @@ -0,0 +1,60 @@ +#ifndef QPID_BROKER_SECURECONNECTION_H +#define QPID_BROKER_SECURECONNECTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/ConnectionCodec.h" +#include <memory> + +namespace qpid { + +namespace sys { +class SecurityLayer; +} + +namespace broker { + +/** + * A ConnectionCodec 'wrapper' that allows a connection to be + * 'secured' e.g. encrypted based on settings negotiatiated at the + * time of establishment. + */ +class SecureConnection : public qpid::sys::ConnectionCodec +{ + public: + SecureConnection(); + size_t decode(const char* buffer, size_t size); + size_t encode(const char* buffer, size_t size); + bool canEncode(); + void closed(); + bool isClosed() const; + framing::ProtocolVersion getVersion() const; + void setCodec(std::auto_ptr<ConnectionCodec>); + void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>); + private: + std::auto_ptr<ConnectionCodec> codec; + std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; + bool secured; +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_SECURECONNECTION_H*/ diff --git a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp new file mode 100644 index 0000000000..38fd96bcba --- /dev/null +++ b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SecureConnectionFactory.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/amqp_0_10/Connection.h" +#include "qpid/broker/Connection.h" +#include "qpid/broker/SecureConnection.h" + +namespace qpid { +namespace broker { + +using framing::ProtocolVersion; +typedef std::auto_ptr<amqp_0_10::Connection> CodecPtr; +typedef std::auto_ptr<SecureConnection> SecureConnectionPtr; +typedef std::auto_ptr<Connection> ConnectionPtr; +typedef std::auto_ptr<sys::ConnectionInputHandler> InputPtr; + +SecureConnectionFactory::SecureConnectionFactory(Broker& b) : broker(b) {} + +sys::ConnectionCodec* +SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) { + if (v == ProtocolVersion(0, 10)) { + SecureConnectionPtr sc(new SecureConnection()); + CodecPtr c(new amqp_0_10::Connection(out, id, false)); + ConnectionPtr i(new broker::Connection(c.get(), broker, id, false)); + i->setSecureConnection(sc.get()); + c->setInputHandler(InputPtr(i.release())); + sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c)); + return sc.release(); + } + return 0; +} + +sys::ConnectionCodec* +SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id) { + // used to create connections from one broker to another + SecureConnectionPtr sc(new SecureConnection()); + CodecPtr c(new amqp_0_10::Connection(out, id, true)); + ConnectionPtr i(new broker::Connection(c.get(), broker, id, true)); + i->setSecureConnection(sc.get()); + c->setInputHandler(InputPtr(i.release())); + sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c)); + return sc.release(); +} + + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h new file mode 100644 index 0000000000..048fb250d6 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _SecureConnectionFactory_ +#define _SecureConnectionFactory_ + +#include "qpid/sys/ConnectionCodec.h" + +namespace qpid { +namespace broker { +class Broker; + +class SecureConnectionFactory : public sys::ConnectionCodec::Factory +{ + public: + SecureConnectionFactory(Broker& b); + + sys::ConnectionCodec* + create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id); + + sys::ConnectionCodec* + create(sys::OutputControl&, const std::string& id); + + private: + Broker& broker; +}; + +}} + + +#endif diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp index db5d006a17..2a070ebcff 100644 --- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp @@ -21,16 +21,18 @@ #include "ConnectionHandler.h" -#include "qpid/log/Statement.h" +#include "SaslFactory.h" #include "qpid/framing/amqp_framing.h" #include "qpid/framing/all_method_bodies.h" #include "qpid/framing/ClientInvoker.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/log/Helpers.h" +#include "qpid/log/Statement.h" using namespace qpid::client; using namespace qpid::framing; using namespace qpid::framing::connection; +using qpid::sys::SecurityLayer; namespace { const std::string OK("OK"); @@ -146,18 +148,50 @@ void ConnectionHandler::fail(const std::string& message) setState(FAILED); } -void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& /*mechanisms*/, const Array& /*locales*/) +namespace { +std::string SPACE(" "); +} + +void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& mechanisms, const Array& /*locales*/) { checkState(NOT_STARTED, INVALID_STATE_START); setState(NEGOTIATING); - //TODO: verify that desired mechanism and locale are supported - string response = ((char)0) + username + ((char)0) + password; - proxy.startOk(properties, mechanism, response, locale); + sasl = SaslFactory::getInstance().create(*this); + + std::string mechlist; + bool chosenMechanismSupported = mechanism.empty(); + for (Array::const_iterator i = mechanisms.begin(); i != mechanisms.end(); ++i) { + if (!mechanism.empty() && mechanism == (*i)->get<std::string>()) { + chosenMechanismSupported = true; + mechlist = (*i)->get<std::string>() + SPACE + mechlist; + } else { + if (i != mechanisms.begin()) mechlist += SPACE; + mechlist += (*i)->get<std::string>(); + } + } + + if (!chosenMechanismSupported) { + fail("Selected mechanism not supported: " + mechanism); + } + + if (sasl.get()) { + string response = sasl->start(mechanism.empty() ? mechlist : mechanism); + proxy.startOk(properties, sasl->getMechanism(), response, locale); + } else { + //TODO: verify that desired mechanism and locale are supported + string response = ((char)0) + username + ((char)0) + password; + proxy.startOk(properties, mechanism, response, locale); + } } -void ConnectionHandler::secure(const std::string& /*challenge*/) +void ConnectionHandler::secure(const std::string& challenge) { - throw NotImplementedException("Challenge-response cycle not yet implemented in client"); + if (sasl.get()) { + string response = sasl->step(challenge); + proxy.secureOk(response); + } else { + throw NotImplementedException("Challenge-response cycle not yet implemented in client"); + } } void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t maxFrameSizeProposed, @@ -179,6 +213,9 @@ void ConnectionHandler::openOk ( const Array& knownBrokers ) framing::Array::ValueVector::const_iterator i; for ( i = knownBrokers.begin(); i != knownBrokers.end(); ++i ) knownBrokersUrls.push_back(Url((*i)->get<std::string>())); + if (sasl.get()) { + securityLayer = sasl->getSecurityLayer(maxFrameSize); + } setState(OPEN); QPID_LOG(debug, "Known-brokers for connection: " << log::formatList(knownBrokersUrls)); } @@ -224,3 +261,8 @@ bool ConnectionHandler::isClosed() const } bool ConnectionHandler::isClosing() const { return getState() == CLOSING; } + +std::auto_ptr<qpid::sys::SecurityLayer> ConnectionHandler::getSecurityLayer() +{ + return securityLayer; +} diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.h b/qpid/cpp/src/qpid/client/ConnectionHandler.h index 12323684a5..ec9278626f 100644 --- a/qpid/cpp/src/qpid/client/ConnectionHandler.h +++ b/qpid/cpp/src/qpid/client/ConnectionHandler.h @@ -23,6 +23,7 @@ #include "ChainableFrameHandler.h" #include "ConnectionSettings.h" +#include "Sasl.h" #include "StateManager.h" #include "qpid/framing/AMQMethodBody.h" #include "qpid/framing/AMQP_HighestVersion.h" @@ -33,7 +34,9 @@ #include "qpid/framing/FieldTable.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/InputHandler.h" +#include "qpid/sys/SecurityLayer.h" #include "qpid/Url.h" +#include <memory> namespace qpid { namespace client { @@ -64,6 +67,8 @@ class ConnectionHandler : private StateManager, framing::ProtocolVersion version; framing::Array capabilities; framing::FieldTable properties; + std::auto_ptr<Sasl> sasl; + std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; void checkState(STATES s, const std::string& msg); @@ -103,6 +108,8 @@ public: bool isClosed() const; bool isClosing() const; + std::auto_ptr<qpid::sys::SecurityLayer> getSecurityLayer(); + CloseListener onClose; ErrorListener onError; diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp index 0d7ffa0288..aa9eeb7489 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp @@ -110,6 +110,14 @@ void ConnectionImpl::open() connector->connect(host, port); connector->init(); handler.waitForOpen(); + //enable security layer if one has been negotiated: + std::auto_ptr<SecurityLayer> securityLayer = handler.getSecurityLayer(); + if (securityLayer.get()) { + QPID_LOG(debug, "Activating security layer"); + connector->activateSecurityLayer(securityLayer); + } else { + QPID_LOG(debug, "No security layer in place"); + } failover.reset(new FailoverListener(shared_from_this(), handler.knownBrokersUrls)); } diff --git a/qpid/cpp/src/qpid/client/ConnectionSettings.cpp b/qpid/cpp/src/qpid/client/ConnectionSettings.cpp index f5fc62dad2..5851917da6 100644 --- a/qpid/cpp/src/qpid/client/ConnectionSettings.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionSettings.cpp @@ -22,6 +22,7 @@ #include "qpid/log/Logger.h" #include "qpid/sys/Socket.h" +#include "qpid/Version.h" namespace qpid { namespace client { @@ -30,15 +31,15 @@ ConnectionSettings::ConnectionSettings() : protocol("tcp"), host("localhost"), port(TcpAddress::DEFAULT_PORT), - username("guest"), - password("guest"), - mechanism("PLAIN"), locale("en_US"), heartbeat(0), maxChannels(32767), maxFrameSize(65535), bounds(2), - tcpNoDelay(false) + tcpNoDelay(false), + service(qpid::saslName), + minSsf(0), + maxSsf(256) {} ConnectionSettings::~ConnectionSettings() {} diff --git a/qpid/cpp/src/qpid/client/ConnectionSettings.h b/qpid/cpp/src/qpid/client/ConnectionSettings.h index 1b994a6da3..c7725e19f0 100644 --- a/qpid/cpp/src/qpid/client/ConnectionSettings.h +++ b/qpid/cpp/src/qpid/client/ConnectionSettings.h @@ -71,7 +71,8 @@ struct ConnectionSettings { std::string virtualhost; /** - * The username to use when authenticating the connection. + * The username to use when authenticating the connection. If not + * specified the current users login is used if available. */ std::string username; /** @@ -111,6 +112,20 @@ struct ConnectionSettings { * If true, TCP_NODELAY will be set for the connection. */ bool tcpNoDelay; + /** + * SASL service name + */ + std::string service; + /** + * Minimum acceptable strength of any SASL negotiated security + * layer. 0 means no security layer required. + */ + uint minSsf; + /** + * Maximum acceptable strength of any SASL negotiated security + * layer. 0 means no security layer allowed. + */ + uint maxSsf; }; }} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp index bef98863a1..0e11b920e1 100644 --- a/qpid/cpp/src/qpid/client/Connector.cpp +++ b/qpid/cpp/src/qpid/client/Connector.cpp @@ -24,15 +24,18 @@ #include "ConnectionImpl.h" #include "ConnectionSettings.h" #include "qpid/log/Statement.h" +#include "qpid/sys/Codec.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Poller.h" +#include "qpid/sys/SecurityLayer.h" #include "qpid/Msg.h" #include <iostream> #include <map> +#include <deque> #include <boost/bind.hpp> #include <boost/format.hpp> #include <boost/weak_ptr.hpp> @@ -74,39 +77,19 @@ void Connector::registerFactory(const std::string& proto, Factory* connectorFact theProtocolRegistry()[proto] = connectorFactory; } -class TCPConnector : public Connector, private sys::Runnable +class TCPConnector : public Connector, public sys::Codec, private sys::Runnable { + typedef std::deque<framing::AMQFrame> Frames; struct Buff; - /** Batch up frames for writing to aio. */ - class Writer : public framing::FrameHandler { - typedef sys::AsynchIOBufferBase BufferBase; - typedef std::vector<framing::AMQFrame> Frames; - - const uint16_t maxFrameSize; - sys::Mutex lock; - sys::AsynchIO* aio; - BufferBase* buffer; - Frames frames; - size_t lastEof; // Position after last EOF in frames - framing::Buffer encode; - size_t framesEncoded; - std::string identifier; - Bounds* bounds; - - void writeOne(); - void newBuffer(); + const uint16_t maxFrameSize; - public: - - Writer(uint16_t maxFrameSize, Bounds*); - ~Writer(); - void init(std::string id, sys::AsynchIO*); - void handle(framing::AMQFrame&); - void write(sys::AsynchIO&); - }; + sys::Mutex lock; + Frames frames; // Outgoing frame queue + size_t lastEof; // Position after last EOF in frames + uint64_t currentSize; + Bounds* bounds; - const uint16_t maxFrameSize; framing::ProtocolVersion version; bool initiated; @@ -119,14 +102,14 @@ class TCPConnector : public Connector, private sys::Runnable framing::InitiationHandler* initialiser; framing::OutputHandler* output; - Writer writer; - sys::Thread receiver; sys::Socket socket; sys::AsynchIO* aio; + std::string identifier; boost::shared_ptr<sys::Poller> poller; + std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; ~TCPConnector(); @@ -139,8 +122,6 @@ class TCPConnector : public Connector, private sys::Runnable void writeDataBlock(const framing::AMQDataBlock& data); void eof(qpid::sys::AsynchIO&); - std::string identifier; - boost::weak_ptr<ConnectionImpl> impl; void connect(const std::string& host, int port); @@ -153,6 +134,12 @@ class TCPConnector : public Connector, private sys::Runnable sys::ShutdownHandler* getShutdownHandler() const; framing::OutputHandler* getOutputHandler(); const std::string& getIdentifier() const; + void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>); + + size_t decode(const char* buffer, size_t size); + size_t encode(const char* buffer, size_t size); + bool canEncode(); + public: TCPConnector(framing::ProtocolVersion pVersion, @@ -177,12 +164,14 @@ TCPConnector::TCPConnector(ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), + lastEof(0), + currentSize(0), + bounds(cimpl), version(ver), initiated(false), closed(true), joined(true), shutdownHandler(0), - writer(maxFrameSize, cimpl), aio(0), impl(cimpl->shared_from_this()) { @@ -214,7 +203,6 @@ void TCPConnector::connect(const std::string& host, int port){ 0, // closed 0, // nobuffs boost::bind(&TCPConnector::writebuff, this, _1)); - writer.init(identifier, aio); } void TCPConnector::init(){ @@ -266,7 +254,21 @@ const std::string& TCPConnector::getIdentifier() const { } void TCPConnector::send(AMQFrame& frame) { - writer.handle(frame); + bool notifyWrite = false; + { + Mutex::ScopedLock l(lock); + frames.push_back(frame); + //only ask to write if this is the end of a frameset or if we + //already have a buffers worth of data + currentSize += frame.encodedSize(); + if (frame.getEof()) { + lastEof = frames.size(); + notifyWrite = true; + } else { + notifyWrite = (currentSize >= maxFrameSize); + } + } + if (notifyWrite) aio->notifyPendingWrite(); } void TCPConnector::handleClosed() { @@ -279,70 +281,70 @@ struct TCPConnector::Buff : public AsynchIO::BufferBase { ~Buff() { delete [] bytes;} }; -TCPConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b) +void TCPConnector::writebuff(AsynchIO& /*aio*/) { -} - -TCPConnector::Writer::~Writer() { delete buffer; } + Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; + if (codec->canEncode()) { + std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer()); + if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize)); + + size_t encoded = codec->encode(buffer->bytes, buffer->byteCount); -void TCPConnector::Writer::init(std::string id, sys::AsynchIO* a) { - Mutex::ScopedLock l(lock); - identifier = id; - aio = a; - newBuffer(); -} -void TCPConnector::Writer::handle(framing::AMQFrame& frame) { - Mutex::ScopedLock l(lock); - frames.push_back(frame); - //only try to write if this is the end of a frameset or if we - //already have a buffers worth of data - if (frame.getEof() || (bounds && bounds->getCurrentSize() >= maxFrameSize)) { - lastEof = frames.size(); - aio->notifyPendingWrite(); + buffer->dataStart = 0; + buffer->dataCount = encoded; + aio->queueWrite(buffer.release()); } - QPID_LOG(trace, "SENT " << identifier << ": " << frame); -} - -void TCPConnector::Writer::writeOne() { - assert(buffer); - framesEncoded = 0; - - buffer->dataStart = 0; - buffer->dataCount = encode.getPosition(); - aio->queueWrite(buffer); - newBuffer(); } -void TCPConnector::Writer::newBuffer() { - buffer = aio->getQueuedBuffer(); - if (!buffer) buffer = new Buff(maxFrameSize); - encode = framing::Buffer(buffer->bytes, buffer->byteCount); - framesEncoded = 0; +// Called in IO thread. +bool TCPConnector::canEncode() +{ + Mutex::ScopedLock l(lock); + //have at least one full frameset or a whole buffers worth of data + return lastEof || currentSize >= maxFrameSize; } // Called in IO thread. -void TCPConnector::Writer::write(sys::AsynchIO&) { - Mutex::ScopedLock l(lock); - assert(buffer); +size_t TCPConnector::encode(const char* buffer, size_t size) +{ + framing::Buffer out(const_cast<char*>(buffer), size); size_t bytesWritten(0); - for (size_t i = 0; i < lastEof; ++i) { - AMQFrame& frame = frames[i]; - uint32_t size = frame.encodedSize(); - if (size > encode.available()) writeOne(); - assert(size <= encode.available()); - frame.encode(encode); - ++framesEncoded; - bytesWritten += size; + { + Mutex::ScopedLock l(lock); + while (!frames.empty() && out.available() >= frames.front().encodedSize() ) { + frames.front().encode(out); + QPID_LOG(trace, "SENT " << identifier << ": " << frames.front()); + frames.pop_front(); + if (lastEof) --lastEof; + } + bytesWritten = size - out.available(); + currentSize -= bytesWritten; } - frames.erase(frames.begin(), frames.begin()+lastEof); - lastEof = 0; if (bounds) bounds->reduce(bytesWritten); - if (encode.getPosition() > 0) writeOne(); + return bytesWritten; } -bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) { - framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); +bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) +{ + Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; + int32_t decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount); + // TODO: unreading needs to go away, and when we can cope + // with multiple sub-buffers in the general buffer scheme, it will + if (decoded < buff->dataCount) { + // Adjust buffer for used bytes and then "unread them" + buff->dataStart += decoded; + buff->dataCount -= decoded; + aio.unread(buff); + } else { + // Give whole buffer back to aio subsystem + aio.queueReadBuffer(buff); + } + return true; +} +size_t TCPConnector::decode(const char* buffer, size_t size) +{ + framing::Buffer in(const_cast<char*>(buffer), size); if (!initiated) { framing::ProtocolInitiation protocolInit; if (protocolInit.decode(in)) { @@ -356,22 +358,7 @@ bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) { QPID_LOG(trace, "RECV " << identifier << ": " << frame); input->received(frame); } - // TODO: unreading needs to go away, and when we can cope - // with multiple sub-buffers in the general buffer scheme, it will - if (in.available() != 0) { - // Adjust buffer for used bytes and then "unread them" - buff->dataStart += buff->dataCount-in.available(); - buff->dataCount = in.available(); - aio.unread(buff); - } else { - // Give whole buffer back to aio subsystem - aio.queueReadBuffer(buff); - } - return true; -} - -void TCPConnector::writebuff(AsynchIO& aio_) { - writer.write(aio_); + return size - in.available(); } void TCPConnector::writeDataBlock(const AMQDataBlock& data) { @@ -388,7 +375,7 @@ void TCPConnector::eof(AsynchIO&) { // TODO: astitcher 20070908 This version of the code can never time out, so the idle processing // will never be called -void TCPConnector::run(){ +void TCPConnector::run() { // Keep the connection impl in memory until run() completes. boost::shared_ptr<ConnectionImpl> protect = impl.lock(); assert(protect); @@ -409,5 +396,11 @@ void TCPConnector::run(){ } } +void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) +{ + securityLayer = sl; + securityLayer->init(this); +} + }} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/client/Connector.h b/qpid/cpp/src/qpid/client/Connector.h index 5c37d95300..e23fb8875b 100644 --- a/qpid/cpp/src/qpid/client/Connector.h +++ b/qpid/cpp/src/qpid/client/Connector.h @@ -40,6 +40,11 @@ #include <boost/shared_ptr.hpp> namespace qpid { + +namespace sys { +class SecurityLayer; +} + namespace client { struct ConnectionSettings; @@ -65,6 +70,9 @@ class Connector : public framing::OutputHandler virtual sys::ShutdownHandler* getShutdownHandler() const = 0; virtual framing::OutputHandler* getOutputHandler() = 0; virtual const std::string& getIdentifier() const = 0; + + virtual void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>) {} + }; }} diff --git a/qpid/cpp/src/qpid/client/RdmaConnector.cpp b/qpid/cpp/src/qpid/client/RdmaConnector.cpp index 98fe762f31..3cc8961eea 100644 --- a/qpid/cpp/src/qpid/client/RdmaConnector.cpp +++ b/qpid/cpp/src/qpid/client/RdmaConnector.cpp @@ -29,6 +29,7 @@ #include "qpid/sys/rdma/RdmaIO.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Poller.h" +#include "qpid/sys/SecurityLayer.h" #include "qpid/Msg.h" #include <iostream> @@ -47,39 +48,21 @@ using namespace qpid::framing; using boost::format; using boost::str; -class RdmaConnector : public Connector, private sys::Runnable + class RdmaConnector : public Connector, public sys::Codec, private sys::Runnable { struct Buff; - /** Batch up frames for writing to aio. */ - class Writer : public framing::FrameHandler { - typedef Rdma::Buffer BufferBase; - typedef std::deque<framing::AMQFrame> Frames; - - const uint16_t maxFrameSize; - sys::Mutex lock; - Rdma::AsynchIO* aio; - BufferBase* buffer; - Frames frames; - size_t lastEof; // Position after last EOF in frames - framing::Buffer encode; - size_t framesEncoded; - std::string identifier; - Bounds* bounds; - - void writeOne(); - void newBuffer(); + typedef Rdma::Buffer BufferBase; + typedef std::deque<framing::AMQFrame> Frames; - public: - - Writer(uint16_t maxFrameSize, Bounds*); - ~Writer(); - void init(std::string id, Rdma::AsynchIO*); - void handle(framing::AMQFrame&); - void write(Rdma::AsynchIO&); - }; - const uint16_t maxFrameSize; + sys::Mutex lock; + Frames frames; + size_t lastEof; // Position after last EOF in frames + uint64_t currentSize; + Bounds* bounds; + + framing::ProtocolVersion version; bool initiated; @@ -92,12 +75,11 @@ class RdmaConnector : public Connector, private sys::Runnable framing::InitiationHandler* initialiser; framing::OutputHandler* output; - Writer writer; - sys::Thread receiver; Rdma::AsynchIO* aio; sys::Poller::shared_ptr poller; + std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; ~RdmaConnector(); @@ -129,6 +111,11 @@ class RdmaConnector : public Connector, private sys::Runnable sys::ShutdownHandler* getShutdownHandler() const; framing::OutputHandler* getOutputHandler(); const std::string& getIdentifier() const; + void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>); + + size_t decode(const char* buffer, size_t size); + size_t encode(const char* buffer, size_t size); + bool canEncode(); public: RdmaConnector(framing::ProtocolVersion pVersion, @@ -155,12 +142,14 @@ RdmaConnector::RdmaConnector(ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), + lastEof(0), + currentSize(0), + bounds(cimpl), version(ver), initiated(false), polling(false), joined(true), shutdownHandler(0), - writer(maxFrameSize, cimpl), aio(0), impl(cimpl) { @@ -216,7 +205,6 @@ void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intru aio->start(poller); identifier = str(format("[%1% %2%]") % ci->getLocalName() % ci->getPeerName()); - writer.init(identifier, aio); ProtocolInitiation init(version); writeDataBlock(init); } @@ -279,7 +267,21 @@ const std::string& RdmaConnector::getIdentifier() const { } void RdmaConnector::send(AMQFrame& frame) { - writer.handle(frame); + bool notifyWrite = false; + { + Mutex::ScopedLock l(lock); + frames.push_back(frame); + //only ask to write if this is the end of a frameset or if we + //already have a buffers worth of data + currentSize += frame.encodedSize(); + if (frame.getEof()) { + lastEof = frames.size(); + notifyWrite = true; + } else { + notifyWrite = (currentSize >= maxFrameSize); + } + } + if (notifyWrite) aio->notifyPendingWrite(); } void RdmaConnector::handleClosed() { @@ -287,88 +289,54 @@ void RdmaConnector::handleClosed() { shutdownHandler->shutdown(); } -RdmaConnector::Writer::Writer(uint16_t s, Bounds* b) : - maxFrameSize(s), - aio(0), - buffer(0), - lastEof(0), - bounds(b) -{ -} - -RdmaConnector::Writer::~Writer() { - if (aio) - aio->returnBuffer(buffer); -} - -void RdmaConnector::Writer::init(std::string id, Rdma::AsynchIO* a) { - Mutex::ScopedLock l(lock); - identifier = id; - aio = a; - assert(aio->bufferAvailable()); - newBuffer(); -} -void RdmaConnector::Writer::handle(framing::AMQFrame& frame) { - Mutex::ScopedLock l(lock); - frames.push_back(frame); - // Don't bother to send anything unless we're at the end of a frameset (assembly in 0-10 terminology) - if (frame.getEof()) { - lastEof = frames.size(); - QPID_LOG(debug, "Requesting write: lastEof=" << lastEof); - aio->notifyPendingWrite(); +// Called in IO thread. (write idle routine) +// This is NOT only called in response to previously calling notifyPendingWrite +void RdmaConnector::writebuff(Rdma::AsynchIO&) { + Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; + if (codec->canEncode()) { + std::auto_ptr<BufferBase> buffer = std::auto_ptr<BufferBase>(aio->getBuffer()); + size_t encoded = codec->encode(buffer->bytes, buffer->byteCount); + + buffer->dataStart = 0; + buffer->dataCount = encoded; + aio->queueWrite(buffer.release()); } - QPID_LOG(trace, "SENT " << identifier << ": " << frame); } -void RdmaConnector::Writer::writeOne() { - assert(buffer); - QPID_LOG(trace, "Write buffer " << encode.getPosition() - << " bytes " << framesEncoded << " frames "); - framesEncoded = 0; - - buffer->dataStart = 0; - buffer->dataCount = encode.getPosition(); - aio->queueWrite(buffer); - newBuffer(); -} - -void RdmaConnector::Writer::newBuffer() { - buffer = aio->getBuffer(); - encode = framing::Buffer(buffer->bytes, buffer->byteCount); - framesEncoded = 0; +bool RdmaConnector::canEncode() +{ + Mutex::ScopedLock l(lock); + //have at least one full frameset or a whole buffers worth of data + return aio->writable() && aio->bufferAvailable() && (lastEof || currentSize >= maxFrameSize); } -// Called in IO thread. (write idle routine) -// This is NOT only called in response to previously calling notifyPendingWrite -void RdmaConnector::Writer::write(Rdma::AsynchIO&) { - Mutex::ScopedLock l(lock); - assert(buffer); - // If nothing to do return immediately - if (lastEof==0) - return; - size_t bytesWritten = 0; - while (aio->writable() && aio->bufferAvailable() && !frames.empty()) { - const AMQFrame* frame = &frames.front(); - uint32_t size = frame->encodedSize(); - while (size <= encode.available()) { - frame->encode(encode); +size_t RdmaConnector::encode(const char* buffer, size_t size) +{ + framing::Buffer out(const_cast<char*>(buffer), size); + size_t bytesWritten(0); + { + Mutex::ScopedLock l(lock); + while (!frames.empty() && out.available() >= frames.front().encodedSize() ) { + frames.front().encode(out); + QPID_LOG(trace, "SENT " << identifier << ": " << frames.front()); frames.pop_front(); - ++framesEncoded; - bytesWritten += size; - if (frames.empty()) - break; - frame = &frames.front(); - size = frame->encodedSize(); + if (lastEof) --lastEof; } - lastEof -= framesEncoded; - writeOne(); + bytesWritten = size - out.available(); + currentSize -= bytesWritten; } if (bounds) bounds->reduce(bytesWritten); + return bytesWritten; } void RdmaConnector::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { - framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); + Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; + codec->decode(buff->bytes+buff->dataStart, buff->dataCount); +} +size_t RdmaConnector::decode(const char* buffer, size_t size) +{ + framing::Buffer in(const_cast<char*>(buffer), size); if (!initiated) { framing::ProtocolInitiation protocolInit; if (protocolInit.decode(in)) { @@ -382,10 +350,7 @@ void RdmaConnector::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { QPID_LOG(trace, "RECV " << identifier << ": " << frame); input->received(frame); } -} - -void RdmaConnector::writebuff(Rdma::AsynchIO& aio_) { - writer.write(aio_); + return size - in.available(); } void RdmaConnector::writeDataBlock(const AMQDataBlock& data) { @@ -424,5 +389,10 @@ void RdmaConnector::run(){ } } +void RdmaConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) +{ + securityLayer = sl; + securityLayer->init(this); +} }} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/client/Sasl.h b/qpid/cpp/src/qpid/client/Sasl.h new file mode 100644 index 0000000000..e7a911ebce --- /dev/null +++ b/qpid/cpp/src/qpid/client/Sasl.h @@ -0,0 +1,52 @@ +#ifndef QPID_CLIENT_SASL_H +#define QPID_CLIENT_SASL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <memory> +#include <string> + +namespace qpid { + +namespace sys { +class SecurityLayer; +} + +namespace client { + +class ConnectionSettings; + +/** + * Interface to SASL support + */ +class Sasl +{ + public: + virtual std::string start(const std::string& mechanisms) = 0; + virtual std::string step(const std::string& challenge) = 0; + virtual std::string getMechanism() = 0; + virtual std::auto_ptr<qpid::sys::SecurityLayer> getSecurityLayer(uint16_t maxFrameSize) = 0; + virtual ~Sasl() {} +}; +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_SASL_H*/ diff --git a/qpid/cpp/src/qpid/client/SaslFactory.cpp b/qpid/cpp/src/qpid/client/SaslFactory.cpp new file mode 100644 index 0000000000..d6edc6501d --- /dev/null +++ b/qpid/cpp/src/qpid/client/SaslFactory.cpp @@ -0,0 +1,345 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SaslFactory.h" +#include "ConnectionSettings.h" + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#ifndef HAVE_SASL + +namespace qpid { +namespace client { + +//Null implementation + +SaslFactory::SaslFactory() {} + +SaslFactory::~SaslFactory() {} + +SaslFactory& SaslFactory::getInstance() +{ + qpid::sys::Mutex::ScopedLock l(lock); + if (!instance.get()) { + instance = std::auto_ptr<SaslFactory>(new SaslFactory()); + } + return *instance; +} + +std::auto_ptr<Sasl> SaslFactory::create(const ConnectionSettings&) +{ + return std::auto_ptr<Sasl>(); +} + +qpid::sys::Mutex SaslFactory::lock; +std::auto_ptr<SaslFactory> SaslFactory::instance; + +}} // namespace qpid::client + +#else + +#include "qpid/Exception.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/sys/SecurityLayer.h" +#include "qpid/sys/cyrus/CyrusSecurityLayer.h" +#include "qpid/log/Statement.h" +#include <sasl/sasl.h> +#include <strings.h> + +namespace qpid { +namespace client { + +using qpid::sys::SecurityLayer; +using qpid::sys::cyrus::CyrusSecurityLayer; +using qpid::framing::InternalErrorException; + +const size_t MAX_LOGIN_LENGTH = 50; + +class CyrusSasl : public Sasl +{ + public: + CyrusSasl(const ConnectionSettings&); + ~CyrusSasl(); + std::string start(const std::string& mechanisms); + std::string step(const std::string& challenge); + std::string getMechanism(); + std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize); + private: + sasl_conn_t* conn; + sasl_callback_t callbacks[5];//realm, user, authname, password, end-of-list + ConnectionSettings settings; + std::string input; + std::string mechanism; + char login[MAX_LOGIN_LENGTH]; + + void interact(sasl_interact_t* client_interact); +}; + +//sasl callback functions +int getLogin(void *context, int id, const char **result, unsigned *len); +int getUserFromSettings(void *context, int id, const char **result, unsigned *len); +int getPasswordFromSettings(sasl_conn_t *conn, void *context, int id, sasl_secret_t **psecret); +typedef int CallbackProc(); + +qpid::sys::Mutex SaslFactory::lock; +std::auto_ptr<SaslFactory> SaslFactory::instance; + +SaslFactory::SaslFactory() +{ + sasl_callback_t* callbacks = 0; + int result = sasl_client_init(callbacks); + if (result != SASL_OK) { + throw InternalErrorException(QPID_MSG("Sasl error: " << sasl_errstring(result, 0, 0))); + } +} + +SaslFactory::~SaslFactory() +{ + sasl_done(); +} + +SaslFactory& SaslFactory::getInstance() +{ + qpid::sys::Mutex::ScopedLock l(lock); + if (!instance.get()) { + instance = std::auto_ptr<SaslFactory>(new SaslFactory()); + } + return *instance; +} + +std::auto_ptr<Sasl> SaslFactory::create(const ConnectionSettings& settings) +{ + std::auto_ptr<Sasl> sasl(new CyrusSasl(settings)); + return sasl; +} + +CyrusSasl::CyrusSasl(const ConnectionSettings& s) : conn(0), settings(s) +{ + size_t i = 0; + + callbacks[i].id = SASL_CB_GETREALM; + callbacks[i].proc = 0; + callbacks[i++].context = 0; + + if (settings.username.empty()) { + callbacks[i].id = SASL_CB_USER; + callbacks[i].proc = (CallbackProc*) &getLogin; + callbacks[i++].context = &login; + + callbacks[i].id = SASL_CB_AUTHNAME; + callbacks[i].proc = (CallbackProc*) &getLogin; + callbacks[i++].context = &login; + } else { + callbacks[i].id = SASL_CB_USER; + callbacks[i].proc = (CallbackProc*) &getUserFromSettings; + callbacks[i++].context = &settings; + + callbacks[i].id = SASL_CB_AUTHNAME; + callbacks[i].proc = (CallbackProc*) &getUserFromSettings; + callbacks[i++].context = &settings; + } + + callbacks[i].id = SASL_CB_PASS; + callbacks[i].proc = (CallbackProc*) &getPasswordFromSettings; + callbacks[i++].context = &settings; + + callbacks[i].id = SASL_CB_LIST_END; + callbacks[i].proc = 0; + callbacks[i++].context = 0; +} + +CyrusSasl::~CyrusSasl() +{ + if (conn) { + sasl_dispose(&conn); + } +} + +namespace { + const std::string SSL("ssl"); +} + +std::string CyrusSasl::start(const std::string& mechanisms) +{ + QPID_LOG(debug, "CyrusSasl::start(" << mechanisms << ")"); + int result = sasl_client_new(settings.service.c_str(), + settings.host.c_str(), + 0, 0, /* Local and remote IP address strings */ + callbacks, + 0, /* security flags */ + &conn); + + if (result != SASL_OK) throw InternalErrorException(QPID_MSG("Sasl error: " << sasl_errdetail(conn))); + + sasl_security_properties_t secprops; + + secprops.min_ssf = settings.minSsf; + secprops.max_ssf = settings.maxSsf; + secprops.maxbufsize = 65535; + + QPID_LOG(debug, "min_ssf: " << secprops.min_ssf << ", max_ssf: " << secprops.max_ssf); + + secprops.property_names = 0; + secprops.property_values = 0; + secprops.security_flags = 0;//TODO: provide means for application to configure these + + result = sasl_setprop(conn, SASL_SEC_PROPS, &secprops); + if (result != SASL_OK) { + throw framing::InternalErrorException(QPID_MSG("SASL error: " << sasl_errdetail(conn))); + } + + + sasl_interact_t* client_interact = 0; + const char *out = 0; + unsigned outlen = 0; + const char *chosenMechanism = 0; + + do { + result = sasl_client_start(conn, + mechanisms.c_str(), + &client_interact, + &out, + &outlen, + &chosenMechanism); + + if (result == SASL_INTERACT) { + interact(client_interact); + } + } while (result == SASL_INTERACT); + + if (result != SASL_CONTINUE && result != SASL_OK) { + throw InternalErrorException(QPID_MSG("Sasl error: " << sasl_errdetail(conn))); + } + + mechanism = std::string(chosenMechanism); + QPID_LOG(debug, "CyrusSasl::start(" << mechanisms << "): selected " + << mechanism << " response: '" << std::string(out, outlen) << "'"); + return std::string(out, outlen); +} + +std::string CyrusSasl::step(const std::string& challenge) +{ + sasl_interact_t* client_interact = 0; + const char *out = 0; + unsigned outlen = 0; + int result = 0; + do { + result = sasl_client_step(conn, /* our context */ + challenge.data(), /* the data from the server */ + challenge.size(), /* it's length */ + &client_interact, /* this should be + unallocated and NULL */ + &out, /* filled in on success */ + &outlen); /* filled in on success */ + + if (result == SASL_INTERACT) { + interact(client_interact); + } + } while (result == SASL_INTERACT); + + std::string response; + if (result == SASL_CONTINUE || result == SASL_OK) response = std::string(out, outlen); + else if (result != SASL_OK) { + throw InternalErrorException(QPID_MSG("Sasl error: " << sasl_errdetail(conn))); + } + QPID_LOG(debug, "CyrusSasl::step(" << challenge << "): " << response); + return response; +} + +std::string CyrusSasl::getMechanism() +{ + return mechanism; +} + +void CyrusSasl::interact(sasl_interact_t* client_interact) +{ + std::cout << "[" << client_interact->id << "] " << client_interact->challenge << " " << client_interact->prompt; + if (client_interact->defresult) std::cout << " (" << client_interact->defresult << ")"; + std::cout << std::endl; + if (std::cin >> input) { + client_interact->result = input.data(); + client_interact->len = input.size(); + } +} + +std::auto_ptr<SecurityLayer> CyrusSasl::getSecurityLayer(uint16_t maxFrameSize) +{ + const void* value(0); + int result = sasl_getprop(conn, SASL_SSF, &value); + if (result != SASL_OK) { + throw framing::InternalErrorException(QPID_MSG("SASL error: " << sasl_errdetail(conn))); + } + uint ssf = *(reinterpret_cast<const unsigned*>(value)); + std::auto_ptr<SecurityLayer> securityLayer; + if (ssf) { + QPID_LOG(info, "Installing security layer, SSF: "<< ssf); + securityLayer = std::auto_ptr<SecurityLayer>(new CyrusSecurityLayer(conn, maxFrameSize)); + } + return securityLayer; +} + +int getLogin(void* context, int /*id*/, const char** result, unsigned* /*len*/) +{ + if (context) { + char* login = (char*) context; + int status = getlogin_r(login, MAX_LOGIN_LENGTH); + if (status == 0) { + *result = login; + QPID_LOG(debug, "getLogin(): " << (*result)); + } else { + strcpy(login, "guest"); + QPID_LOG(error, "getlogin_r() failed with " << status << "; defaulting to " << login); + } + return SASL_OK; + } else { + return SASL_FAIL; + } +} + +int getUserFromSettings(void* context, int /*id*/, const char** result, unsigned* /*len*/) +{ + if (context) { + *result = ((ConnectionSettings*) context)->username.c_str(); + QPID_LOG(debug, "getUserFromSettings(): " << (*result)); + return SASL_OK; + } else { + return SASL_FAIL; + } +} + +int getPasswordFromSettings(sasl_conn_t* /*conn*/, void* context, int /*id*/, sasl_secret_t** psecret) +{ + if (context) { + size_t length = ((ConnectionSettings*) context)->password.size(); + sasl_secret_t* secret = (sasl_secret_t*) malloc(sizeof(sasl_secret_t) + length); + secret->len = length; + memcpy(secret->data, ((ConnectionSettings*) context)->password.data(), length); + *psecret = secret; + return SASL_OK; + } else { + return SASL_FAIL; + } +} + +}} // namespace qpid::client + +#endif diff --git a/qpid/cpp/src/qpid/client/SaslFactory.h b/qpid/cpp/src/qpid/client/SaslFactory.h new file mode 100644 index 0000000000..60a1d60ff3 --- /dev/null +++ b/qpid/cpp/src/qpid/client/SaslFactory.h @@ -0,0 +1,48 @@ +#ifndef QPID_CLIENT_SASLFACTORY_H +#define QPID_CLIENT_SASLFACTORY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Sasl.h" +#include "qpid/sys/Mutex.h" +#include <memory> + +namespace qpid { +namespace client { + +/** + * Factory for instances of the Sasl interface through which Sasl + * support is provided to a ConnectionHandler. + */ +class SaslFactory +{ + public: + std::auto_ptr<Sasl> create(const ConnectionSettings&); + static SaslFactory& getInstance(); + ~SaslFactory(); + private: + SaslFactory(); + static qpid::sys::Mutex lock; + static std::auto_ptr<SaslFactory> instance; +}; +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_SASLFACTORY_H*/ diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp index 83b6329889..35b75c1fe8 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -184,16 +184,21 @@ void AsynchIOHandler::idle(AsynchIO&){ return; } if (codec == 0) return; - if (codec->canEncode()) { - // Try and get a queued buffer if not then construct new one - AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); - if (!buff) buff = new Buff; - size_t encoded=codec->encode(buff->bytes, buff->byteCount); - buff->dataCount = encoded; - aio->queueWrite(buff); - } - if (codec->isClosed()) + try { + if (codec->canEncode()) { + // Try and get a queued buffer if not then construct new one + AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); + if (!buff) buff = new Buff; + size_t encoded=codec->encode(buff->bytes, buff->byteCount); + buff->dataCount = encoded; + aio->queueWrite(buff); + } + if (codec->isClosed()) + aio->queueWriteClose(); + } catch (const std::exception& e) { + QPID_LOG(error, e.what()); aio->queueWriteClose(); + } } }} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/Codec.h b/qpid/cpp/src/qpid/sys/Codec.h new file mode 100644 index 0000000000..f9645f554e --- /dev/null +++ b/qpid/cpp/src/qpid/sys/Codec.h @@ -0,0 +1,52 @@ +#ifndef QPID_SYS_CODEC_H +#define QPID_SYS_CODEC_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <cstddef> + +namespace qpid { +namespace sys { + +/** + * Generic codec interface + */ +class Codec +{ + public: + virtual ~Codec() {} + + /** Decode from buffer, return number of bytes decoded. + * @return may be less than size if there was incomplete + * data at the end of the buffer. + */ + virtual size_t decode(const char* buffer, size_t size) = 0; + + + /** Encode into buffer, return number of bytes encoded */ + virtual size_t encode(const char* buffer, size_t size) = 0; + + /** Return true if we have data to encode */ + virtual bool canEncode() = 0; +}; +}} // namespace qpid::sys + +#endif /*!QPID_SYS_CODEC_H*/ diff --git a/qpid/cpp/src/qpid/sys/ConnectionCodec.h b/qpid/cpp/src/qpid/sys/ConnectionCodec.h index b1b047d2cc..7f5e2f831c 100644 --- a/qpid/cpp/src/qpid/sys/ConnectionCodec.h +++ b/qpid/cpp/src/qpid/sys/ConnectionCodec.h @@ -21,6 +21,7 @@ * under the License. * */ +#include "Codec.h" #include "qpid/framing/ProtocolVersion.h" namespace qpid { @@ -34,23 +35,10 @@ class OutputControl; * Interface of coder/decoder for a connection of a specific protocol * version. */ -class ConnectionCodec { +class ConnectionCodec : public Codec { public: virtual ~ConnectionCodec() {} - /** Decode from buffer, return number of bytes decoded. - * @return may be less than size if there was incomplete - * data at the end of the buffer. - */ - virtual size_t decode(const char* buffer, size_t size) = 0; - - - /** Encode into buffer, return number of bytes encoded */ - virtual size_t encode(const char* buffer, size_t size) = 0; - - /** Return true if we have data to encode */ - virtual bool canEncode() = 0; - /** Network connection was closed from other end. */ virtual void closed() = 0; diff --git a/qpid/cpp/src/qpid/sys/SecurityLayer.h b/qpid/cpp/src/qpid/sys/SecurityLayer.h new file mode 100644 index 0000000000..6ad29eea80 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/SecurityLayer.h @@ -0,0 +1,42 @@ +#ifndef QPID_SYS_SECURITYLAYER_H +#define QPID_SYS_SECURITYLAYER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Codec.h" + +namespace qpid { +namespace sys { + +/** + * Defines interface to a SASL negotiated Security Layer (for + * encryption/integrity) + */ +class SecurityLayer : public Codec +{ + public: + virtual void init(Codec*) = 0; + virtual ~SecurityLayer() {} +}; + +}} // namespace qpid::sys + +#endif /*!QPID_SYS_SECURITYLAYER_H*/ diff --git a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp index be091f86d8..c6e45b8fa4 100644 --- a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -65,14 +65,10 @@ static class TCPIOPlugin : public Plugin { // Only provide to a Broker if (broker) { const broker::Broker::Options& opts = broker->getOptions(); - if (opts.requireEncrypted) { - QPID_LOG(info, "Not accepting unencrypted connections on TCP"); - } else { - ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog, - opts.tcpNoDelay)); - QPID_LOG(notice, "Listening on TCP port " << protocol->getPort()); - broker->registerProtocolFactory("tcp", protocol); - } + ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog, + opts.tcpNoDelay)); + QPID_LOG(notice, "Listening on TCP port " << protocol->getPort()); + broker->registerProtocolFactory("tcp", protocol); } } } tcpPlugin; diff --git a/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp b/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp new file mode 100644 index 0000000000..e906d22cae --- /dev/null +++ b/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp @@ -0,0 +1,119 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "CyrusSecurityLayer.h" +#include <algorithm> +#include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace sys { +namespace cyrus { + +CyrusSecurityLayer::CyrusSecurityLayer(sasl_conn_t* c, uint16_t maxFrameSize) : + conn(c), decrypted(0), decryptedSize(0), encrypted(0), encryptedSize(0), codec(0), maxInputSize(0), decodeBuffer(maxFrameSize) +{ + const void* value(0); + int result = sasl_getprop(conn, SASL_MAXOUTBUF, &value); + if (result != SASL_OK) { + throw framing::InternalErrorException(QPID_MSG("SASL encode error: " << sasl_errdetail(conn))); + } + maxInputSize = *(reinterpret_cast<const unsigned*>(value)); +} + +size_t CyrusSecurityLayer::decode(const char* input, size_t size) +{ + size_t inStart = 0; + do { + size_t inSize = std::min(size - inStart, maxInputSize); + int result = sasl_decode(conn, input + inStart, inSize, &decrypted, &decryptedSize); + if (result != SASL_OK) { + throw framing::InternalErrorException(QPID_MSG("SASL encode error: " << sasl_errdetail(conn))); + } + inStart += inSize; + size_t copied = 0; + do { + size_t count = std::min(decryptedSize - copied, decodeBuffer.size - decodeBuffer.position); + ::memcpy(decodeBuffer.data + decodeBuffer.position, decrypted + copied, count); + copied += count; + decodeBuffer.position += count; + size_t decodedSize = codec->decode(decodeBuffer.data, decodeBuffer.position); + if (decodedSize < decodeBuffer.position) { + ::memmove(decodeBuffer.data, decodeBuffer.data + decodedSize, decodeBuffer.position - decodedSize); + } + decodeBuffer.position -= decodedSize; + } while (copied < decryptedSize); + } while (inStart < size); + return size; +} + +size_t CyrusSecurityLayer::encode(const char* buffer, size_t size) +{ + size_t processed = 0;//records how many bytes have been written to buffer + do { + if (!encrypted) { + DataBuffer encodeBuffer(maxInputSize);//make sure maxInputSize > maxFrameSize + size_t encoded = codec->encode(encodeBuffer.data, encodeBuffer.size); + if (!encoded) break;//nothing more to do + int result = sasl_encode(conn, encodeBuffer.data, encoded, &encrypted, &encryptedSize); + if (result != SASL_OK) { + throw framing::InternalErrorException(QPID_MSG("SASL encode error: " << sasl_errdetail(conn))); + } + } + size_t remaining = size - processed; + if (remaining < encryptedSize) { + //can't fit all encrypted data in the buffer we've + //been given, copy in what we can and hold on to the + //rest until the next call + ::memcpy(const_cast<char*>(buffer + processed), encrypted, remaining); + processed += remaining; + encrypted += remaining; + encryptedSize -= remaining; + } else { + ::memcpy(const_cast<char*>(buffer + processed), encrypted, encryptedSize); + processed += encryptedSize; + encrypted = 0; + encryptedSize = 0; + } + } while (processed < size); + return processed; +} + +bool CyrusSecurityLayer::canEncode() +{ + return encrypted || codec->canEncode(); +} + +void CyrusSecurityLayer::init(qpid::sys::Codec* c) +{ + codec = c; +} + +CyrusSecurityLayer::DataBuffer::DataBuffer(size_t s) : position(0), size(s) +{ + data = new char[size]; +} + +CyrusSecurityLayer::DataBuffer::~DataBuffer() +{ + delete[] data; +} + +}}} // namespace qpid::sys::cyrus diff --git a/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.h b/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.h new file mode 100644 index 0000000000..3c00d496a9 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.h @@ -0,0 +1,66 @@ +#ifndef QPID_SYS_CYRUS_CYRUSSECURITYLAYER_H +#define QPID_SYS_CYRUS_CYRUSSECURITYLAYER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/IntegerTypes.h" +#include "qpid/sys/SecurityLayer.h" +#include <sasl/sasl.h> + +namespace qpid { +namespace sys { +namespace cyrus { + + +/** + * Implementation of SASL security layer using cyrus-sasl library + */ +class CyrusSecurityLayer : public qpid::sys::SecurityLayer +{ + public: + CyrusSecurityLayer(sasl_conn_t*, uint16_t maxFrameSize); + size_t decode(const char* buffer, size_t size); + size_t encode(const char* buffer, size_t size); + bool canEncode(); + void init(qpid::sys::Codec*); + private: + struct DataBuffer + { + char* data; + size_t position; + const size_t size; + DataBuffer(size_t); + ~DataBuffer(); + }; + + sasl_conn_t* conn; + const char* decrypted; + unsigned decryptedSize; + const char* encrypted; + unsigned encryptedSize; + qpid::sys::Codec* codec; + size_t maxInputSize; + DataBuffer decodeBuffer; +}; +}}} // namespace qpid::sys::cyrus + +#endif /*!QPID_SYS_CYRUS_CYRUSSECURITYLAYER_H*/ diff --git a/qpid/cpp/src/tests/.valgrind.supp b/qpid/cpp/src/tests/.valgrind.supp index 7ac34fde5d..7ae2bd9845 100644 --- a/qpid/cpp/src/tests/.valgrind.supp +++ b/qpid/cpp/src/tests/.valgrind.supp @@ -201,3 +201,10 @@ fun:_ZN5boost6detail3tss3setEPv } +{ + Shows up on RHEL5: believed benign + Memcheck:Cond + fun:__strcpy_chk + fun:_sasl_load_plugins + fun:sasl_client_init +} diff --git a/qpid/cpp/src/tests/ConnectionOptions.h b/qpid/cpp/src/tests/ConnectionOptions.h index 0130842668..30fe5ad9b1 100644 --- a/qpid/cpp/src/tests/ConnectionOptions.h +++ b/qpid/cpp/src/tests/ConnectionOptions.h @@ -47,7 +47,10 @@ struct ConnectionOptions : public qpid::Options, ("max-frame-size", optValue(maxFrameSize, "N"), "the maximum frame size to request.") ("bounds-multiplier", optValue(bounds, "N"), "bound size of write queue (as a multiple of the max frame size).") - ("tcp-nodelay", optValue(tcpNoDelay), "Turn on tcp-nodelay"); + ("tcp-nodelay", optValue(tcpNoDelay), "Turn on tcp-nodelay") + ("service", optValue(service, "SERVICE-NAME"), "SASL service name.") + ("min-ssf", optValue(minSsf, "N"), "Minimum acceptable strength for SASL security layer") + ("max-ssf", optValue(maxSsf, "N"), "Maximum acceptable strength for SASL security layer"); } }; |