diff options
author | Andrew Stitcher <astitcher@apache.org> | 2012-10-24 05:51:45 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2012-10-24 05:51:45 +0000 |
commit | 531c9d1aef70f788265f60ca63efb1654d6e32b7 (patch) | |
tree | bb1927b69b5a2814c10db864f1853489dd0f80f2 /cpp/src/qpid/sys/ssl/SslIo.cpp | |
parent | 46ac396386fee5e816e5d3a9fae2355017e079bb (diff) | |
download | qpid-python-531c9d1aef70f788265f60ca63efb1654d6e32b7.tar.gz |
QPID-4272: Large amounts of code are duplicated between the SSL and TCP transports
Refactor SslMux support simplifying it and remove need for separate
SslHandler and SslIo code.
Refactored SSL client code to use the same connect and broker SSL to use the same
accept sequences as the TCP code. This also solves QPID-3565: IPv6 support for SSL
transport on Unix C++ client/broker
Remove now unneeded ssl files.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1401561 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/ssl/SslIo.cpp')
-rw-r--r-- | cpp/src/qpid/sys/ssl/SslIo.cpp | 463 |
1 files changed, 0 insertions, 463 deletions
diff --git a/cpp/src/qpid/sys/ssl/SslIo.cpp b/cpp/src/qpid/sys/ssl/SslIo.cpp deleted file mode 100644 index 92e51a2234..0000000000 --- a/cpp/src/qpid/sys/ssl/SslIo.cpp +++ /dev/null @@ -1,463 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/ssl/SslIo.h" -#include "qpid/sys/ssl/SslSocket.h" -#include "qpid/sys/ssl/check.h" - -#include "qpid/sys/Time.h" -#include "qpid/sys/posix/check.h" -#include "qpid/log/Statement.h" - -// TODO The basic algorithm here is not really POSIX specific and with a bit more abstraction -// could (should) be promoted to be platform portable -#include <unistd.h> -#include <sys/socket.h> -#include <signal.h> -#include <errno.h> -#include <string.h> - -#include <boost/bind.hpp> - -namespace qpid { -namespace sys { -namespace ssl { - -namespace { - -/* - * Make *process* not generate SIGPIPE when writing to closed - * pipe/socket (necessary as default action is to terminate process) - */ -void ignoreSigpipe() { - ::signal(SIGPIPE, SIG_IGN); -} - -/* - * We keep per thread state to avoid locking overhead. The assumption is that - * on average all the connections are serviced by all the threads so the state - * recorded in each thread is about the same. If this turns out not to be the - * case we could rebalance the info occasionally. - */ -__thread int threadReadTotal = 0; -__thread int threadReadCount = 0; -__thread int threadWriteTotal = 0; -__thread int threadWriteCount = 0; -__thread int64_t threadMaxIoTimeNs = 2 * 1000000; // start at 2ms -} - -/* - * Asynch Acceptor - */ - -SslAcceptor::SslAcceptor(const Socket& s, Callback callback) : - acceptedCallback(callback), - handle(s, boost::bind(&SslAcceptor::readable, this, _1), 0, 0), - socket(s) { - - s.setNonblocking(); - ignoreSigpipe(); -} - -SslAcceptor::~SslAcceptor() -{ - handle.stopWatch(); -} - -void SslAcceptor::start(Poller::shared_ptr poller) { - handle.startWatch(poller); -} - -/* - * We keep on accepting as long as there is something to accept - */ -void SslAcceptor::readable(DispatchHandle& h) { - Socket* s; - do { - errno = 0; - // TODO: Currently we ignore the peers address, perhaps we should - // log it or use it for connection acceptance. - try { - s = socket.accept(); - if (s) { - acceptedCallback(*s); - } else { - break; - } - } catch (const std::exception& e) { - QPID_LOG(error, "Could not accept socket: " << e.what()); - } - } while (true); - - h.rewatch(); -} - -/* - * Asynch Connector - */ - -SslConnector::SslConnector(const SslSocket& s, - Poller::shared_ptr poller, - std::string hostname, - std::string port, - ConnectedCallback connCb, - FailedCallback failCb) : - DispatchHandle(s, - 0, - boost::bind(&SslConnector::connComplete, this, _1), - boost::bind(&SslConnector::connComplete, this, _1)), - connCallback(connCb), - failCallback(failCb), - socket(s), - sa(hostname, port) -{ - //TODO: would be better for connect to be performed on a - //non-blocking socket, but that doesn't work at present so connect - //blocks until complete - try { - socket.connect(sa); - socket.setNonblocking(); - startWatch(poller); - } catch(std::exception& e) { - failure(-1, std::string(e.what())); - } -} - -void SslConnector::connComplete(DispatchHandle& h) -{ - int errCode = socket.getError(); - - h.stopWatch(); - if (errCode == 0) { - connCallback(socket); - DispatchHandle::doDelete(); - } else { - // TODO: This need to be fixed as strerror isn't thread safe - failure(errCode, std::string(::strerror(errCode))); - } -} - -void SslConnector::failure(int errCode, std::string message) -{ - if (failCallback) - failCallback(errCode, message); - - socket.close(); - delete &socket; - - DispatchHandle::doDelete(); -} - -/* - * Asynch reader/writer - */ -SslIO::SslIO(const SslSocket& s, - ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, - ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) : - - DispatchHandle(s, - boost::bind(&SslIO::readable, this, _1), - boost::bind(&SslIO::writeable, this, _1), - boost::bind(&SslIO::disconnected, this, _1)), - readCallback(rCb), - eofCallback(eofCb), - disCallback(disCb), - closedCallback(cCb), - emptyCallback(eCb), - idleCallback(iCb), - socket(s), - queuedClose(false), - writePending(false) { - - s.setNonblocking(); -} - -SslIO::~SslIO() { -} - -void SslIO::queueForDeletion() { - DispatchHandle::doDelete(); -} - -void SslIO::start(Poller::shared_ptr poller) { - DispatchHandle::startWatch(poller); -} - -void SslIO::createBuffers(uint32_t size) { - // Allocate all the buffer memory at once - bufferMemory.reset(new char[size*BufferCount]); - - // Create the Buffer structs in a vector - // And push into the buffer queue - buffers.reserve(BufferCount); - for (uint32_t i = 0; i < BufferCount; i++) { - buffers.push_back(BufferBase(&bufferMemory[i*size], size)); - queueReadBuffer(&buffers[i]); - } -} - -void SslIO::queueReadBuffer(BufferBase* buff) { - assert(buff); - buff->dataStart = 0; - buff->dataCount = 0; - bufferQueue.push_back(buff); - DispatchHandle::rewatchRead(); -} - -void SslIO::unread(BufferBase* buff) { - assert(buff); - if (buff->dataStart != 0) { - memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount); - buff->dataStart = 0; - } - bufferQueue.push_front(buff); - DispatchHandle::rewatchRead(); -} - -void SslIO::queueWrite(BufferBase* buff) { - assert(buff); - // If we've already closed the socket then throw the write away - if (queuedClose) { - bufferQueue.push_front(buff); - return; - } else { - writeQueue.push_front(buff); - } - writePending = false; - DispatchHandle::rewatchWrite(); -} - -void SslIO::notifyPendingWrite() { - writePending = true; - DispatchHandle::rewatchWrite(); -} - -void SslIO::queueWriteClose() { - queuedClose = true; - DispatchHandle::rewatchWrite(); -} - -void SslIO::requestCallback(RequestCallback callback) { - // TODO creating a function object every time isn't all that - // efficient - if this becomes heavily used do something better (what?) - assert(callback); - DispatchHandle::call(boost::bind(&SslIO::requestedCall, this, callback)); -} - -void SslIO::requestedCall(RequestCallback callback) { - assert(callback); - callback(*this); -} - -/** Return a queued buffer if there are enough - * to spare - */ -SslIO::BufferBase* SslIO::getQueuedBuffer() { - // Always keep at least one buffer (it might have data that was "unread" in it) - if (bufferQueue.size()<=1) - return 0; - BufferBase* buff = bufferQueue.back(); - assert(buff); - buff->dataStart = 0; - buff->dataCount = 0; - bufferQueue.pop_back(); - return buff; -} - -/* - * We keep on reading as long as we have something to read and a buffer to put - * it in - */ -void SslIO::readable(DispatchHandle& h) { - AbsTime readStartTime = AbsTime::now(); - do { - // (Try to) get a buffer - if (!bufferQueue.empty()) { - // Read into buffer - BufferBase* buff = bufferQueue.front(); - assert(buff); - bufferQueue.pop_front(); - errno = 0; - int readCount = buff->byteCount-buff->dataCount; - int rc = socket.read(buff->bytes + buff->dataCount, readCount); - if (rc > 0) { - buff->dataCount += rc; - threadReadTotal += rc; - - readCallback(*this, buff); - if (rc != readCount) { - // If we didn't fill the read buffer then time to stop reading - break; - } - - // Stop reading if we've overrun our timeslot - if (Duration(readStartTime, AbsTime::now()) > threadMaxIoTimeNs) { - break; - } - - } else { - // Put buffer back (at front so it doesn't interfere with unread buffers) - bufferQueue.push_front(buff); - assert(buff); - - // Eof or other side has gone away - if (rc == 0 || errno == ECONNRESET) { - eofCallback(*this); - h.unwatchRead(); - break; - } else if (errno == EAGAIN) { - // We have just put a buffer back so we know - // we can carry on watching for reads - break; - } else { - // Report error then just treat as a socket disconnect - QPID_LOG(error, "Error reading socket: " << getErrorString(PR_GetError())); - eofCallback(*this); - h.unwatchRead(); - break; - } - } - } else { - // Something to read but no buffer - if (emptyCallback) { - emptyCallback(*this); - } - // If we still have no buffers we can't do anything more - if (bufferQueue.empty()) { - h.unwatchRead(); - break; - } - - } - } while (true); - - ++threadReadCount; - return; -} - -/* - * We carry on writing whilst we have data to write and we can write - */ -void SslIO::writeable(DispatchHandle& h) { - AbsTime writeStartTime = AbsTime::now(); - do { - // See if we've got something to write - if (!writeQueue.empty()) { - // Write buffer - BufferBase* buff = writeQueue.back(); - writeQueue.pop_back(); - errno = 0; - assert(buff->dataStart+buff->dataCount <= buff->byteCount); - int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount); - if (rc >= 0) { - threadWriteTotal += rc; - - // If we didn't write full buffer put rest back - if (rc != buff->dataCount) { - buff->dataStart += rc; - buff->dataCount -= rc; - writeQueue.push_back(buff); - break; - } - - // Recycle the buffer - queueReadBuffer(buff); - - // Stop writing if we've overrun our timeslot - if (Duration(writeStartTime, AbsTime::now()) > threadMaxIoTimeNs) { - break; - } - } else { - // Put buffer back - writeQueue.push_back(buff); - if (errno == ECONNRESET || errno == EPIPE) { - // Just stop watching for write here - we'll get a - // disconnect callback soon enough - h.unwatchWrite(); - break; - } else if (errno == EAGAIN) { - // We have just put a buffer back so we know - // we can carry on watching for writes - break; - } else { - QPID_LOG(error, "Error writing to socket: " << getErrorString(PR_GetError())); - h.unwatchWrite(); - break; - } - } - } else { - // If we're waiting to close the socket then can do it now as there is nothing to write - if (queuedClose) { - close(h); - break; - } - // Fd is writable, but nothing to write - if (idleCallback) { - writePending = false; - idleCallback(*this); - } - // If we still have no buffers to write we can't do anything more - if (writeQueue.empty() && !writePending && !queuedClose) { - h.unwatchWrite(); - // The following handles the case where writePending is - // set to true after the test above; in this case its - // possible that the unwatchWrite overwrites the - // desired rewatchWrite so we correct that here - if (writePending) - h.rewatchWrite(); - break; - } - } - } while (true); - - ++threadWriteCount; - return; -} - -void SslIO::disconnected(DispatchHandle& h) { - // If we've already queued close do it instead of disconnected callback - if (queuedClose) { - close(h); - } else if (disCallback) { - disCallback(*this); - h.unwatch(); - } -} - -/* - * Close the socket and callback to say we've done it - */ -void SslIO::close(DispatchHandle& h) { - h.stopWatch(); - socket.close(); - if (closedCallback) { - closedCallback(*this, socket); - } -} - -SecuritySettings SslIO::getSecuritySettings() { - SecuritySettings settings; - settings.ssf = socket.getKeyLen(); - settings.authid = socket.getClientAuthId(); - return settings; -} - -}}} |