summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/client/SslConnector.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/client/SslConnector.cpp')
-rw-r--r--qpid/cpp/src/qpid/client/SslConnector.cpp428
1 files changed, 428 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/client/SslConnector.cpp b/qpid/cpp/src/qpid/client/SslConnector.cpp
new file mode 100644
index 0000000000..d5d2433060
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/SslConnector.cpp
@@ -0,0 +1,428 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/Connector.h"
+
+#include "config.h"
+#include "qpid/client/Bounds.h"
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/Options.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Time.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/InitiationHandler.h"
+#include "qpid/sys/ssl/util.h"
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/ssl/SslSocket.h"
+#include "qpid/sys/SocketAddress.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/SecuritySettings.h"
+#include "qpid/Msg.h"
+#include "qpid/types/Exception.h"
+
+#include <iostream>
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+
+namespace qpid {
+namespace client {
+
+using namespace qpid::sys;
+using namespace qpid::sys::ssl;
+using namespace qpid::framing;
+using boost::format;
+using boost::str;
+
+
+class SslConnector : public Connector
+{
+ typedef std::deque<framing::AMQFrame> Frames;
+
+ const uint16_t maxFrameSize;
+
+ sys::Mutex lock;
+ Frames frames;
+ size_t lastEof; // Position after last EOF in frames
+ uint64_t currentSize;
+ Bounds* bounds;
+
+ framing::ProtocolVersion version;
+ bool initiated;
+ bool closed;
+
+ sys::ShutdownHandler* shutdownHandler;
+ framing::InputHandler* input;
+
+ sys::ssl::SslSocket socket;
+
+ sys::AsynchConnector* connector;
+ sys::AsynchIO* aio;
+ std::string identifier;
+ Poller::shared_ptr poller;
+ SecuritySettings securitySettings;
+
+ ~SslConnector();
+
+ void readbuff(AsynchIO&, AsynchIOBufferBase*);
+ void writebuff(AsynchIO&);
+ void writeDataBlock(const framing::AMQDataBlock& data);
+ void eof(AsynchIO&);
+ void disconnected(AsynchIO&);
+
+ void connect(const std::string& host, const std::string& port);
+ void connected(const sys::Socket&);
+ void connectFailed(const std::string& msg);
+
+ void close();
+ void handle(framing::AMQFrame& frame);
+ void abort();
+ void connectAborted();
+
+ void setInputHandler(framing::InputHandler* handler);
+ void setShutdownHandler(sys::ShutdownHandler* handler);
+ const std::string& getIdentifier() const;
+ const SecuritySettings* getSecuritySettings();
+ void socketClosed(AsynchIO&, const Socket&);
+
+ size_t decode(const char* buffer, size_t size);
+ size_t encode(char* buffer, size_t size);
+ bool canEncode();
+
+public:
+ SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion,
+ const ConnectionSettings&,
+ ConnectionImpl*);
+};
+
+// Static constructor which registers connector here
+namespace {
+ Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c);
+
+ struct StaticInit {
+ static bool initialised;
+
+ StaticInit() {
+ Connector::registerFactory("ssl", &create);
+ };
+ ~StaticInit() {
+ if (initialised) shutdownNSS();
+ }
+
+ void checkInitialised() {
+ static qpid::sys::Mutex lock;
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (!initialised) {
+ CommonOptions common("", "", QPIDC_CONF_FILE);
+ SslOptions options;
+ try {
+ common.parse(0, 0, common.clientConfig, true);
+ options.parse (0, 0, common.clientConfig, true);
+ } catch (const std::exception& e) {
+ throw qpid::types::Exception(QPID_MSG("Failed to parse options while initialising SSL connector: " << e.what()));
+ }
+ if (options.certDbPath.empty()) {
+ throw qpid::types::Exception(QPID_MSG("SSL connector not enabled, you must set QPID_SSL_CERT_DB to enable it."));
+ } else {
+ try {
+ initNSS(options);
+ initialised = true;
+ } catch (const std::exception& e) {
+ throw qpid::types::Exception(QPID_MSG("Failed to initialise SSL: " << e.what()));
+ }
+ }
+ }
+ }
+
+ } init;
+ bool StaticInit::initialised = false;
+
+ Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
+ init.checkInitialised();
+ return new SslConnector(p, v, s, c);
+ }
+}
+
+void initialiseSSL()
+{
+ init.checkInitialised();
+}
+
+void shutdownSSL()
+{
+ if (StaticInit::initialised) shutdownNSS();
+}
+
+SslConnector::SslConnector(Poller::shared_ptr p,
+ ProtocolVersion ver,
+ const ConnectionSettings& settings,
+ ConnectionImpl* cimpl)
+ : maxFrameSize(settings.maxFrameSize),
+ lastEof(0),
+ currentSize(0),
+ bounds(cimpl),
+ version(ver),
+ initiated(false),
+ closed(true),
+ shutdownHandler(0),
+ input(0),
+ aio(0),
+ poller(p)
+{
+ QPID_LOG(debug, "SslConnector created for " << version.toString());
+
+ if (settings.sslCertName != "") {
+ QPID_LOG(debug, "ssl-cert-name = " << settings.sslCertName);
+ socket.setCertName(settings.sslCertName);
+ }
+ if (settings.sslIgnoreHostnameVerificationFailure) {
+ socket.ignoreHostnameVerificationFailure();
+ }
+}
+
+SslConnector::~SslConnector() {
+ close();
+}
+
+void SslConnector::connect(const std::string& host, const std::string& port) {
+ Mutex::ScopedLock l(lock);
+ assert(closed);
+ connector = AsynchConnector::create(
+ socket,
+ host, port,
+ boost::bind(&SslConnector::connected, this, _1),
+ boost::bind(&SslConnector::connectFailed, this, _3));
+ closed = false;
+
+ connector->start(poller);
+}
+
+void SslConnector::connected(const Socket&) {
+ connector = 0;
+ aio = AsynchIO::create(socket,
+ boost::bind(&SslConnector::readbuff, this, _1, _2),
+ boost::bind(&SslConnector::eof, this, _1),
+ boost::bind(&SslConnector::disconnected, this, _1),
+ boost::bind(&SslConnector::socketClosed, this, _1, _2),
+ 0, // nobuffs
+ boost::bind(&SslConnector::writebuff, this, _1));
+
+ aio->createBuffers(maxFrameSize);
+ identifier = str(format("[%1%]") % socket.getFullAddress());
+ ProtocolInitiation init(version);
+ writeDataBlock(init);
+ aio->start(poller);
+}
+
+void SslConnector::connectFailed(const std::string& msg) {
+ connector = 0;
+ QPID_LOG(warning, "Connect failed: " << msg);
+ socket.close();
+ if (!closed)
+ closed = true;
+ if (shutdownHandler)
+ shutdownHandler->shutdown();
+}
+
+void SslConnector::close() {
+ Mutex::ScopedLock l(lock);
+ if (!closed) {
+ closed = true;
+ if (aio)
+ aio->queueWriteClose();
+ }
+}
+
+void SslConnector::socketClosed(AsynchIO&, const Socket&) {
+ if (aio)
+ aio->queueForDeletion();
+ if (shutdownHandler)
+ shutdownHandler->shutdown();
+}
+
+void SslConnector::connectAborted() {
+ connector->stop();
+ connectFailed("Connection timedout");
+}
+
+void SslConnector::abort() {
+ // Can't abort a closed connection
+ if (!closed) {
+ if (aio) {
+ // Established connection
+ aio->requestCallback(boost::bind(&SslConnector::eof, this, _1));
+ } else if (connector) {
+ // We're still connecting
+ connector->requestCallback(boost::bind(&SslConnector::connectAborted, this));
+ }
+ }
+}
+
+void SslConnector::setInputHandler(InputHandler* handler){
+ input = handler;
+}
+
+void SslConnector::setShutdownHandler(ShutdownHandler* handler){
+ shutdownHandler = handler;
+}
+
+const std::string& SslConnector::getIdentifier() const {
+ return identifier;
+}
+
+void SslConnector::handle(AMQFrame& frame) {
+ bool notifyWrite = false;
+ {
+ Mutex::ScopedLock l(lock);
+ frames.push_back(frame);
+ //only ask to write if this is the end of a frameset or if we
+ //already have a buffers worth of data
+ currentSize += frame.encodedSize();
+ if (frame.getEof()) {
+ lastEof = frames.size();
+ notifyWrite = true;
+ } else {
+ notifyWrite = (currentSize >= maxFrameSize);
+ }
+ /*
+ NOTE: Moving the following line into this mutex block
+ is a workaround for BZ 570168, in which the test
+ testConcurrentSenders causes a hang about 1.5%
+ of the time. ( To see the hang much more frequently
+ leave this line out of the mutex block, and put a
+ small usleep just before it.)
+
+ TODO mgoulish - fix the underlying cause and then
+ move this call back outside the mutex.
+ */
+ if (notifyWrite && !closed) aio->notifyPendingWrite();
+ }
+}
+
+void SslConnector::writebuff(AsynchIO& /*aio*/)
+{
+ // It's possible to be disconnected and be writable
+ if (closed)
+ return;
+
+ if (!canEncode()) {
+ return;
+ }
+
+ AsynchIOBufferBase* buffer = aio->getQueuedBuffer();
+ if (buffer) {
+
+ size_t encoded = encode(buffer->bytes, buffer->byteCount);
+
+ buffer->dataStart = 0;
+ buffer->dataCount = encoded;
+ aio->queueWrite(buffer);
+ }
+}
+
+// Called in IO thread.
+bool SslConnector::canEncode()
+{
+ Mutex::ScopedLock l(lock);
+ //have at least one full frameset or a whole buffers worth of data
+ return lastEof || currentSize >= maxFrameSize;
+}
+
+// Called in IO thread.
+size_t SslConnector::encode(char* buffer, size_t size)
+{
+ framing::Buffer out(buffer, size);
+ size_t bytesWritten(0);
+ {
+ Mutex::ScopedLock l(lock);
+ while (!frames.empty() && out.available() >= frames.front().encodedSize() ) {
+ frames.front().encode(out);
+ QPID_LOG(trace, "SENT [" << identifier << "]: " << frames.front());
+ frames.pop_front();
+ if (lastEof) --lastEof;
+ }
+ bytesWritten = size - out.available();
+ currentSize -= bytesWritten;
+ }
+ if (bounds) bounds->reduce(bytesWritten);
+ return bytesWritten;
+}
+
+void SslConnector::readbuff(AsynchIO& aio, AsynchIOBufferBase* buff)
+{
+ int32_t decoded = decode(buff->bytes+buff->dataStart, buff->dataCount);
+ // TODO: unreading needs to go away, and when we can cope
+ // with multiple sub-buffers in the general buffer scheme, it will
+ if (decoded < buff->dataCount) {
+ // Adjust buffer for used bytes and then "unread them"
+ buff->dataStart += decoded;
+ buff->dataCount -= decoded;
+ aio.unread(buff);
+ } else {
+ // Give whole buffer back to aio subsystem
+ aio.queueReadBuffer(buff);
+ }
+}
+
+size_t SslConnector::decode(const char* buffer, size_t size)
+{
+ framing::Buffer in(const_cast<char*>(buffer), size);
+ try {
+ if (checkProtocolHeader(in, version)) {
+ AMQFrame frame;
+ while(frame.decode(in)){
+ QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
+ input->received(frame);
+ }
+ }
+ } catch (const ProtocolVersionError& e) {
+ QPID_LOG(info, "Closing connection due to " << e.what());
+ close();
+ }
+ return size - in.available();
+}
+
+void SslConnector::writeDataBlock(const AMQDataBlock& data) {
+ AsynchIOBufferBase* buff = aio->getQueuedBuffer();
+ assert(buff);
+ framing::Buffer out(buff->bytes, buff->byteCount);
+ data.encode(out);
+ buff->dataCount = data.encodedSize();
+ aio->queueWrite(buff);
+}
+
+void SslConnector::eof(AsynchIO&) {
+ close();
+}
+
+void SslConnector::disconnected(AsynchIO&) {
+ close();
+ socketClosed(*aio, socket);
+}
+
+const SecuritySettings* SslConnector::getSecuritySettings()
+{
+ securitySettings.ssf = socket.getKeyLen();
+ securitySettings.authid = "dummy";//set to non-empty string to enable external authentication
+ return &securitySettings;
+}
+
+}} // namespace qpid::client