diff options
Diffstat (limited to 'cpp/src/qpid/sys/posix/AsynchIO.cpp')
-rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 611 |
1 files changed, 0 insertions, 611 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp deleted file mode 100644 index 119a6aa8a4..0000000000 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ /dev/null @@ -1,611 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/AsynchIO.h" -#include "qpid/sys/Socket.h" -#include "qpid/sys/SocketAddress.h" -#include "qpid/sys/Poller.h" -#include "qpid/sys/DispatchHandle.h" -#include "qpid/sys/Time.h" -#include "qpid/log/Statement.h" - -#include "qpid/sys/posix/check.h" - -// TODO The basic algorithm here is not really POSIX specific and with a -// bit more abstraction could (should) be promoted to be platform portable -#include <unistd.h> -#include <sys/socket.h> -#include <signal.h> -#include <errno.h> -#include <string.h> - -#include <boost/bind.hpp> -#include <boost/lexical_cast.hpp> - -using namespace qpid::sys; - -namespace { - -struct StaticInit { - StaticInit() { - /** - * Make *process* not generate SIGPIPE when writing to closed - * pipe/socket (necessary as default action is to terminate process) - */ - ::signal(SIGPIPE, SIG_IGN); - }; -} init; - -/* - * We keep per thread state to avoid locking overhead. The assumption is that - * on average all the connections are serviced by all the threads so the state - * recorded in each thread is about the same. If this turns out not to be the - * case we could rebalance the info occasionally. - */ -__thread int threadReadTotal = 0; -__thread int threadMaxRead = 0; -__thread int threadReadCount = 0; -__thread int threadWriteTotal = 0; -__thread int threadWriteCount = 0; -__thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms -} - -/* - * Asynch Acceptor - */ -namespace qpid { -namespace sys { -namespace posix { - -class AsynchAcceptor : public qpid::sys::AsynchAcceptor { -public: - AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback); - ~AsynchAcceptor(); - void start(Poller::shared_ptr poller); - -private: - void readable(DispatchHandle& handle); - -private: - AsynchAcceptor::Callback acceptedCallback; - DispatchHandle handle; - const Socket& socket; - -}; - -AsynchAcceptor::AsynchAcceptor(const Socket& s, - AsynchAcceptor::Callback callback) : - acceptedCallback(callback), - handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0), - socket(s) { - - s.setNonblocking(); -} - -AsynchAcceptor::~AsynchAcceptor() { - handle.stopWatch(); -} - -void AsynchAcceptor::start(Poller::shared_ptr poller) { - handle.startWatch(poller); -} - -/* - * We keep on accepting as long as there is something to accept - */ -void AsynchAcceptor::readable(DispatchHandle& h) { - Socket* s; - do { - errno = 0; - // TODO: Currently we ignore the peers address, perhaps we should - // log it or use it for connection acceptance. - try { - s = socket.accept(); - if (s) { - acceptedCallback(*s); - } else { - break; - } - } catch (const std::exception& e) { - QPID_LOG(error, "Could not accept socket: " << e.what()); - break; - } - } while (true); - - h.rewatch(); -} - -/* - * POSIX version of AsynchIO TCP socket connector. - * - * The class is implemented in terms of DispatchHandle to allow it to be - * deleted by deleting the contained DispatchHandle. - */ -class AsynchConnector : public qpid::sys::AsynchConnector, - private DispatchHandle { - -private: - void connComplete(DispatchHandle& handle); - -private: - ConnectedCallback connCallback; - FailedCallback failCallback; - const Socket& socket; - -public: - AsynchConnector(const Socket& socket, - std::string hostname, - uint16_t port, - ConnectedCallback connCb, - FailedCallback failCb); - void start(Poller::shared_ptr poller); - void stop(); -}; - -AsynchConnector::AsynchConnector(const Socket& s, - std::string hostname, - uint16_t port, - ConnectedCallback connCb, - FailedCallback failCb) : - DispatchHandle(s, - 0, - boost::bind(&AsynchConnector::connComplete, this, _1), - boost::bind(&AsynchConnector::connComplete, this, _1)), - connCallback(connCb), - failCallback(failCb), - socket(s) -{ - socket.setNonblocking(); - SocketAddress sa(hostname, boost::lexical_cast<std::string>(port)); - // Note, not catching any exceptions here, also has effect of destructing - socket.connect(sa); -} - -void AsynchConnector::start(Poller::shared_ptr poller) -{ - startWatch(poller); -} - -void AsynchConnector::stop() -{ - stopWatch(); -} - -void AsynchConnector::connComplete(DispatchHandle& h) -{ - h.stopWatch(); - int errCode = socket.getError(); - if (errCode == 0) { - connCallback(socket); - } else { - failCallback(socket, errCode, strError(errCode)); - } - DispatchHandle::doDelete(); -} - -/* - * POSIX version of AsynchIO reader/writer - * - * The class is implemented in terms of DispatchHandle to allow it to be - * deleted by deleting the contained DispatchHandle. - */ -class AsynchIO : public qpid::sys::AsynchIO, private DispatchHandle { - -public: - AsynchIO(const Socket& s, - ReadCallback rCb, - EofCallback eofCb, - DisconnectCallback disCb, - ClosedCallback cCb = 0, - BuffersEmptyCallback eCb = 0, - IdleCallback iCb = 0); - - // Methods inherited from qpid::sys::AsynchIO - - virtual void queueForDeletion(); - - virtual void start(Poller::shared_ptr poller); - virtual void queueReadBuffer(BufferBase* buff); - virtual void unread(BufferBase* buff); - virtual void queueWrite(BufferBase* buff); - virtual void notifyPendingWrite(); - virtual void queueWriteClose(); - virtual bool writeQueueEmpty(); - virtual void startReading(); - virtual void stopReading(); - virtual void requestCallback(RequestCallback); - virtual BufferBase* getQueuedBuffer(); - -private: - ~AsynchIO(); - - // Methods that are callback targets from Dispatcher. - void readable(DispatchHandle& handle); - void writeable(DispatchHandle& handle); - void disconnected(DispatchHandle& handle); - void requestedCall(RequestCallback); - void close(DispatchHandle& handle); - -private: - ReadCallback readCallback; - EofCallback eofCallback; - DisconnectCallback disCallback; - ClosedCallback closedCallback; - BuffersEmptyCallback emptyCallback; - IdleCallback idleCallback; - const Socket& socket; - std::deque<BufferBase*> bufferQueue; - std::deque<BufferBase*> writeQueue; - bool queuedClose; - /** - * This flag is used to detect and handle concurrency between - * calls to notifyPendingWrite() (which can be made from any thread) and - * the execution of the writeable() method (which is always on the - * thread processing this handle. - */ - volatile bool writePending; - /** - * This records whether we've been reading is flow controlled: - * it's safe as a simple boolean as the only way to be stopped - * is in calls only allowed in the callback context, the only calls - * checking it are also in calls only allowed in callback context. - */ - volatile bool readingStopped; -}; - -AsynchIO::AsynchIO(const Socket& s, - ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, - ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) : - - DispatchHandle(s, - boost::bind(&AsynchIO::readable, this, _1), - boost::bind(&AsynchIO::writeable, this, _1), - boost::bind(&AsynchIO::disconnected, this, _1)), - readCallback(rCb), - eofCallback(eofCb), - disCallback(disCb), - closedCallback(cCb), - emptyCallback(eCb), - idleCallback(iCb), - socket(s), - queuedClose(false), - writePending(false), - readingStopped(false) { - - s.setNonblocking(); -} - -struct deleter -{ - template <typename T> - void operator()(T *ptr){ delete ptr;} -}; - -AsynchIO::~AsynchIO() { - std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter()); - std::for_each( writeQueue.begin(), writeQueue.end(), deleter()); -} - -void AsynchIO::queueForDeletion() { - DispatchHandle::doDelete(); -} - -void AsynchIO::start(Poller::shared_ptr poller) { - DispatchHandle::startWatch(poller); -} - -void AsynchIO::queueReadBuffer(BufferBase* buff) { - assert(buff); - buff->dataStart = 0; - buff->dataCount = 0; - - bool queueWasEmpty = bufferQueue.empty(); - bufferQueue.push_back(buff); - if (queueWasEmpty && !readingStopped) - DispatchHandle::rewatchRead(); -} - -void AsynchIO::unread(BufferBase* buff) { - assert(buff); - buff->squish(); - - bool queueWasEmpty = bufferQueue.empty(); - bufferQueue.push_front(buff); - if (queueWasEmpty && !readingStopped) - DispatchHandle::rewatchRead(); -} - -void AsynchIO::queueWrite(BufferBase* buff) { - assert(buff); - // If we've already closed the socket then throw the write away - if (queuedClose) { - queueReadBuffer(buff); - return; - } else { - writeQueue.push_front(buff); - } - writePending = false; - DispatchHandle::rewatchWrite(); -} - -// This can happen outside the callback context -void AsynchIO::notifyPendingWrite() { - writePending = true; - DispatchHandle::rewatchWrite(); -} - -void AsynchIO::queueWriteClose() { - queuedClose = true; - DispatchHandle::rewatchWrite(); -} - -bool AsynchIO::writeQueueEmpty() { - return writeQueue.empty(); -} - -// This can happen outside the callback context -void AsynchIO::startReading() { - readingStopped = false; - DispatchHandle::rewatchRead(); -} - -void AsynchIO::stopReading() { - readingStopped = true; - DispatchHandle::unwatchRead(); -} - -void AsynchIO::requestCallback(RequestCallback callback) { - // TODO creating a function object every time isn't all that - // efficient - if this becomes heavily used do something better (what?) - assert(callback); - DispatchHandle::call(boost::bind(&AsynchIO::requestedCall, this, callback)); -} - -void AsynchIO::requestedCall(RequestCallback callback) { - assert(callback); - callback(*this); -} - -/** Return a queued buffer if there are enough - * to spare - */ -AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() { - // Always keep at least one buffer (it might have data that was "unread" in it) - if (bufferQueue.size()<=1) - return 0; - BufferBase* buff = bufferQueue.back(); - assert(buff); - buff->dataStart = 0; - buff->dataCount = 0; - bufferQueue.pop_back(); - return buff; -} - -/* - * We keep on reading as long as we have something to read, a buffer - * to put it in and reading is not stopped by flow control. - */ -void AsynchIO::readable(DispatchHandle& h) { - if (readingStopped) { - // We have been flow controlled. - return; - } - int readTotal = 0; - AbsTime readStartTime = AbsTime::now(); - do { - // (Try to) get a buffer - if (!bufferQueue.empty()) { - // Read into buffer - BufferBase* buff = bufferQueue.front(); - assert(buff); - bufferQueue.pop_front(); - errno = 0; - int readCount = buff->byteCount-buff->dataCount; - int rc = socket.read(buff->bytes + buff->dataCount, readCount); - if (rc > 0) { - buff->dataCount += rc; - threadReadTotal += rc; - readTotal += rc; - - readCallback(*this, buff); - if (readingStopped) { - // We have been flow controlled. - break; - } - - if (rc != readCount) { - // If we didn't fill the read buffer then time to stop reading - break; - } - - // Stop reading if we've overrun our timeslot - if (Duration(readStartTime, AbsTime::now()) > threadMaxReadTimeNs) { - break; - } - - } else { - // Put buffer back (at front so it doesn't interfere with unread buffers) - bufferQueue.push_front(buff); - assert(buff); - - // Eof or other side has gone away - if (rc == 0 || errno == ECONNRESET) { - eofCallback(*this); - h.unwatchRead(); - break; - } else if (errno == EAGAIN) { - // We have just put a buffer back so we know - // we can carry on watching for reads - break; - } else { - // Report error then just treat as a socket disconnect - QPID_LOG(error, "Error reading socket: " << qpid::sys::strError(errno) << "(" << errno << ")" ); - eofCallback(*this); - h.unwatchRead(); - break; - } - } - } else { - // Something to read but no buffer - if (emptyCallback) { - emptyCallback(*this); - } - // If we still have no buffers we can't do anything more - if (bufferQueue.empty()) { - h.unwatchRead(); - break; - } - - } - } while (true); - - ++threadReadCount; - threadMaxRead = std::max(threadMaxRead, readTotal); - return; -} - -/* - * We carry on writing whilst we have data to write and we can write - */ -void AsynchIO::writeable(DispatchHandle& h) { - int writeTotal = 0; - do { - // See if we've got something to write - if (!writeQueue.empty()) { - // Write buffer - BufferBase* buff = writeQueue.back(); - writeQueue.pop_back(); - errno = 0; - assert(buff->dataStart+buff->dataCount <= buff->byteCount); - int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount); - if (rc >= 0) { - threadWriteTotal += rc; - writeTotal += rc; - - // If we didn't write full buffer put rest back - if (rc != buff->dataCount) { - buff->dataStart += rc; - buff->dataCount -= rc; - writeQueue.push_back(buff); - break; - } - - // Recycle the buffer - queueReadBuffer(buff); - - // If we've already written more than the max for reading then stop - // (this is to stop writes dominating reads) - if (writeTotal > threadMaxRead) - break; - } else { - // Put buffer back - writeQueue.push_back(buff); - if (errno == ECONNRESET || errno == EPIPE) { - // Just stop watching for write here - we'll get a - // disconnect callback soon enough - h.unwatchWrite(); - break; - } else if (errno == EAGAIN) { - // We have just put a buffer back so we know - // we can carry on watching for writes - break; - } else { - // Report error then just treat as a socket disconnect - QPID_LOG(error, "Error writing socket: " << qpid::sys::strError(errno) << "(" << errno << ")" ); - h.unwatchWrite(); - break; - } - } - } else { - // If we're waiting to close the socket then can do it now as there is nothing to write - if (queuedClose) { - close(h); - break; - } - // Fd is writable, but nothing to write - if (idleCallback) { - writePending = false; - idleCallback(*this); - } - // If we still have no buffers to write we can't do anything more - if (writeQueue.empty() && !writePending && !queuedClose) { - h.unwatchWrite(); - // The following handles the case where writePending is - // set to true after the test above; in this case its - // possible that the unwatchWrite overwrites the - // desired rewatchWrite so we correct that here - if (writePending) - h.rewatchWrite(); - break; - } - } - } while (true); - - ++threadWriteCount; - return; -} - -void AsynchIO::disconnected(DispatchHandle& h) { - // If we have not already queued close then call disconnected callback before closing - if (!queuedClose && disCallback) disCallback(*this); - close(h); -} - -/* - * Close the socket and callback to say we've done it - */ -void AsynchIO::close(DispatchHandle& h) { - h.stopWatch(); - socket.close(); - if (closedCallback) { - closedCallback(*this, socket); - } -} - -} // namespace posix - -AsynchAcceptor* AsynchAcceptor::create(const Socket& s, - Callback callback) -{ - return new posix::AsynchAcceptor(s, callback); -} - -AsynchConnector* AsynchConnector::create(const Socket& s, - std::string hostname, - uint16_t port, - ConnectedCallback connCb, - FailedCallback failCb) -{ - return new posix::AsynchConnector(s, hostname, port, connCb, failCb); -} - -AsynchIO* AsynchIO::create(const Socket& s, - AsynchIO::ReadCallback rCb, - AsynchIO::EofCallback eofCb, - AsynchIO::DisconnectCallback disCb, - AsynchIO::ClosedCallback cCb, - AsynchIO::BuffersEmptyCallback eCb, - AsynchIO::IdleCallback iCb) -{ - return new posix::AsynchIO(s, rCb, eofCb, disCb, cCb, eCb, iCb); -} - -}} // namespace qpid::sys |