summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/ssl
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/ssl')
-rw-r--r--cpp/src/qpid/sys/ssl/SslHandler.cpp220
-rw-r--r--cpp/src/qpid/sys/ssl/SslHandler.h85
-rw-r--r--cpp/src/qpid/sys/ssl/SslIo.cpp463
-rw-r--r--cpp/src/qpid/sys/ssl/SslIo.h159
-rw-r--r--cpp/src/qpid/sys/ssl/SslSocket.cpp10
-rw-r--r--cpp/src/qpid/sys/ssl/SslSocket.h1
6 files changed, 8 insertions, 930 deletions
diff --git a/cpp/src/qpid/sys/ssl/SslHandler.cpp b/cpp/src/qpid/sys/ssl/SslHandler.cpp
deleted file mode 100644
index f6605da953..0000000000
--- a/cpp/src/qpid/sys/ssl/SslHandler.cpp
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/sys/ssl/SslHandler.h"
-#include "qpid/sys/ssl/SslIo.h"
-#include "qpid/sys/ssl/SslSocket.h"
-#include "qpid/sys/Timer.h"
-#include "qpid/framing/AMQP_HighestVersion.h"
-#include "qpid/framing/ProtocolInitiation.h"
-#include "qpid/log/Statement.h"
-
-#include <boost/bind.hpp>
-
-namespace qpid {
-namespace sys {
-namespace ssl {
-
-
-struct ProtocolTimeoutTask : public sys::TimerTask {
- SslHandler& handler;
- std::string id;
-
- ProtocolTimeoutTask(const std::string& i, const Duration& timeout, SslHandler& h) :
- TimerTask(timeout, "ProtocolTimeout"),
- handler(h),
- id(i)
- {}
-
- void fire() {
- // If this fires it means that we didn't negotiate the connection in the timeout period
- // Schedule closing the connection for the io thread
- QPID_LOG(error, "Connection " << id << " No protocol received closing");
- handler.abort();
- }
-};
-
-SslHandler::SslHandler(std::string id, ConnectionCodec::Factory* f, bool _nodict) :
- identifier(id),
- aio(0),
- factory(f),
- codec(0),
- readError(false),
- isClient(false),
- nodict(_nodict)
-{}
-
-SslHandler::~SslHandler() {
- if (codec)
- codec->closed();
- if (timeoutTimerTask)
- timeoutTimerTask->cancel();
- delete codec;
-}
-
-void SslHandler::init(AsynchIO* a, Timer& timer, uint32_t maxTime) {
- aio = a;
-
- // Start timer for this connection
- timeoutTimerTask = new ProtocolTimeoutTask(identifier, maxTime*TIME_MSEC, *this);
- timer.add(timeoutTimerTask);
-
- // Give connection some buffers to use
- aio->createBuffers();
-}
-
-void SslHandler::write(const framing::ProtocolInitiation& data)
-{
- QPID_LOG(debug, "SENT [" << identifier << "]: INIT(" << data << ")");
- AsynchIOBufferBase* buff = aio->getQueuedBuffer();
- assert(buff);
- framing::Buffer out(buff->bytes, buff->byteCount);
- data.encode(out);
- buff->dataCount = data.encodedSize();
- aio->queueWrite(buff);
-}
-
-void SslHandler::abort() {
- // Don't disconnect if we're already disconnecting
- if (!readError) {
- aio->requestCallback(boost::bind(&SslHandler::eof, this, _1));
- }
-}
-void SslHandler::activateOutput() {
- aio->notifyPendingWrite();
-}
-
-void SslHandler::giveReadCredit(int32_t) {
- // FIXME aconway 2008-12-05: not yet implemented.
-}
-
-// Input side
-void SslHandler::readbuff(AsynchIO& , AsynchIOBufferBase* buff) {
- if (readError) {
- return;
- }
- size_t decoded = 0;
- if (codec) { // Already initiated
- try {
- decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
- }catch(const std::exception& e){
- QPID_LOG(error, e.what());
- readError = true;
- aio->queueWriteClose();
- }
- }else{
- framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
- framing::ProtocolInitiation protocolInit;
- if (protocolInit.decode(in)) {
- // We've just got the protocol negotiation so we can cancel the timeout for that
- timeoutTimerTask->cancel();
-
- decoded = in.getPosition();
- QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
- try {
- codec = factory->create(protocolInit.getVersion(), *this, identifier, getSecuritySettings(aio));
- if (!codec) {
- //TODO: may still want to revise this...
- //send valid version header & close connection.
- write(framing::ProtocolInitiation(framing::highestProtocolVersion));
- readError = true;
- aio->queueWriteClose();
- } else {
- //read any further data that may already have been sent
- decoded += codec->decode(buff->bytes+buff->dataStart+in.getPosition(), buff->dataCount-in.getPosition());
- }
- } catch (const std::exception& e) {
- QPID_LOG(error, e.what());
- readError = true;
- aio->queueWriteClose();
- }
- }
- }
- // TODO: unreading needs to go away, and when we can cope
- // with multiple sub-buffers in the general buffer scheme, it will
- if (decoded != size_t(buff->dataCount)) {
- // Adjust buffer for used bytes and then "unread them"
- buff->dataStart += decoded;
- buff->dataCount -= decoded;
- aio->unread(buff);
- } else {
- // Give whole buffer back to aio subsystem
- aio->queueReadBuffer(buff);
- }
-}
-
-void SslHandler::eof(AsynchIO&) {
- QPID_LOG(debug, "DISCONNECTED [" << identifier << "]");
- if (codec) codec->closed();
- aio->queueWriteClose();
-}
-
-void SslHandler::closedSocket(AsynchIO&, const Socket& s) {
- // If we closed with data still to send log a warning
- if (!aio->writeQueueEmpty()) {
- QPID_LOG(warning, "CLOSING [" << identifier << "] unsent data (probably due to client disconnect)");
- }
- delete &s;
- aio->queueForDeletion();
- delete this;
-}
-
-void SslHandler::disconnect(AsynchIO& a) {
- // treat the same as eof
- eof(a);
-}
-
-// Notifications
-void SslHandler::nobuffs(AsynchIO&) {
-}
-
-void SslHandler::idle(AsynchIO&){
- if (isClient && codec == 0) {
- codec = factory->create(*this, identifier, getSecuritySettings(aio));
- write(framing::ProtocolInitiation(codec->getVersion()));
- // We've just sent the protocol negotiation so we can cancel the timeout for that
- // This is not ideal, because we've not received anything yet, but heartbeats will
- // be active soon
- timeoutTimerTask->cancel();
- return;
- }
- if (codec == 0) return;
- if (!codec->canEncode()) {
- return;
- }
- AsynchIOBufferBase* buff = aio->getQueuedBuffer();
- if (buff) {
- size_t encoded=codec->encode(buff->bytes, buff->byteCount);
- buff->dataCount = encoded;
- aio->queueWrite(buff);
- }
- if (codec->isClosed())
- aio->queueWriteClose();
-}
-
-SecuritySettings SslHandler::getSecuritySettings(AsynchIO* aio)
-{
- SecuritySettings settings = aio->getSecuritySettings();
- settings.nodict = nodict;
- return settings;
-}
-
-
-}}} // namespace qpid::sys::ssl
diff --git a/cpp/src/qpid/sys/ssl/SslHandler.h b/cpp/src/qpid/sys/ssl/SslHandler.h
deleted file mode 100644
index aedfea1888..0000000000
--- a/cpp/src/qpid/sys/ssl/SslHandler.h
+++ /dev/null
@@ -1,85 +0,0 @@
-#ifndef QPID_SYS_SSL_SSLHANDLER_H
-#define QPID_SYS_SSL_SSLHANDLER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/ConnectionCodec.h"
-#include "qpid/sys/OutputControl.h"
-#include "qpid/sys/SecuritySettings.h"
-
-#include <boost/intrusive_ptr.hpp>
-
-namespace qpid {
-
-namespace framing {
- class ProtocolInitiation;
-}
-
-namespace sys {
-
-class AsynchIO;
-struct AsynchIOBufferBase;
-class Socket;
-class Timer;
-class TimerTask;
-
-namespace ssl {
-
-class SslHandler : public OutputControl {
- std::string identifier;
- AsynchIO* aio;
- ConnectionCodec::Factory* factory;
- ConnectionCodec* codec;
- bool readError;
- bool isClient;
- bool nodict;
- boost::intrusive_ptr<sys::TimerTask> timeoutTimerTask;
-
- void write(const framing::ProtocolInitiation&);
- qpid::sys::SecuritySettings getSecuritySettings(AsynchIO* aio);
-
- public:
- SslHandler(std::string id, ConnectionCodec::Factory* f, bool nodict);
- ~SslHandler();
- void init(AsynchIO* a, Timer& timer, uint32_t maxTime);
-
- void setClient() { isClient = true; }
-
- // Output side
- void abort();
- void activateOutput();
- void giveReadCredit(int32_t);
-
- // Input side
- void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase* buff);
- void eof(qpid::sys::AsynchIO&);
- void disconnect(qpid::sys::AsynchIO& a);
-
- // Notifications
- void nobuffs(qpid::sys::AsynchIO&);
- void idle(qpid::sys::AsynchIO&);
- void closedSocket(qpid::sys::AsynchIO&, const qpid::sys::Socket& s);
-};
-
-}}} // namespace qpid::sys::ssl
-
-#endif /*!QPID_SYS_SSL_SSLHANDLER_H*/
diff --git a/cpp/src/qpid/sys/ssl/SslIo.cpp b/cpp/src/qpid/sys/ssl/SslIo.cpp
deleted file mode 100644
index 92e51a2234..0000000000
--- a/cpp/src/qpid/sys/ssl/SslIo.cpp
+++ /dev/null
@@ -1,463 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/ssl/SslIo.h"
-#include "qpid/sys/ssl/SslSocket.h"
-#include "qpid/sys/ssl/check.h"
-
-#include "qpid/sys/Time.h"
-#include "qpid/sys/posix/check.h"
-#include "qpid/log/Statement.h"
-
-// TODO The basic algorithm here is not really POSIX specific and with a bit more abstraction
-// could (should) be promoted to be platform portable
-#include <unistd.h>
-#include <sys/socket.h>
-#include <signal.h>
-#include <errno.h>
-#include <string.h>
-
-#include <boost/bind.hpp>
-
-namespace qpid {
-namespace sys {
-namespace ssl {
-
-namespace {
-
-/*
- * Make *process* not generate SIGPIPE when writing to closed
- * pipe/socket (necessary as default action is to terminate process)
- */
-void ignoreSigpipe() {
- ::signal(SIGPIPE, SIG_IGN);
-}
-
-/*
- * We keep per thread state to avoid locking overhead. The assumption is that
- * on average all the connections are serviced by all the threads so the state
- * recorded in each thread is about the same. If this turns out not to be the
- * case we could rebalance the info occasionally.
- */
-__thread int threadReadTotal = 0;
-__thread int threadReadCount = 0;
-__thread int threadWriteTotal = 0;
-__thread int threadWriteCount = 0;
-__thread int64_t threadMaxIoTimeNs = 2 * 1000000; // start at 2ms
-}
-
-/*
- * Asynch Acceptor
- */
-
-SslAcceptor::SslAcceptor(const Socket& s, Callback callback) :
- acceptedCallback(callback),
- handle(s, boost::bind(&SslAcceptor::readable, this, _1), 0, 0),
- socket(s) {
-
- s.setNonblocking();
- ignoreSigpipe();
-}
-
-SslAcceptor::~SslAcceptor()
-{
- handle.stopWatch();
-}
-
-void SslAcceptor::start(Poller::shared_ptr poller) {
- handle.startWatch(poller);
-}
-
-/*
- * We keep on accepting as long as there is something to accept
- */
-void SslAcceptor::readable(DispatchHandle& h) {
- Socket* s;
- do {
- errno = 0;
- // TODO: Currently we ignore the peers address, perhaps we should
- // log it or use it for connection acceptance.
- try {
- s = socket.accept();
- if (s) {
- acceptedCallback(*s);
- } else {
- break;
- }
- } catch (const std::exception& e) {
- QPID_LOG(error, "Could not accept socket: " << e.what());
- }
- } while (true);
-
- h.rewatch();
-}
-
-/*
- * Asynch Connector
- */
-
-SslConnector::SslConnector(const SslSocket& s,
- Poller::shared_ptr poller,
- std::string hostname,
- std::string port,
- ConnectedCallback connCb,
- FailedCallback failCb) :
- DispatchHandle(s,
- 0,
- boost::bind(&SslConnector::connComplete, this, _1),
- boost::bind(&SslConnector::connComplete, this, _1)),
- connCallback(connCb),
- failCallback(failCb),
- socket(s),
- sa(hostname, port)
-{
- //TODO: would be better for connect to be performed on a
- //non-blocking socket, but that doesn't work at present so connect
- //blocks until complete
- try {
- socket.connect(sa);
- socket.setNonblocking();
- startWatch(poller);
- } catch(std::exception& e) {
- failure(-1, std::string(e.what()));
- }
-}
-
-void SslConnector::connComplete(DispatchHandle& h)
-{
- int errCode = socket.getError();
-
- h.stopWatch();
- if (errCode == 0) {
- connCallback(socket);
- DispatchHandle::doDelete();
- } else {
- // TODO: This need to be fixed as strerror isn't thread safe
- failure(errCode, std::string(::strerror(errCode)));
- }
-}
-
-void SslConnector::failure(int errCode, std::string message)
-{
- if (failCallback)
- failCallback(errCode, message);
-
- socket.close();
- delete &socket;
-
- DispatchHandle::doDelete();
-}
-
-/*
- * Asynch reader/writer
- */
-SslIO::SslIO(const SslSocket& s,
- ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
- ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) :
-
- DispatchHandle(s,
- boost::bind(&SslIO::readable, this, _1),
- boost::bind(&SslIO::writeable, this, _1),
- boost::bind(&SslIO::disconnected, this, _1)),
- readCallback(rCb),
- eofCallback(eofCb),
- disCallback(disCb),
- closedCallback(cCb),
- emptyCallback(eCb),
- idleCallback(iCb),
- socket(s),
- queuedClose(false),
- writePending(false) {
-
- s.setNonblocking();
-}
-
-SslIO::~SslIO() {
-}
-
-void SslIO::queueForDeletion() {
- DispatchHandle::doDelete();
-}
-
-void SslIO::start(Poller::shared_ptr poller) {
- DispatchHandle::startWatch(poller);
-}
-
-void SslIO::createBuffers(uint32_t size) {
- // Allocate all the buffer memory at once
- bufferMemory.reset(new char[size*BufferCount]);
-
- // Create the Buffer structs in a vector
- // And push into the buffer queue
- buffers.reserve(BufferCount);
- for (uint32_t i = 0; i < BufferCount; i++) {
- buffers.push_back(BufferBase(&bufferMemory[i*size], size));
- queueReadBuffer(&buffers[i]);
- }
-}
-
-void SslIO::queueReadBuffer(BufferBase* buff) {
- assert(buff);
- buff->dataStart = 0;
- buff->dataCount = 0;
- bufferQueue.push_back(buff);
- DispatchHandle::rewatchRead();
-}
-
-void SslIO::unread(BufferBase* buff) {
- assert(buff);
- if (buff->dataStart != 0) {
- memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount);
- buff->dataStart = 0;
- }
- bufferQueue.push_front(buff);
- DispatchHandle::rewatchRead();
-}
-
-void SslIO::queueWrite(BufferBase* buff) {
- assert(buff);
- // If we've already closed the socket then throw the write away
- if (queuedClose) {
- bufferQueue.push_front(buff);
- return;
- } else {
- writeQueue.push_front(buff);
- }
- writePending = false;
- DispatchHandle::rewatchWrite();
-}
-
-void SslIO::notifyPendingWrite() {
- writePending = true;
- DispatchHandle::rewatchWrite();
-}
-
-void SslIO::queueWriteClose() {
- queuedClose = true;
- DispatchHandle::rewatchWrite();
-}
-
-void SslIO::requestCallback(RequestCallback callback) {
- // TODO creating a function object every time isn't all that
- // efficient - if this becomes heavily used do something better (what?)
- assert(callback);
- DispatchHandle::call(boost::bind(&SslIO::requestedCall, this, callback));
-}
-
-void SslIO::requestedCall(RequestCallback callback) {
- assert(callback);
- callback(*this);
-}
-
-/** Return a queued buffer if there are enough
- * to spare
- */
-SslIO::BufferBase* SslIO::getQueuedBuffer() {
- // Always keep at least one buffer (it might have data that was "unread" in it)
- if (bufferQueue.size()<=1)
- return 0;
- BufferBase* buff = bufferQueue.back();
- assert(buff);
- buff->dataStart = 0;
- buff->dataCount = 0;
- bufferQueue.pop_back();
- return buff;
-}
-
-/*
- * We keep on reading as long as we have something to read and a buffer to put
- * it in
- */
-void SslIO::readable(DispatchHandle& h) {
- AbsTime readStartTime = AbsTime::now();
- do {
- // (Try to) get a buffer
- if (!bufferQueue.empty()) {
- // Read into buffer
- BufferBase* buff = bufferQueue.front();
- assert(buff);
- bufferQueue.pop_front();
- errno = 0;
- int readCount = buff->byteCount-buff->dataCount;
- int rc = socket.read(buff->bytes + buff->dataCount, readCount);
- if (rc > 0) {
- buff->dataCount += rc;
- threadReadTotal += rc;
-
- readCallback(*this, buff);
- if (rc != readCount) {
- // If we didn't fill the read buffer then time to stop reading
- break;
- }
-
- // Stop reading if we've overrun our timeslot
- if (Duration(readStartTime, AbsTime::now()) > threadMaxIoTimeNs) {
- break;
- }
-
- } else {
- // Put buffer back (at front so it doesn't interfere with unread buffers)
- bufferQueue.push_front(buff);
- assert(buff);
-
- // Eof or other side has gone away
- if (rc == 0 || errno == ECONNRESET) {
- eofCallback(*this);
- h.unwatchRead();
- break;
- } else if (errno == EAGAIN) {
- // We have just put a buffer back so we know
- // we can carry on watching for reads
- break;
- } else {
- // Report error then just treat as a socket disconnect
- QPID_LOG(error, "Error reading socket: " << getErrorString(PR_GetError()));
- eofCallback(*this);
- h.unwatchRead();
- break;
- }
- }
- } else {
- // Something to read but no buffer
- if (emptyCallback) {
- emptyCallback(*this);
- }
- // If we still have no buffers we can't do anything more
- if (bufferQueue.empty()) {
- h.unwatchRead();
- break;
- }
-
- }
- } while (true);
-
- ++threadReadCount;
- return;
-}
-
-/*
- * We carry on writing whilst we have data to write and we can write
- */
-void SslIO::writeable(DispatchHandle& h) {
- AbsTime writeStartTime = AbsTime::now();
- do {
- // See if we've got something to write
- if (!writeQueue.empty()) {
- // Write buffer
- BufferBase* buff = writeQueue.back();
- writeQueue.pop_back();
- errno = 0;
- assert(buff->dataStart+buff->dataCount <= buff->byteCount);
- int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount);
- if (rc >= 0) {
- threadWriteTotal += rc;
-
- // If we didn't write full buffer put rest back
- if (rc != buff->dataCount) {
- buff->dataStart += rc;
- buff->dataCount -= rc;
- writeQueue.push_back(buff);
- break;
- }
-
- // Recycle the buffer
- queueReadBuffer(buff);
-
- // Stop writing if we've overrun our timeslot
- if (Duration(writeStartTime, AbsTime::now()) > threadMaxIoTimeNs) {
- break;
- }
- } else {
- // Put buffer back
- writeQueue.push_back(buff);
- if (errno == ECONNRESET || errno == EPIPE) {
- // Just stop watching for write here - we'll get a
- // disconnect callback soon enough
- h.unwatchWrite();
- break;
- } else if (errno == EAGAIN) {
- // We have just put a buffer back so we know
- // we can carry on watching for writes
- break;
- } else {
- QPID_LOG(error, "Error writing to socket: " << getErrorString(PR_GetError()));
- h.unwatchWrite();
- break;
- }
- }
- } else {
- // If we're waiting to close the socket then can do it now as there is nothing to write
- if (queuedClose) {
- close(h);
- break;
- }
- // Fd is writable, but nothing to write
- if (idleCallback) {
- writePending = false;
- idleCallback(*this);
- }
- // If we still have no buffers to write we can't do anything more
- if (writeQueue.empty() && !writePending && !queuedClose) {
- h.unwatchWrite();
- // The following handles the case where writePending is
- // set to true after the test above; in this case its
- // possible that the unwatchWrite overwrites the
- // desired rewatchWrite so we correct that here
- if (writePending)
- h.rewatchWrite();
- break;
- }
- }
- } while (true);
-
- ++threadWriteCount;
- return;
-}
-
-void SslIO::disconnected(DispatchHandle& h) {
- // If we've already queued close do it instead of disconnected callback
- if (queuedClose) {
- close(h);
- } else if (disCallback) {
- disCallback(*this);
- h.unwatch();
- }
-}
-
-/*
- * Close the socket and callback to say we've done it
- */
-void SslIO::close(DispatchHandle& h) {
- h.stopWatch();
- socket.close();
- if (closedCallback) {
- closedCallback(*this, socket);
- }
-}
-
-SecuritySettings SslIO::getSecuritySettings() {
- SecuritySettings settings;
- settings.ssf = socket.getKeyLen();
- settings.authid = socket.getClientAuthId();
- return settings;
-}
-
-}}}
diff --git a/cpp/src/qpid/sys/ssl/SslIo.h b/cpp/src/qpid/sys/ssl/SslIo.h
deleted file mode 100644
index a72cd7c76c..0000000000
--- a/cpp/src/qpid/sys/ssl/SslIo.h
+++ /dev/null
@@ -1,159 +0,0 @@
-#ifndef _sys_ssl_SslIO
-#define _sys_ssl_SslIO
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <qpid/sys/AsynchIO.h>
-#include "qpid/sys/DispatchHandle.h"
-#include "qpid/sys/SecuritySettings.h"
-#include "qpid/sys/SocketAddress.h"
-
-#include <boost/function.hpp>
-#include <boost/shared_array.hpp>
-#include <deque>
-
-namespace qpid {
-namespace sys {
-
-class Socket;
-
-namespace ssl {
-
-class SslSocket;
-
-/*
- * Asynchronous ssl acceptor: accepts connections then does a callback
- * with the accepted fd
- */
-class SslAcceptor {
-public:
- typedef boost::function1<void, const Socket&> Callback;
-
-private:
- Callback acceptedCallback;
- qpid::sys::DispatchHandle handle;
- const Socket& socket;
-
-public:
- SslAcceptor(const Socket& s, Callback callback);
- ~SslAcceptor();
- void start(qpid::sys::Poller::shared_ptr poller);
-
-private:
- void readable(qpid::sys::DispatchHandle& handle);
-};
-
-/*
- * Asynchronous ssl connector: starts the process of initiating a
- * connection and invokes a callback when completed or failed.
- */
-class SslConnector : private qpid::sys::DispatchHandle {
-public:
- typedef boost::function1<void, const SslSocket&> ConnectedCallback;
- typedef boost::function2<void, int, std::string> FailedCallback;
-
-private:
- ConnectedCallback connCallback;
- FailedCallback failCallback;
- const SslSocket& socket;
- SocketAddress sa;
-
-public:
- SslConnector(const SslSocket& socket,
- Poller::shared_ptr poller,
- std::string hostname,
- std::string port,
- ConnectedCallback connCb,
- FailedCallback failCb = 0);
-
-private:
- void connComplete(DispatchHandle& handle);
- void failure(int, std::string);
-};
-
-/*
- * Asychronous reader/writer:
- * Reader accepts buffers to read into; reads into the provided buffers
- * and then does a callback with the buffer and amount read. Optionally it can callback
- * when there is something to read but no buffer to read it into.
- *
- * Writer accepts a buffer and queues it for writing; can also be given
- * a callback for when writing is "idle" (ie fd is writable, but nothing to write)
- *
- * The class is implemented in terms of DispatchHandle to allow it to be deleted by deleting
- * the contained DispatchHandle
- */
-class SslIO : public AsynchIO, private qpid::sys::DispatchHandle {
-public:
- SslIO(const SslSocket& s,
- ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
- ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
-private:
- ReadCallback readCallback;
- EofCallback eofCallback;
- DisconnectCallback disCallback;
- ClosedCallback closedCallback;
- BuffersEmptyCallback emptyCallback;
- IdleCallback idleCallback;
- const SslSocket& socket;
- std::deque<BufferBase*> bufferQueue;
- std::deque<BufferBase*> writeQueue;
- std::vector<BufferBase> buffers;
- boost::shared_array<char> bufferMemory;
- bool queuedClose;
- /**
- * This flag is used to detect and handle concurrency between
- * calls to notifyPendingWrite() (which can be made from any thread) and
- * the execution of the writeable() method (which is always on the
- * thread processing this handle.
- */
- volatile bool writePending;
-
-public:
- void queueForDeletion();
-
- void start(qpid::sys::Poller::shared_ptr poller);
- void createBuffers(uint32_t size = MaxBufferSize);
- void queueReadBuffer(BufferBase* buff);
- void unread(BufferBase* buff);
- void queueWrite(BufferBase* buff);
- void notifyPendingWrite();
- void queueWriteClose();
- bool writeQueueEmpty() { return writeQueue.empty(); }
- void startReading() {};
- void stopReading() {};
- void requestCallback(RequestCallback);
- BufferBase* getQueuedBuffer();
-
- qpid::sys::SecuritySettings getSecuritySettings();
-
-private:
- ~SslIO();
- void readable(qpid::sys::DispatchHandle& handle);
- void writeable(qpid::sys::DispatchHandle& handle);
- void disconnected(qpid::sys::DispatchHandle& handle);
- void requestedCall(RequestCallback);
- void close(qpid::sys::DispatchHandle& handle);
-};
-
-}}}
-
-#endif // _sys_ssl_SslIO
diff --git a/cpp/src/qpid/sys/ssl/SslSocket.cpp b/cpp/src/qpid/sys/ssl/SslSocket.cpp
index 22f9f63fff..a328e49c13 100644
--- a/cpp/src/qpid/sys/ssl/SslSocket.cpp
+++ b/cpp/src/qpid/sys/ssl/SslSocket.cpp
@@ -87,6 +87,7 @@ SslSocket::SslSocket(const std::string& certName, bool clientAuth) :
{
//configure prototype socket:
prototype = SSL_ImportFD(0, PR_NewTCPSocket());
+
if (clientAuth) {
NSS_CHECK(SSL_OptionSet(prototype, SSL_REQUEST_CERTIFICATE, PR_TRUE));
NSS_CHECK(SSL_OptionSet(prototype, SSL_REQUIRE_CERTIFICATE, PR_TRUE));
@@ -131,7 +132,10 @@ void SslSocket::setTcpNoDelay() const
void SslSocket::connect(const SocketAddress& addr) const
{
BSDSocket::connect(addr);
+}
+void SslSocket::finishConnect(const SocketAddress& addr) const
+{
nssSocket = SSL_ImportFD(0, PR_ImportTCPSocket(fd));
void* arg;
@@ -167,9 +171,9 @@ void SslSocket::close() const
int SslSocket::listen(const SocketAddress& sa, int backlog) const
{
//get certificate and key (is this the correct way?)
- std::string certName( (certname == "") ? "localhost.localdomain" : certname);
- CERTCertificate *cert = PK11_FindCertFromNickname(const_cast<char*>(certName.c_str()), 0);
- if (!cert) throw Exception(QPID_MSG("Failed to load certificate '" << certName << "'"));
+ std::string cName( (certname == "") ? "localhost.localdomain" : certname);
+ CERTCertificate *cert = PK11_FindCertFromNickname(const_cast<char*>(cName.c_str()), 0);
+ if (!cert) throw Exception(QPID_MSG("Failed to load certificate '" << cName << "'"));
SECKEYPrivateKey *key = PK11_FindKeyByAnyCert(cert, 0);
if (!key) throw Exception(QPID_MSG("Failed to retrieve private key from certificate"));
NSS_CHECK(SSL_ConfigSecureServer(prototype, cert, key, NSS_FindCertKEAType(cert)));
diff --git a/cpp/src/qpid/sys/ssl/SslSocket.h b/cpp/src/qpid/sys/ssl/SslSocket.h
index 1efbbe4a88..fc97059cfd 100644
--- a/cpp/src/qpid/sys/ssl/SslSocket.h
+++ b/cpp/src/qpid/sys/ssl/SslSocket.h
@@ -57,6 +57,7 @@ public:
void setCertName(const std::string& certName);
void connect(const SocketAddress&) const;
+ void finishConnect(const SocketAddress&) const;
void close() const;