diff options
Diffstat (limited to 'cpp/src/qpid/client/SslConnector.cpp')
-rw-r--r-- | cpp/src/qpid/client/SslConnector.cpp | 381 |
1 files changed, 0 insertions, 381 deletions
diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp deleted file mode 100644 index 35c7e6bdf6..0000000000 --- a/cpp/src/qpid/client/SslConnector.cpp +++ /dev/null @@ -1,381 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/client/Connector.h" - -#include "config.h" -#include "qpid/client/Bounds.h" -#include "qpid/client/ConnectionImpl.h" -#include "qpid/client/ConnectionSettings.h" -#include "qpid/Options.h" -#include "qpid/log/Statement.h" -#include "qpid/sys/Time.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/InitiationHandler.h" -#include "qpid/sys/ssl/util.h" -#include "qpid/sys/ssl/SslIo.h" -#include "qpid/sys/ssl/SslSocket.h" -#include "qpid/sys/Dispatcher.h" -#include "qpid/sys/Poller.h" -#include "qpid/sys/SecuritySettings.h" -#include "qpid/Msg.h" - -#include <iostream> -#include <map> -#include <boost/bind.hpp> -#include <boost/format.hpp> - -namespace qpid { -namespace client { - -using namespace qpid::sys; -using namespace qpid::sys::ssl; -using namespace qpid::framing; -using boost::format; -using boost::str; - - -class SslConnector : public Connector -{ - struct Buff; - - /** Batch up frames for writing to aio. */ - class Writer : public framing::FrameHandler { - typedef sys::ssl::SslIOBufferBase BufferBase; - typedef std::vector<framing::AMQFrame> Frames; - - const uint16_t maxFrameSize; - sys::Mutex lock; - sys::ssl::SslIO* aio; - BufferBase* buffer; - Frames frames; - size_t lastEof; // Position after last EOF in frames - framing::Buffer encode; - size_t framesEncoded; - std::string identifier; - Bounds* bounds; - - void writeOne(); - void newBuffer(); - - public: - - Writer(uint16_t maxFrameSize, Bounds*); - ~Writer(); - void init(std::string id, sys::ssl::SslIO*); - void handle(framing::AMQFrame&); - void write(sys::ssl::SslIO&); - }; - - const uint16_t maxFrameSize; - framing::ProtocolVersion version; - bool initiated; - SecuritySettings securitySettings; - - sys::Mutex closedLock; - bool closed; - - sys::ShutdownHandler* shutdownHandler; - framing::InputHandler* input; - framing::InitiationHandler* initialiser; - framing::OutputHandler* output; - - Writer writer; - - sys::ssl::SslSocket socket; - - sys::ssl::SslIO* aio; - Poller::shared_ptr poller; - - ~SslConnector(); - - void readbuff(qpid::sys::ssl::SslIO&, qpid::sys::ssl::SslIOBufferBase*); - void writebuff(qpid::sys::ssl::SslIO&); - void writeDataBlock(const framing::AMQDataBlock& data); - void eof(qpid::sys::ssl::SslIO&); - void disconnected(qpid::sys::ssl::SslIO&); - - std::string identifier; - - void connect(const std::string& host, int port); - void init(); - void close(); - void send(framing::AMQFrame& frame); - void abort() {} // TODO: Need to fix for heartbeat timeouts to work - - void setInputHandler(framing::InputHandler* handler); - void setShutdownHandler(sys::ShutdownHandler* handler); - sys::ShutdownHandler* getShutdownHandler() const; - framing::OutputHandler* getOutputHandler(); - const std::string& getIdentifier() const; - const SecuritySettings* getSecuritySettings(); - void socketClosed(qpid::sys::ssl::SslIO&, const qpid::sys::ssl::SslSocket&); - -public: - SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion, - const ConnectionSettings&, - ConnectionImpl*); -}; - -struct SslConnector::Buff : public SslIO::BufferBase { - Buff(size_t size) : SslIO::BufferBase(new char[size], size) {} - ~Buff() { delete [] bytes;} -}; - -// Static constructor which registers connector here -namespace { - Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { - return new SslConnector(p, v, s, c); - } - - struct StaticInit { - StaticInit() { - try { - SslOptions options; - options.parse (0, 0, QPIDC_CONF_FILE, true); - if (options.certDbPath.empty()) { - QPID_LOG(info, "SSL connector not enabled, you must set QPID_SSL_CERT_DB to enable it."); - } else { - initNSS(options); - Connector::registerFactory("ssl", &create); - } - } catch (const std::exception& e) { - QPID_LOG(error, "Failed to initialise SSL connector: " << e.what()); - } - }; - - ~StaticInit() { shutdownNSS(); } - } init; -} - -SslConnector::SslConnector(Poller::shared_ptr p, - ProtocolVersion ver, - const ConnectionSettings& settings, - ConnectionImpl* cimpl) - : maxFrameSize(settings.maxFrameSize), - version(ver), - initiated(false), - closed(true), - shutdownHandler(0), - writer(maxFrameSize, cimpl), - aio(0), - poller(p) -{ - QPID_LOG(debug, "SslConnector created for " << version.toString()); - - if (settings.sslCertName != "") { - QPID_LOG(debug, "ssl-cert-name = " << settings.sslCertName); - socket.setCertName(settings.sslCertName); - } -} - -SslConnector::~SslConnector() { - close(); -} - -void SslConnector::connect(const std::string& host, int port){ - Mutex::ScopedLock l(closedLock); - assert(closed); - try { - socket.connect(host, port); - } catch (const std::exception& e) { - socket.close(); - throw ConnectionException(framing::connection::CLOSE_CODE_FRAMING_ERROR, e.what()); - } - - identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); - closed = false; - aio = new SslIO(socket, - boost::bind(&SslConnector::readbuff, this, _1, _2), - boost::bind(&SslConnector::eof, this, _1), - boost::bind(&SslConnector::disconnected, this, _1), - boost::bind(&SslConnector::socketClosed, this, _1, _2), - 0, // nobuffs - boost::bind(&SslConnector::writebuff, this, _1)); - writer.init(identifier, aio); -} - -void SslConnector::init(){ - Mutex::ScopedLock l(closedLock); - ProtocolInitiation init(version); - writeDataBlock(init); - for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff(maxFrameSize)); - } - aio->start(poller); -} - -void SslConnector::close() { - Mutex::ScopedLock l(closedLock); - if (!closed) { - closed = true; - if (aio) - aio->queueWriteClose(); - } -} - -void SslConnector::socketClosed(SslIO&, const SslSocket&) { - if (aio) - aio->queueForDeletion(); - if (shutdownHandler) - shutdownHandler->shutdown(); -} - -void SslConnector::setInputHandler(InputHandler* handler){ - input = handler; -} - -void SslConnector::setShutdownHandler(ShutdownHandler* handler){ - shutdownHandler = handler; -} - -OutputHandler* SslConnector::getOutputHandler() { - return this; -} - -sys::ShutdownHandler* SslConnector::getShutdownHandler() const { - return shutdownHandler; -} - -const std::string& SslConnector::getIdentifier() const { - return identifier; -} - -void SslConnector::send(AMQFrame& frame) { - writer.handle(frame); -} - -SslConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b) -{ -} - -SslConnector::Writer::~Writer() { delete buffer; } - -void SslConnector::Writer::init(std::string id, sys::ssl::SslIO* a) { - Mutex::ScopedLock l(lock); - identifier = id; - aio = a; - newBuffer(); -} -void SslConnector::Writer::handle(framing::AMQFrame& frame) { - Mutex::ScopedLock l(lock); - frames.push_back(frame); - if (frame.getEof() || (bounds && bounds->getCurrentSize() >= maxFrameSize)) { - lastEof = frames.size(); - aio->notifyPendingWrite(); - } - QPID_LOG(trace, "SENT " << identifier << ": " << frame); -} - -void SslConnector::Writer::writeOne() { - assert(buffer); - framesEncoded = 0; - - buffer->dataStart = 0; - buffer->dataCount = encode.getPosition(); - aio->queueWrite(buffer); - newBuffer(); -} - -void SslConnector::Writer::newBuffer() { - buffer = aio->getQueuedBuffer(); - if (!buffer) buffer = new Buff(maxFrameSize); - encode = framing::Buffer(buffer->bytes, buffer->byteCount); - framesEncoded = 0; -} - -// Called in IO thread. -void SslConnector::Writer::write(sys::ssl::SslIO&) { - Mutex::ScopedLock l(lock); - assert(buffer); - size_t bytesWritten(0); - for (size_t i = 0; i < lastEof; ++i) { - AMQFrame& frame = frames[i]; - uint32_t size = frame.encodedSize(); - if (size > encode.available()) writeOne(); - assert(size <= encode.available()); - frame.encode(encode); - ++framesEncoded; - bytesWritten += size; - } - frames.erase(frames.begin(), frames.begin()+lastEof); - lastEof = 0; - if (bounds) bounds->reduce(bytesWritten); - if (encode.getPosition() > 0) writeOne(); -} - -void SslConnector::readbuff(SslIO& aio, SslIO::BufferBase* buff) { - framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); - - if (!initiated) { - framing::ProtocolInitiation protocolInit; - if (protocolInit.decode(in)) { - //TODO: check the version is correct - QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")"); - } - initiated = true; - } - AMQFrame frame; - while(frame.decode(in)){ - QPID_LOG(trace, "RECV " << identifier << ": " << frame); - input->received(frame); - } - // TODO: unreading needs to go away, and when we can cope - // with multiple sub-buffers in the general buffer scheme, it will - if (in.available() != 0) { - // Adjust buffer for used bytes and then "unread them" - buff->dataStart += buff->dataCount-in.available(); - buff->dataCount = in.available(); - aio.unread(buff); - } else { - // Give whole buffer back to aio subsystem - aio.queueReadBuffer(buff); - } -} - -void SslConnector::writebuff(SslIO& aio_) { - writer.write(aio_); -} - -void SslConnector::writeDataBlock(const AMQDataBlock& data) { - SslIO::BufferBase* buff = new Buff(maxFrameSize); - framing::Buffer out(buff->bytes, buff->byteCount); - data.encode(out); - buff->dataCount = data.encodedSize(); - aio->queueWrite(buff); -} - -void SslConnector::eof(SslIO&) { - close(); -} - -void SslConnector::disconnected(SslIO&) { - close(); - socketClosed(*aio, socket); -} - -const SecuritySettings* SslConnector::getSecuritySettings() -{ - securitySettings.ssf = socket.getKeyLen(); - securitySettings.authid = "dummy";//set to non-empty string to enable external authentication - return &securitySettings; -} - -}} // namespace qpid::client |