summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/ssl
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/ssl')
-rw-r--r--cpp/src/qpid/sys/ssl/SslHandler.cpp217
-rw-r--r--cpp/src/qpid/sys/ssl/SslHandler.h85
-rw-r--r--cpp/src/qpid/sys/ssl/SslIo.cpp470
-rw-r--r--cpp/src/qpid/sys/ssl/SslIo.h193
-rw-r--r--cpp/src/qpid/sys/ssl/SslSocket.cpp182
-rw-r--r--cpp/src/qpid/sys/ssl/SslSocket.h32
-rw-r--r--cpp/src/qpid/sys/ssl/util.cpp19
7 files changed, 95 insertions, 1103 deletions
diff --git a/cpp/src/qpid/sys/ssl/SslHandler.cpp b/cpp/src/qpid/sys/ssl/SslHandler.cpp
deleted file mode 100644
index eeb8c26a76..0000000000
--- a/cpp/src/qpid/sys/ssl/SslHandler.cpp
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/sys/ssl/SslHandler.h"
-#include "qpid/sys/ssl/SslIo.h"
-#include "qpid/sys/ssl/SslSocket.h"
-#include "qpid/sys/Timer.h"
-#include "qpid/framing/AMQP_HighestVersion.h"
-#include "qpid/framing/ProtocolInitiation.h"
-#include "qpid/log/Statement.h"
-
-#include <boost/bind.hpp>
-
-namespace qpid {
-namespace sys {
-namespace ssl {
-
-
-struct ProtocolTimeoutTask : public sys::TimerTask {
- SslHandler& handler;
- std::string id;
-
- ProtocolTimeoutTask(const std::string& i, const Duration& timeout, SslHandler& h) :
- TimerTask(timeout, "ProtocolTimeout"),
- handler(h),
- id(i)
- {}
-
- void fire() {
- // If this fires it means that we didn't negotiate the connection in the timeout period
- // Schedule closing the connection for the io thread
- QPID_LOG(error, "Connection " << id << " No protocol received closing");
- handler.abort();
- }
-};
-
-SslHandler::SslHandler(std::string id, ConnectionCodec::Factory* f, bool _nodict) :
- identifier(id),
- aio(0),
- factory(f),
- codec(0),
- readError(false),
- isClient(false),
- nodict(_nodict)
-{}
-
-SslHandler::~SslHandler() {
- if (codec)
- codec->closed();
- if (timeoutTimerTask)
- timeoutTimerTask->cancel();
- delete codec;
-}
-
-void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime) {
- aio = a;
-
- // Start timer for this connection
- timeoutTimerTask = new ProtocolTimeoutTask(identifier, maxTime*TIME_MSEC, *this);
- timer.add(timeoutTimerTask);
-
- // Give connection some buffers to use
- aio->createBuffers();
-}
-
-void SslHandler::write(const framing::ProtocolInitiation& data)
-{
- QPID_LOG(debug, "SENT [" << identifier << "]: INIT(" << data << ")");
- SslIO::BufferBase* buff = aio->getQueuedBuffer();
- assert(buff);
- framing::Buffer out(buff->bytes, buff->byteCount);
- data.encode(out);
- buff->dataCount = data.encodedSize();
- aio->queueWrite(buff);
-}
-
-void SslHandler::abort() {
- // Don't disconnect if we're already disconnecting
- if (!readError) {
- aio->requestCallback(boost::bind(&SslHandler::eof, this, _1));
- }
-}
-void SslHandler::activateOutput() {
- aio->notifyPendingWrite();
-}
-
-void SslHandler::giveReadCredit(int32_t) {
- // FIXME aconway 2008-12-05: not yet implemented.
-}
-
-// Input side
-void SslHandler::readbuff(SslIO& , SslIO::BufferBase* buff) {
- if (readError) {
- return;
- }
- size_t decoded = 0;
- if (codec) { // Already initiated
- try {
- decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
- }catch(const std::exception& e){
- QPID_LOG(error, e.what());
- readError = true;
- aio->queueWriteClose();
- }
- }else{
- framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
- framing::ProtocolInitiation protocolInit;
- if (protocolInit.decode(in)) {
- // We've just got the protocol negotiation so we can cancel the timeout for that
- timeoutTimerTask->cancel();
-
- decoded = in.getPosition();
- QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
- try {
- codec = factory->create(protocolInit.getVersion(), *this, identifier, getSecuritySettings(aio));
- if (!codec) {
- //TODO: may still want to revise this...
- //send valid version header & close connection.
- write(framing::ProtocolInitiation(framing::highestProtocolVersion));
- readError = true;
- aio->queueWriteClose();
- }
- } catch (const std::exception& e) {
- QPID_LOG(error, e.what());
- readError = true;
- aio->queueWriteClose();
- }
- }
- }
- // TODO: unreading needs to go away, and when we can cope
- // with multiple sub-buffers in the general buffer scheme, it will
- if (decoded != size_t(buff->dataCount)) {
- // Adjust buffer for used bytes and then "unread them"
- buff->dataStart += decoded;
- buff->dataCount -= decoded;
- aio->unread(buff);
- } else {
- // Give whole buffer back to aio subsystem
- aio->queueReadBuffer(buff);
- }
-}
-
-void SslHandler::eof(SslIO&) {
- QPID_LOG(debug, "DISCONNECTED [" << identifier << "]");
- if (codec) codec->closed();
- aio->queueWriteClose();
-}
-
-void SslHandler::closedSocket(SslIO&, const SslSocket& s) {
- // If we closed with data still to send log a warning
- if (!aio->writeQueueEmpty()) {
- QPID_LOG(warning, "CLOSING [" << identifier << "] unsent data (probably due to client disconnect)");
- }
- delete &s;
- aio->queueForDeletion();
- delete this;
-}
-
-void SslHandler::disconnect(SslIO& a) {
- // treat the same as eof
- eof(a);
-}
-
-// Notifications
-void SslHandler::nobuffs(SslIO&) {
-}
-
-void SslHandler::idle(SslIO&){
- if (isClient && codec == 0) {
- codec = factory->create(*this, identifier, getSecuritySettings(aio));
- write(framing::ProtocolInitiation(codec->getVersion()));
- // We've just sent the protocol negotiation so we can cancel the timeout for that
- // This is not ideal, because we've not received anything yet, but heartbeats will
- // be active soon
- timeoutTimerTask->cancel();
- return;
- }
- if (codec == 0) return;
- if (!codec->canEncode()) {
- return;
- }
- SslIO::BufferBase* buff = aio->getQueuedBuffer();
- if (buff) {
- size_t encoded=codec->encode(buff->bytes, buff->byteCount);
- buff->dataCount = encoded;
- aio->queueWrite(buff);
- }
- if (codec->isClosed())
- aio->queueWriteClose();
-}
-
-SecuritySettings SslHandler::getSecuritySettings(SslIO* aio)
-{
- SecuritySettings settings = aio->getSecuritySettings();
- settings.nodict = nodict;
- return settings;
-}
-
-
-}}} // namespace qpid::sys::ssl
diff --git a/cpp/src/qpid/sys/ssl/SslHandler.h b/cpp/src/qpid/sys/ssl/SslHandler.h
deleted file mode 100644
index 14814b0281..0000000000
--- a/cpp/src/qpid/sys/ssl/SslHandler.h
+++ /dev/null
@@ -1,85 +0,0 @@
-#ifndef QPID_SYS_SSL_SSLHANDLER_H
-#define QPID_SYS_SSL_SSLHANDLER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/ConnectionCodec.h"
-#include "qpid/sys/OutputControl.h"
-
-#include <boost/intrusive_ptr.hpp>
-
-namespace qpid {
-
-namespace framing {
- class ProtocolInitiation;
-}
-
-namespace sys {
-
-class Timer;
-class TimerTask;
-
-namespace ssl {
-
-class SslIO;
-struct SslIOBufferBase;
-class SslSocket;
-
-class SslHandler : public OutputControl {
- std::string identifier;
- SslIO* aio;
- ConnectionCodec::Factory* factory;
- ConnectionCodec* codec;
- bool readError;
- bool isClient;
- bool nodict;
- boost::intrusive_ptr<sys::TimerTask> timeoutTimerTask;
-
- void write(const framing::ProtocolInitiation&);
- qpid::sys::SecuritySettings getSecuritySettings(SslIO* aio);
-
- public:
- SslHandler(std::string id, ConnectionCodec::Factory* f, bool nodict);
- ~SslHandler();
- void init(SslIO* a, Timer& timer, uint32_t maxTime);
-
- void setClient() { isClient = true; }
-
- // Output side
- void abort();
- void activateOutput();
- void giveReadCredit(int32_t);
-
- // Input side
- void readbuff(SslIO& aio, SslIOBufferBase* buff);
- void eof(SslIO& aio);
- void disconnect(SslIO& aio);
-
- // Notifications
- void nobuffs(SslIO& aio);
- void idle(SslIO& aio);
- void closedSocket(SslIO& aio, const SslSocket& s);
-};
-
-}}} // namespace qpid::sys::ssl
-
-#endif /*!QPID_SYS_SSL_SSLHANDLER_H*/
diff --git a/cpp/src/qpid/sys/ssl/SslIo.cpp b/cpp/src/qpid/sys/ssl/SslIo.cpp
deleted file mode 100644
index bbfb703170..0000000000
--- a/cpp/src/qpid/sys/ssl/SslIo.cpp
+++ /dev/null
@@ -1,470 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/ssl/SslIo.h"
-#include "qpid/sys/ssl/SslSocket.h"
-#include "qpid/sys/ssl/check.h"
-
-#include "qpid/sys/Time.h"
-#include "qpid/sys/posix/check.h"
-#include "qpid/log/Statement.h"
-
-// TODO The basic algorithm here is not really POSIX specific and with a bit more abstraction
-// could (should) be promoted to be platform portable
-#include <unistd.h>
-#include <sys/socket.h>
-#include <signal.h>
-#include <errno.h>
-#include <string.h>
-
-#include <boost/bind.hpp>
-
-namespace qpid {
-namespace sys {
-namespace ssl {
-
-namespace {
-
-/*
- * Make *process* not generate SIGPIPE when writing to closed
- * pipe/socket (necessary as default action is to terminate process)
- */
-void ignoreSigpipe() {
- ::signal(SIGPIPE, SIG_IGN);
-}
-
-/*
- * We keep per thread state to avoid locking overhead. The assumption is that
- * on average all the connections are serviced by all the threads so the state
- * recorded in each thread is about the same. If this turns out not to be the
- * case we could rebalance the info occasionally.
- */
-__thread int threadReadTotal = 0;
-__thread int threadReadCount = 0;
-__thread int threadWriteTotal = 0;
-__thread int threadWriteCount = 0;
-__thread int64_t threadMaxIoTimeNs = 2 * 1000000; // start at 2ms
-}
-
-/*
- * Asynch Acceptor
- */
-
-template <class T>
-SslAcceptorTmpl<T>::SslAcceptorTmpl(const T& s, Callback callback) :
- acceptedCallback(callback),
- handle(s, boost::bind(&SslAcceptorTmpl<T>::readable, this, _1), 0, 0),
- socket(s) {
-
- s.setNonblocking();
- ignoreSigpipe();
-}
-
-template <class T>
-SslAcceptorTmpl<T>::~SslAcceptorTmpl()
-{
- handle.stopWatch();
-}
-
-template <class T>
-void SslAcceptorTmpl<T>::start(Poller::shared_ptr poller) {
- handle.startWatch(poller);
-}
-
-/*
- * We keep on accepting as long as there is something to accept
- */
-template <class T>
-void SslAcceptorTmpl<T>::readable(DispatchHandle& h) {
- Socket* s;
- do {
- errno = 0;
- // TODO: Currently we ignore the peers address, perhaps we should
- // log it or use it for connection acceptance.
- try {
- s = socket.accept();
- if (s) {
- acceptedCallback(*s);
- } else {
- break;
- }
- } catch (const std::exception& e) {
- QPID_LOG(error, "Could not accept socket: " << e.what());
- }
- } while (true);
-
- h.rewatch();
-}
-
-// Explicitly instantiate the templates we need
-template class SslAcceptorTmpl<SslSocket>;
-template class SslAcceptorTmpl<SslMuxSocket>;
-
-/*
- * Asynch Connector
- */
-
-SslConnector::SslConnector(const SslSocket& s,
- Poller::shared_ptr poller,
- std::string hostname,
- std::string port,
- ConnectedCallback connCb,
- FailedCallback failCb) :
- DispatchHandle(s,
- 0,
- boost::bind(&SslConnector::connComplete, this, _1),
- boost::bind(&SslConnector::connComplete, this, _1)),
- connCallback(connCb),
- failCallback(failCb),
- socket(s)
-{
- //TODO: would be better for connect to be performed on a
- //non-blocking socket, but that doesn't work at present so connect
- //blocks until complete
- try {
- socket.connect(hostname, port);
- socket.setNonblocking();
- startWatch(poller);
- } catch(std::exception& e) {
- failure(-1, std::string(e.what()));
- }
-}
-
-void SslConnector::connComplete(DispatchHandle& h)
-{
- int errCode = socket.getError();
-
- h.stopWatch();
- if (errCode == 0) {
- connCallback(socket);
- DispatchHandle::doDelete();
- } else {
- // TODO: This need to be fixed as strerror isn't thread safe
- failure(errCode, std::string(::strerror(errCode)));
- }
-}
-
-void SslConnector::failure(int errCode, std::string message)
-{
- if (failCallback)
- failCallback(errCode, message);
-
- socket.close();
- delete &socket;
-
- DispatchHandle::doDelete();
-}
-
-/*
- * Asynch reader/writer
- */
-SslIO::SslIO(const SslSocket& s,
- ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
- ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) :
-
- DispatchHandle(s,
- boost::bind(&SslIO::readable, this, _1),
- boost::bind(&SslIO::writeable, this, _1),
- boost::bind(&SslIO::disconnected, this, _1)),
- readCallback(rCb),
- eofCallback(eofCb),
- disCallback(disCb),
- closedCallback(cCb),
- emptyCallback(eCb),
- idleCallback(iCb),
- socket(s),
- queuedClose(false),
- writePending(false) {
-
- s.setNonblocking();
-}
-
-SslIO::~SslIO() {
-}
-
-void SslIO::queueForDeletion() {
- DispatchHandle::doDelete();
-}
-
-void SslIO::start(Poller::shared_ptr poller) {
- DispatchHandle::startWatch(poller);
-}
-
-void SslIO::createBuffers(uint32_t size) {
- // Allocate all the buffer memory at once
- bufferMemory.reset(new char[size*BufferCount]);
-
- // Create the Buffer structs in a vector
- // And push into the buffer queue
- buffers.reserve(BufferCount);
- for (uint32_t i = 0; i < BufferCount; i++) {
- buffers.push_back(BufferBase(&bufferMemory[i*size], size));
- queueReadBuffer(&buffers[i]);
- }
-}
-
-void SslIO::queueReadBuffer(BufferBase* buff) {
- assert(buff);
- buff->dataStart = 0;
- buff->dataCount = 0;
- bufferQueue.push_back(buff);
- DispatchHandle::rewatchRead();
-}
-
-void SslIO::unread(BufferBase* buff) {
- assert(buff);
- if (buff->dataStart != 0) {
- memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount);
- buff->dataStart = 0;
- }
- bufferQueue.push_front(buff);
- DispatchHandle::rewatchRead();
-}
-
-void SslIO::queueWrite(BufferBase* buff) {
- assert(buff);
- // If we've already closed the socket then throw the write away
- if (queuedClose) {
- bufferQueue.push_front(buff);
- return;
- } else {
- writeQueue.push_front(buff);
- }
- writePending = false;
- DispatchHandle::rewatchWrite();
-}
-
-void SslIO::notifyPendingWrite() {
- writePending = true;
- DispatchHandle::rewatchWrite();
-}
-
-void SslIO::queueWriteClose() {
- queuedClose = true;
- DispatchHandle::rewatchWrite();
-}
-
-void SslIO::requestCallback(RequestCallback callback) {
- // TODO creating a function object every time isn't all that
- // efficient - if this becomes heavily used do something better (what?)
- assert(callback);
- DispatchHandle::call(boost::bind(&SslIO::requestedCall, this, callback));
-}
-
-void SslIO::requestedCall(RequestCallback callback) {
- assert(callback);
- callback(*this);
-}
-
-/** Return a queued buffer if there are enough
- * to spare
- */
-SslIO::BufferBase* SslIO::getQueuedBuffer() {
- // Always keep at least one buffer (it might have data that was "unread" in it)
- if (bufferQueue.size()<=1)
- return 0;
- BufferBase* buff = bufferQueue.back();
- assert(buff);
- buff->dataStart = 0;
- buff->dataCount = 0;
- bufferQueue.pop_back();
- return buff;
-}
-
-/*
- * We keep on reading as long as we have something to read and a buffer to put
- * it in
- */
-void SslIO::readable(DispatchHandle& h) {
- AbsTime readStartTime = AbsTime::now();
- do {
- // (Try to) get a buffer
- if (!bufferQueue.empty()) {
- // Read into buffer
- BufferBase* buff = bufferQueue.front();
- assert(buff);
- bufferQueue.pop_front();
- errno = 0;
- int readCount = buff->byteCount-buff->dataCount;
- int rc = socket.read(buff->bytes + buff->dataCount, readCount);
- if (rc > 0) {
- buff->dataCount += rc;
- threadReadTotal += rc;
-
- readCallback(*this, buff);
- if (rc != readCount) {
- // If we didn't fill the read buffer then time to stop reading
- break;
- }
-
- // Stop reading if we've overrun our timeslot
- if (Duration(readStartTime, AbsTime::now()) > threadMaxIoTimeNs) {
- break;
- }
-
- } else {
- // Put buffer back (at front so it doesn't interfere with unread buffers)
- bufferQueue.push_front(buff);
- assert(buff);
-
- // Eof or other side has gone away
- if (rc == 0 || errno == ECONNRESET) {
- eofCallback(*this);
- h.unwatchRead();
- break;
- } else if (errno == EAGAIN) {
- // We have just put a buffer back so we know
- // we can carry on watching for reads
- break;
- } else {
- // Report error then just treat as a socket disconnect
- QPID_LOG(error, "Error reading socket: " << getErrorString(PR_GetError()));
- eofCallback(*this);
- h.unwatchRead();
- break;
- }
- }
- } else {
- // Something to read but no buffer
- if (emptyCallback) {
- emptyCallback(*this);
- }
- // If we still have no buffers we can't do anything more
- if (bufferQueue.empty()) {
- h.unwatchRead();
- break;
- }
-
- }
- } while (true);
-
- ++threadReadCount;
- return;
-}
-
-/*
- * We carry on writing whilst we have data to write and we can write
- */
-void SslIO::writeable(DispatchHandle& h) {
- AbsTime writeStartTime = AbsTime::now();
- do {
- // See if we've got something to write
- if (!writeQueue.empty()) {
- // Write buffer
- BufferBase* buff = writeQueue.back();
- writeQueue.pop_back();
- errno = 0;
- assert(buff->dataStart+buff->dataCount <= buff->byteCount);
- int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount);
- if (rc >= 0) {
- threadWriteTotal += rc;
-
- // If we didn't write full buffer put rest back
- if (rc != buff->dataCount) {
- buff->dataStart += rc;
- buff->dataCount -= rc;
- writeQueue.push_back(buff);
- break;
- }
-
- // Recycle the buffer
- queueReadBuffer(buff);
-
- // Stop writing if we've overrun our timeslot
- if (Duration(writeStartTime, AbsTime::now()) > threadMaxIoTimeNs) {
- break;
- }
- } else {
- // Put buffer back
- writeQueue.push_back(buff);
- if (errno == ECONNRESET || errno == EPIPE) {
- // Just stop watching for write here - we'll get a
- // disconnect callback soon enough
- h.unwatchWrite();
- break;
- } else if (errno == EAGAIN) {
- // We have just put a buffer back so we know
- // we can carry on watching for writes
- break;
- } else {
- QPID_LOG(error, "Error writing to socket: " << getErrorString(PR_GetError()));
- h.unwatchWrite();
- break;
- }
- }
- } else {
- // If we're waiting to close the socket then can do it now as there is nothing to write
- if (queuedClose) {
- close(h);
- break;
- }
- // Fd is writable, but nothing to write
- if (idleCallback) {
- writePending = false;
- idleCallback(*this);
- }
- // If we still have no buffers to write we can't do anything more
- if (writeQueue.empty() && !writePending && !queuedClose) {
- h.unwatchWrite();
- // The following handles the case where writePending is
- // set to true after the test above; in this case its
- // possible that the unwatchWrite overwrites the
- // desired rewatchWrite so we correct that here
- if (writePending)
- h.rewatchWrite();
- break;
- }
- }
- } while (true);
-
- ++threadWriteCount;
- return;
-}
-
-void SslIO::disconnected(DispatchHandle& h) {
- // If we've already queued close do it instead of disconnected callback
- if (queuedClose) {
- close(h);
- } else if (disCallback) {
- disCallback(*this);
- h.unwatch();
- }
-}
-
-/*
- * Close the socket and callback to say we've done it
- */
-void SslIO::close(DispatchHandle& h) {
- h.stopWatch();
- socket.close();
- if (closedCallback) {
- closedCallback(*this, socket);
- }
-}
-
-SecuritySettings SslIO::getSecuritySettings() {
- SecuritySettings settings;
- settings.ssf = socket.getKeyLen();
- settings.authid = socket.getClientAuthId();
- return settings;
-}
-
-}}}
diff --git a/cpp/src/qpid/sys/ssl/SslIo.h b/cpp/src/qpid/sys/ssl/SslIo.h
deleted file mode 100644
index f3112bfa65..0000000000
--- a/cpp/src/qpid/sys/ssl/SslIo.h
+++ /dev/null
@@ -1,193 +0,0 @@
-#ifndef _sys_ssl_SslIO
-#define _sys_ssl_SslIO
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/DispatchHandle.h"
-#include "qpid/sys/SecuritySettings.h"
-
-#include <boost/function.hpp>
-#include <boost/shared_array.hpp>
-#include <deque>
-
-namespace qpid {
-namespace sys {
-
-class Socket;
-
-namespace ssl {
-
-class SslSocket;
-
-/*
- * Asynchronous ssl acceptor: accepts connections then does a callback
- * with the accepted fd
- */
-template <class T>
-class SslAcceptorTmpl {
-public:
- typedef boost::function1<void, const Socket&> Callback;
-
-private:
- Callback acceptedCallback;
- qpid::sys::DispatchHandle handle;
- const T& socket;
-
-public:
- SslAcceptorTmpl(const T& s, Callback callback);
- ~SslAcceptorTmpl();
- void start(qpid::sys::Poller::shared_ptr poller);
-
-private:
- void readable(qpid::sys::DispatchHandle& handle);
-};
-
-/*
- * Asynchronous ssl connector: starts the process of initiating a
- * connection and invokes a callback when completed or failed.
- */
-class SslConnector : private qpid::sys::DispatchHandle {
-public:
- typedef boost::function1<void, const SslSocket&> ConnectedCallback;
- typedef boost::function2<void, int, std::string> FailedCallback;
-
-private:
- ConnectedCallback connCallback;
- FailedCallback failCallback;
- const SslSocket& socket;
-
-public:
- SslConnector(const SslSocket& socket,
- Poller::shared_ptr poller,
- std::string hostname,
- std::string port,
- ConnectedCallback connCb,
- FailedCallback failCb = 0);
-
-private:
- void connComplete(DispatchHandle& handle);
- void failure(int, std::string);
-};
-
-struct SslIOBufferBase {
- char* bytes;
- int32_t byteCount;
- int32_t dataStart;
- int32_t dataCount;
-
- SslIOBufferBase(char* const b, const int32_t s) :
- bytes(b),
- byteCount(s),
- dataStart(0),
- dataCount(0)
- {}
-
- virtual ~SslIOBufferBase()
- {}
-};
-
-/*
- * Asychronous reader/writer:
- * Reader accepts buffers to read into; reads into the provided buffers
- * and then does a callback with the buffer and amount read. Optionally it can callback
- * when there is something to read but no buffer to read it into.
- *
- * Writer accepts a buffer and queues it for writing; can also be given
- * a callback for when writing is "idle" (ie fd is writable, but nothing to write)
- *
- * The class is implemented in terms of DispatchHandle to allow it to be deleted by deleting
- * the contained DispatchHandle
- */
-class SslIO : private qpid::sys::DispatchHandle {
-public:
- typedef SslIOBufferBase BufferBase;
-
- typedef boost::function2<void, SslIO&, BufferBase*> ReadCallback;
- typedef boost::function1<void, SslIO&> EofCallback;
- typedef boost::function1<void, SslIO&> DisconnectCallback;
- typedef boost::function2<void, SslIO&, const SslSocket&> ClosedCallback;
- typedef boost::function1<void, SslIO&> BuffersEmptyCallback;
- typedef boost::function1<void, SslIO&> IdleCallback;
- typedef boost::function1<void, SslIO&> RequestCallback;
-
- SslIO(const SslSocket& s,
- ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
- ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
-private:
- ReadCallback readCallback;
- EofCallback eofCallback;
- DisconnectCallback disCallback;
- ClosedCallback closedCallback;
- BuffersEmptyCallback emptyCallback;
- IdleCallback idleCallback;
- const SslSocket& socket;
- std::deque<BufferBase*> bufferQueue;
- std::deque<BufferBase*> writeQueue;
- std::vector<BufferBase> buffers;
- boost::shared_array<char> bufferMemory;
- bool queuedClose;
- /**
- * This flag is used to detect and handle concurrency between
- * calls to notifyPendingWrite() (which can be made from any thread) and
- * the execution of the writeable() method (which is always on the
- * thread processing this handle.
- */
- volatile bool writePending;
-
-public:
- /*
- * Size of IO buffers - this is the maximum possible frame size + 1
- */
- const static uint32_t MaxBufferSize = 65536;
-
- /*
- * Number of IO buffers allocated - I think the code can only use 2 -
- * 1 for reading and 1 for writing, allocate 4 for safety
- */
- const static uint32_t BufferCount = 4;
-
- void queueForDeletion();
-
- void start(qpid::sys::Poller::shared_ptr poller);
- void createBuffers(uint32_t size = MaxBufferSize);
- void queueReadBuffer(BufferBase* buff);
- void unread(BufferBase* buff);
- void queueWrite(BufferBase* buff);
- void notifyPendingWrite();
- void queueWriteClose();
- bool writeQueueEmpty() { return writeQueue.empty(); }
- void requestCallback(RequestCallback);
- BufferBase* getQueuedBuffer();
-
- qpid::sys::SecuritySettings getSecuritySettings();
-
-private:
- ~SslIO();
- void readable(qpid::sys::DispatchHandle& handle);
- void writeable(qpid::sys::DispatchHandle& handle);
- void disconnected(qpid::sys::DispatchHandle& handle);
- void requestedCall(RequestCallback);
- void close(qpid::sys::DispatchHandle& handle);
-};
-
-}}}
-
-#endif // _sys_ssl_SslIO
diff --git a/cpp/src/qpid/sys/ssl/SslSocket.cpp b/cpp/src/qpid/sys/ssl/SslSocket.cpp
index 30234bb686..a328e49c13 100644
--- a/cpp/src/qpid/sys/ssl/SslSocket.cpp
+++ b/cpp/src/qpid/sys/ssl/SslSocket.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/sys/ssl/SslSocket.h"
+#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/ssl/check.h"
#include "qpid/sys/ssl/util.h"
#include "qpid/Exception.h"
@@ -52,28 +53,6 @@ namespace sys {
namespace ssl {
namespace {
-std::string getService(int fd, bool local)
-{
- ::sockaddr_storage name; // big enough for any socket address
- ::socklen_t namelen = sizeof(name);
-
- int result = -1;
- if (local) {
- result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
- } else {
- result = ::getpeername(fd, (::sockaddr*)&name, &namelen);
- }
-
- QPID_POSIX_CHECK(result);
-
- char servName[NI_MAXSERV];
- if (int rc=::getnameinfo((::sockaddr*)&name, namelen, 0, 0,
- servName, sizeof(servName),
- NI_NUMERICHOST | NI_NUMERICSERV) != 0)
- throw QPID_POSIX_ERROR(rc);
- return servName;
-}
-
const std::string DOMAIN_SEPARATOR("@");
const std::string DC_SEPARATOR(".");
const std::string DC("DC");
@@ -101,14 +80,18 @@ std::string getDomainFromSubject(std::string subject)
}
return domain;
}
-
}
-SslSocket::SslSocket() : socket(0), prototype(0)
+SslSocket::SslSocket(const std::string& certName, bool clientAuth) :
+ nssSocket(0), certname(certName), prototype(0)
{
- impl->fd = ::socket (PF_INET, SOCK_STREAM, 0);
- if (impl->fd < 0) throw QPID_POSIX_ERROR(errno);
- socket = SSL_ImportFD(0, PR_ImportTCPSocket(impl->fd));
+ //configure prototype socket:
+ prototype = SSL_ImportFD(0, PR_NewTCPSocket());
+
+ if (clientAuth) {
+ NSS_CHECK(SSL_OptionSet(prototype, SSL_REQUEST_CERTIFICATE, PR_TRUE));
+ NSS_CHECK(SSL_OptionSet(prototype, SSL_REQUIRE_CERTIFICATE, PR_TRUE));
+ }
}
/**
@@ -116,25 +99,44 @@ SslSocket::SslSocket() : socket(0), prototype(0)
* returned from accept. Because we use posix accept rather than
* PR_Accept, we have to reset the handshake.
*/
-SslSocket::SslSocket(IOHandlePrivate* ioph, PRFileDesc* model) : Socket(ioph), socket(0), prototype(0)
+SslSocket::SslSocket(int fd, PRFileDesc* model) : BSDSocket(fd), nssSocket(0), prototype(0)
{
- socket = SSL_ImportFD(model, PR_ImportTCPSocket(impl->fd));
- NSS_CHECK(SSL_ResetHandshake(socket, true));
+ nssSocket = SSL_ImportFD(model, PR_ImportTCPSocket(fd));
+ NSS_CHECK(SSL_ResetHandshake(nssSocket, PR_TRUE));
}
void SslSocket::setNonblocking() const
{
+ if (!nssSocket) {
+ BSDSocket::setNonblocking();
+ return;
+ }
PRSocketOptionData option;
option.option = PR_SockOpt_Nonblocking;
option.value.non_blocking = true;
- PR_SetSocketOption(socket, &option);
+ PR_SetSocketOption(nssSocket, &option);
}
-void SslSocket::connect(const std::string& host, const std::string& port) const
+void SslSocket::setTcpNoDelay() const
{
- std::stringstream namestream;
- namestream << host << ":" << port;
- connectname = namestream.str();
+ if (!nssSocket) {
+ BSDSocket::setTcpNoDelay();
+ return;
+ }
+ PRSocketOptionData option;
+ option.option = PR_SockOpt_NoDelay;
+ option.value.no_delay = true;
+ PR_SetSocketOption(nssSocket, &option);
+}
+
+void SslSocket::connect(const SocketAddress& addr) const
+{
+ BSDSocket::connect(addr);
+}
+
+void SslSocket::finishConnect(const SocketAddress& addr) const
+{
+ nssSocket = SSL_ImportFD(0, PR_ImportTCPSocket(fd));
void* arg;
// Use the connection's cert-name if it has one; else use global cert-name
@@ -145,75 +147,48 @@ void SslSocket::connect(const std::string& host, const std::string& port) const
} else {
arg = const_cast<char*>(SslOptions::global.certName.c_str());
}
- NSS_CHECK(SSL_GetClientAuthDataHook(socket, NSS_GetClientAuthData, arg));
- NSS_CHECK(SSL_SetURL(socket, host.data()));
-
- char hostBuffer[PR_NETDB_BUF_SIZE];
- PRHostEnt hostEntry;
- PR_CHECK(PR_GetHostByName(host.data(), hostBuffer, PR_NETDB_BUF_SIZE, &hostEntry));
- PRNetAddr address;
- int value = PR_EnumerateHostEnt(0, &hostEntry, boost::lexical_cast<PRUint16>(port), &address);
- if (value < 0) {
- throw Exception(QPID_MSG("Error getting address for host: " << ErrorString()));
- } else if (value == 0) {
- throw Exception(QPID_MSG("Could not resolve address for host."));
- }
- PR_CHECK(PR_Connect(socket, &address, PR_INTERVAL_NO_TIMEOUT));
- NSS_CHECK(SSL_ForceHandshake(socket));
+ NSS_CHECK(SSL_GetClientAuthDataHook(nssSocket, NSS_GetClientAuthData, arg));
+
+ url = addr.getHost();
+ NSS_CHECK(SSL_SetURL(nssSocket, url.data()));
+
+ NSS_CHECK(SSL_ResetHandshake(nssSocket, PR_FALSE));
+ NSS_CHECK(SSL_ForceHandshake(nssSocket));
}
void SslSocket::close() const
{
- if (impl->fd > 0) {
- PR_Close(socket);
- impl->fd = -1;
+ if (!nssSocket) {
+ BSDSocket::close();
+ return;
+ }
+ if (fd > 0) {
+ PR_Close(nssSocket);
+ fd = -1;
}
}
-int SslSocket::listen(uint16_t port, int backlog, const std::string& certName, bool clientAuth) const
+int SslSocket::listen(const SocketAddress& sa, int backlog) const
{
- //configure prototype socket:
- prototype = SSL_ImportFD(0, PR_NewTCPSocket());
- if (clientAuth) {
- NSS_CHECK(SSL_OptionSet(prototype, SSL_REQUEST_CERTIFICATE, PR_TRUE));
- NSS_CHECK(SSL_OptionSet(prototype, SSL_REQUIRE_CERTIFICATE, PR_TRUE));
- }
-
//get certificate and key (is this the correct way?)
- CERTCertificate *cert = PK11_FindCertFromNickname(const_cast<char*>(certName.c_str()), 0);
- if (!cert) throw Exception(QPID_MSG("Failed to load certificate '" << certName << "'"));
+ std::string cName( (certname == "") ? "localhost.localdomain" : certname);
+ CERTCertificate *cert = PK11_FindCertFromNickname(const_cast<char*>(cName.c_str()), 0);
+ if (!cert) throw Exception(QPID_MSG("Failed to load certificate '" << cName << "'"));
SECKEYPrivateKey *key = PK11_FindKeyByAnyCert(cert, 0);
if (!key) throw Exception(QPID_MSG("Failed to retrieve private key from certificate"));
NSS_CHECK(SSL_ConfigSecureServer(prototype, cert, key, NSS_FindCertKEAType(cert)));
SECKEY_DestroyPrivateKey(key);
CERT_DestroyCertificate(cert);
- //bind and listen
- const int& socket = impl->fd;
- int yes=1;
- QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
- struct sockaddr_in name;
- name.sin_family = AF_INET;
- name.sin_port = htons(port);
- name.sin_addr.s_addr = 0;
- if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) < 0)
- throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(errno)));
- if (::listen(socket, backlog) < 0)
- throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(errno)));
-
- socklen_t namelen = sizeof(name);
- if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0)
- throw QPID_POSIX_ERROR(errno);
-
- return ntohs(name.sin_port);
+ return BSDSocket::listen(sa, backlog);
}
-SslSocket* SslSocket::accept() const
+Socket* SslSocket::accept() const
{
QPID_LOG(trace, "Accepting SSL connection.");
- int afd = ::accept(impl->fd, 0, 0);
+ int afd = ::accept(fd, 0, 0);
if ( afd >= 0) {
- return new SslSocket(new IOHandlePrivate(afd), prototype);
+ return new SslSocket(afd, prototype);
} else if (errno == EAGAIN) {
return 0;
} else {
@@ -297,17 +272,22 @@ static bool isSslStream(int afd) {
return isSSL2Handshake || isSSL3Handshake;
}
+SslMuxSocket::SslMuxSocket(const std::string& certName, bool clientAuth) :
+ SslSocket(certName, clientAuth)
+{
+}
+
Socket* SslMuxSocket::accept() const
{
- int afd = ::accept(impl->fd, 0, 0);
+ int afd = ::accept(fd, 0, 0);
if (afd >= 0) {
QPID_LOG(trace, "Accepting connection with optional SSL wrapper.");
if (isSslStream(afd)) {
QPID_LOG(trace, "Accepted SSL connection.");
- return new SslSocket(new IOHandlePrivate(afd), prototype);
+ return new SslSocket(afd, prototype);
} else {
QPID_LOG(trace, "Accepted Plaintext connection.");
- return new Socket(new IOHandlePrivate(afd));
+ return new BSDSocket(afd);
}
} else if (errno == EAGAIN) {
return 0;
@@ -318,32 +298,12 @@ Socket* SslMuxSocket::accept() const
int SslSocket::read(void *buf, size_t count) const
{
- return PR_Read(socket, buf, count);
+ return PR_Read(nssSocket, buf, count);
}
int SslSocket::write(const void *buf, size_t count) const
{
- return PR_Write(socket, buf, count);
-}
-
-uint16_t SslSocket::getLocalPort() const
-{
- return std::atoi(getService(impl->fd, true).c_str());
-}
-
-uint16_t SslSocket::getRemotePort() const
-{
- return atoi(getService(impl->fd, true).c_str());
-}
-
-void SslSocket::setTcpNoDelay(bool nodelay) const
-{
- if (nodelay) {
- PRSocketOptionData option;
- option.option = PR_SockOpt_NoDelay;
- option.value.no_delay = true;
- PR_SetSocketOption(socket, &option);
- }
+ return PR_Write(nssSocket, buf, count);
}
void SslSocket::setCertName(const std::string& name)
@@ -359,7 +319,7 @@ int SslSocket::getKeyLen() const
int keySize = 0;
SECStatus rc;
- rc = SSL_SecurityStatus( socket,
+ rc = SSL_SecurityStatus( nssSocket,
&enabled,
NULL,
NULL,
@@ -374,7 +334,7 @@ int SslSocket::getKeyLen() const
std::string SslSocket::getClientAuthId() const
{
std::string authId;
- CERTCertificate* cert = SSL_PeerCertificate(socket);
+ CERTCertificate* cert = SSL_PeerCertificate(nssSocket);
if (cert) {
authId = CERT_GetCommonName(&(cert->subject));
/*
diff --git a/cpp/src/qpid/sys/ssl/SslSocket.h b/cpp/src/qpid/sys/ssl/SslSocket.h
index eabadcbe23..fc97059cfd 100644
--- a/cpp/src/qpid/sys/ssl/SslSocket.h
+++ b/cpp/src/qpid/sys/ssl/SslSocket.h
@@ -23,7 +23,7 @@
*/
#include "qpid/sys/IOHandle.h"
-#include "qpid/sys/Socket.h"
+#include "qpid/sys/posix/BSDSocket.h"
#include <nspr.h>
#include <string>
@@ -37,55 +37,54 @@ class Duration;
namespace ssl {
-class SslSocket : public qpid::sys::Socket
+class SslSocket : public qpid::sys::BSDSocket
{
public:
- /** Create a socket wrapper for descriptor. */
- SslSocket();
+ /** Create a socket wrapper for descriptor.
+ *@param certName name of certificate to use to identify the socket
+ */
+ SslSocket(const std::string& certName = "", bool clientAuth = false);
/** Set socket non blocking */
void setNonblocking() const;
/** Set tcp-nodelay */
- void setTcpNoDelay(bool nodelay) const;
+ void setTcpNoDelay() const;
/** Set SSL cert-name. Allows the cert-name to be set per
* connection, overriding global cert-name settings from
* NSSInit().*/
void setCertName(const std::string& certName);
- void connect(const std::string& host, const std::string& port) const;
+ void connect(const SocketAddress&) const;
+ void finishConnect(const SocketAddress&) const;
void close() const;
/** Bind to a port and start listening.
*@param port 0 means choose an available port.
*@param backlog maximum number of pending connections.
- *@param certName name of certificate to use to identify the server
*@return The bound port.
*/
- int listen(uint16_t port = 0, int backlog = 10, const std::string& certName = "localhost.localdomain", bool clientAuth = false) const;
+ int listen(const SocketAddress&, int backlog = 10) const;
/**
* Accept a connection from a socket that is already listening
* and has an incoming connection
*/
- SslSocket* accept() const;
+ virtual Socket* accept() const;
// TODO The following are raw operations, maybe they need better wrapping?
int read(void *buf, size_t count) const;
int write(const void *buf, size_t count) const;
- uint16_t getLocalPort() const;
- uint16_t getRemotePort() const;
-
int getKeyLen() const;
std::string getClientAuthId() const;
protected:
- mutable std::string connectname;
- mutable PRFileDesc* socket;
+ mutable PRFileDesc* nssSocket;
std::string certname;
+ mutable std::string url;
/**
* 'model' socket, with configuration to use when importing
@@ -94,13 +93,14 @@ protected:
*/
mutable PRFileDesc* prototype;
- SslSocket(IOHandlePrivate* ioph, PRFileDesc* model);
- friend class SslMuxSocket;
+ SslSocket(int fd, PRFileDesc* model);
+ friend class SslMuxSocket; // Needed for this constructor
};
class SslMuxSocket : public SslSocket
{
public:
+ SslMuxSocket(const std::string& certName = "", bool clientAuth = false);
Socket* accept() const;
};
diff --git a/cpp/src/qpid/sys/ssl/util.cpp b/cpp/src/qpid/sys/ssl/util.cpp
index 3078e894df..de5d638b09 100644
--- a/cpp/src/qpid/sys/ssl/util.cpp
+++ b/cpp/src/qpid/sys/ssl/util.cpp
@@ -31,8 +31,6 @@
#include <iostream>
#include <fstream>
-#include <boost/filesystem/operations.hpp>
-#include <boost/filesystem/path.hpp>
namespace qpid {
namespace sys {
@@ -82,15 +80,14 @@ SslOptions SslOptions::global;
char* readPasswordFromFile(PK11SlotInfo*, PRBool retry, void*)
{
const std::string& passwordFile = SslOptions::global.certPasswordFile;
- if (retry || passwordFile.empty() || !boost::filesystem::exists(passwordFile)) {
- return 0;
- } else {
- std::ifstream file(passwordFile.c_str());
- std::string password;
- file >> password;
- return PL_strdup(password.c_str());
- }
-}
+ if (retry || passwordFile.empty()) return 0;
+ std::ifstream file(passwordFile.c_str());
+ if (!file) return 0;
+
+ std::string password;
+ file >> password;
+ return PL_strdup(password.c_str());
+}
void initNSS(const SslOptions& options, bool server)
{