/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "qpid/client/Connector.h" #include "config.h" #include "qpid/client/Bounds.h" #include "qpid/client/ConnectionImpl.h" #include "qpid/client/ConnectionSettings.h" #include "qpid/Options.h" #include "qpid/log/Statement.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/InitiationHandler.h" #include "qpid/sys/ssl/util.h" #include "qpid/sys/AsynchIO.h" #include "qpid/sys/ssl/SslSocket.h" #include "qpid/sys/SocketAddress.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Poller.h" #include "qpid/sys/SecuritySettings.h" #include "qpid/Msg.h" #include #include #include namespace qpid { namespace client { using namespace qpid::sys; using namespace qpid::sys::ssl; using namespace qpid::framing; using boost::format; using boost::str; class SslConnector : public Connector { typedef std::deque Frames; const uint16_t maxFrameSize; sys::Mutex lock; Frames frames; size_t lastEof; // Position after last EOF in frames uint64_t currentSize; Bounds* bounds; framing::ProtocolVersion version; bool initiated; bool closed; sys::ShutdownHandler* shutdownHandler; framing::InputHandler* input; sys::ssl::SslSocket socket; sys::AsynchConnector* connector; sys::AsynchIO* aio; std::string identifier; Poller::shared_ptr poller; SecuritySettings securitySettings; ~SslConnector(); void readbuff(AsynchIO&, AsynchIOBufferBase*); void writebuff(AsynchIO&); void writeDataBlock(const framing::AMQDataBlock& data); void eof(AsynchIO&); void disconnected(AsynchIO&); void connect(const std::string& host, const std::string& port); void connected(const sys::Socket&); void connectFailed(const std::string& msg); void close(); void send(framing::AMQFrame& frame); void abort(); void connectAborted(); void setInputHandler(framing::InputHandler* handler); void setShutdownHandler(sys::ShutdownHandler* handler); sys::ShutdownHandler* getShutdownHandler() const; framing::OutputHandler* getOutputHandler(); const std::string& getIdentifier() const; const SecuritySettings* getSecuritySettings(); void socketClosed(AsynchIO&, const Socket&); size_t decode(const char* buffer, size_t size); size_t encode(char* buffer, size_t size); bool canEncode(); public: SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion, const ConnectionSettings&, ConnectionImpl*); }; // Static constructor which registers connector here namespace { Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { return new SslConnector(p, v, s, c); } struct StaticInit { StaticInit() { try { CommonOptions common("", "", QPIDC_CONF_FILE); SslOptions options; common.parse(0, 0, common.clientConfig, true); options.parse (0, 0, common.clientConfig, true); if (options.certDbPath.empty()) { QPID_LOG(info, "SSL connector not enabled, you must set QPID_SSL_CERT_DB to enable it."); } else { initNSS(options); Connector::registerFactory("ssl", &create); } } catch (const std::exception& e) { QPID_LOG(error, "Failed to initialise SSL connector: " << e.what()); } }; ~StaticInit() { shutdownNSS(); } } init; } SslConnector::SslConnector(Poller::shared_ptr p, ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), lastEof(0), currentSize(0), bounds(cimpl), version(ver), initiated(false), closed(true), shutdownHandler(0), input(0), aio(0), poller(p) { QPID_LOG(debug, "SslConnector created for " << version.toString()); if (settings.sslCertName != "") { QPID_LOG(debug, "ssl-cert-name = " << settings.sslCertName); socket.setCertName(settings.sslCertName); } } SslConnector::~SslConnector() { close(); } void SslConnector::connect(const std::string& host, const std::string& port) { Mutex::ScopedLock l(lock); assert(closed); connector = AsynchConnector::create( socket, host, port, boost::bind(&SslConnector::connected, this, _1), boost::bind(&SslConnector::connectFailed, this, _3)); closed = false; connector->start(poller); } void SslConnector::connected(const Socket&) { connector = 0; aio = AsynchIO::create(socket, boost::bind(&SslConnector::readbuff, this, _1, _2), boost::bind(&SslConnector::eof, this, _1), boost::bind(&SslConnector::disconnected, this, _1), boost::bind(&SslConnector::socketClosed, this, _1, _2), 0, // nobuffs boost::bind(&SslConnector::writebuff, this, _1)); aio->createBuffers(maxFrameSize); identifier = str(format("[%1%]") % socket.getFullAddress()); ProtocolInitiation init(version); writeDataBlock(init); aio->start(poller); } void SslConnector::connectFailed(const std::string& msg) { connector = 0; QPID_LOG(warning, "Connect failed: " << msg); socket.close(); if (!closed) closed = true; if (shutdownHandler) shutdownHandler->shutdown(); } void SslConnector::close() { Mutex::ScopedLock l(lock); if (!closed) { closed = true; if (aio) aio->queueWriteClose(); } } void SslConnector::socketClosed(AsynchIO&, const Socket&) { if (aio) aio->queueForDeletion(); if (shutdownHandler) shutdownHandler->shutdown(); } void SslConnector::connectAborted() { connector->stop(); connectFailed("Connection timedout"); } void SslConnector::abort() { // Can't abort a closed connection if (!closed) { if (aio) { // Established connection aio->requestCallback(boost::bind(&SslConnector::eof, this, _1)); } else if (connector) { // We're still connecting connector->requestCallback(boost::bind(&SslConnector::connectAborted, this)); } } } void SslConnector::setInputHandler(InputHandler* handler){ input = handler; } void SslConnector::setShutdownHandler(ShutdownHandler* handler){ shutdownHandler = handler; } OutputHandler* SslConnector::getOutputHandler() { return this; } sys::ShutdownHandler* SslConnector::getShutdownHandler() const { return shutdownHandler; } const std::string& SslConnector::getIdentifier() const { return identifier; } void SslConnector::send(AMQFrame& frame) { bool notifyWrite = false; { Mutex::ScopedLock l(lock); frames.push_back(frame); //only ask to write if this is the end of a frameset or if we //already have a buffers worth of data currentSize += frame.encodedSize(); if (frame.getEof()) { lastEof = frames.size(); notifyWrite = true; } else { notifyWrite = (currentSize >= maxFrameSize); } /* NOTE: Moving the following line into this mutex block is a workaround for BZ 570168, in which the test testConcurrentSenders causes a hang about 1.5% of the time. ( To see the hang much more frequently leave this line out of the mutex block, and put a small usleep just before it.) TODO mgoulish - fix the underlying cause and then move this call back outside the mutex. */ if (notifyWrite && !closed) aio->notifyPendingWrite(); } } void SslConnector::writebuff(AsynchIO& /*aio*/) { // It's possible to be disconnected and be writable if (closed) return; if (!canEncode()) { return; } AsynchIOBufferBase* buffer = aio->getQueuedBuffer(); if (buffer) { size_t encoded = encode(buffer->bytes, buffer->byteCount); buffer->dataStart = 0; buffer->dataCount = encoded; aio->queueWrite(buffer); } } // Called in IO thread. bool SslConnector::canEncode() { Mutex::ScopedLock l(lock); //have at least one full frameset or a whole buffers worth of data return lastEof || currentSize >= maxFrameSize; } // Called in IO thread. size_t SslConnector::encode(char* buffer, size_t size) { framing::Buffer out(buffer, size); size_t bytesWritten(0); { Mutex::ScopedLock l(lock); while (!frames.empty() && out.available() >= frames.front().encodedSize() ) { frames.front().encode(out); QPID_LOG(trace, "SENT [" << identifier << "]: " << frames.front()); frames.pop_front(); if (lastEof) --lastEof; } bytesWritten = size - out.available(); currentSize -= bytesWritten; } if (bounds) bounds->reduce(bytesWritten); return bytesWritten; } void SslConnector::readbuff(AsynchIO& aio, AsynchIOBufferBase* buff) { int32_t decoded = decode(buff->bytes+buff->dataStart, buff->dataCount); // TODO: unreading needs to go away, and when we can cope // with multiple sub-buffers in the general buffer scheme, it will if (decoded < buff->dataCount) { // Adjust buffer for used bytes and then "unread them" buff->dataStart += decoded; buff->dataCount -= decoded; aio.unread(buff); } else { // Give whole buffer back to aio subsystem aio.queueReadBuffer(buff); } } size_t SslConnector::decode(const char* buffer, size_t size) { framing::Buffer in(const_cast(buffer), size); if (!initiated) { framing::ProtocolInitiation protocolInit; if (protocolInit.decode(in)) { QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")"); if(!(protocolInit==version)){ throw Exception(QPID_MSG("Unsupported version: " << protocolInit << " supported version " << version)); } } initiated = true; } AMQFrame frame; while(frame.decode(in)){ QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); input->received(frame); } return size - in.available(); } void SslConnector::writeDataBlock(const AMQDataBlock& data) { AsynchIOBufferBase* buff = aio->getQueuedBuffer(); assert(buff); framing::Buffer out(buff->bytes, buff->byteCount); data.encode(out); buff->dataCount = data.encodedSize(); aio->queueWrite(buff); } void SslConnector::eof(AsynchIO&) { close(); } void SslConnector::disconnected(AsynchIO&) { close(); socketClosed(*aio, socket); } const SecuritySettings* SslConnector::getSecuritySettings() { securitySettings.ssf = socket.getKeyLen(); securitySettings.authid = "dummy";//set to non-empty string to enable external authentication return &securitySettings; } }} // namespace qpid::client