diff options
Diffstat (limited to 'qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp | 611 |
1 files changed, 611 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp new file mode 100644 index 0000000000..b5a0b0bf32 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -0,0 +1,611 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/Socket.h" +#include "qpid/sys/SocketAddress.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/DispatchHandle.h" +#include "qpid/sys/Time.h" +#include "qpid/log/Statement.h" + +#include "qpid/sys/posix/check.h" + +// TODO The basic algorithm here is not really POSIX specific and with a +// bit more abstraction could (should) be promoted to be platform portable +#include <unistd.h> +#include <sys/socket.h> +#include <signal.h> +#include <errno.h> +#include <string.h> + +#include <boost/bind.hpp> +#include <boost/lexical_cast.hpp> + +using namespace qpid::sys; + +namespace { + +struct StaticInit { + StaticInit() { + /** + * Make *process* not generate SIGPIPE when writing to closed + * pipe/socket (necessary as default action is to terminate process) + */ + ::signal(SIGPIPE, SIG_IGN); + }; +} init; + +/* + * We keep per thread state to avoid locking overhead. The assumption is that + * on average all the connections are serviced by all the threads so the state + * recorded in each thread is about the same. If this turns out not to be the + * case we could rebalance the info occasionally. + */ +__thread int threadReadTotal = 0; +__thread int threadMaxRead = 0; +__thread int threadReadCount = 0; +__thread int threadWriteTotal = 0; +__thread int threadWriteCount = 0; +__thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms +} + +/* + * Asynch Acceptor + */ +namespace qpid { +namespace sys { +namespace posix { + +class AsynchAcceptor : public qpid::sys::AsynchAcceptor { +public: + AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback); + ~AsynchAcceptor(); + void start(Poller::shared_ptr poller); + +private: + void readable(DispatchHandle& handle); + +private: + AsynchAcceptor::Callback acceptedCallback; + DispatchHandle handle; + const Socket& socket; + +}; + +AsynchAcceptor::AsynchAcceptor(const Socket& s, + AsynchAcceptor::Callback callback) : + acceptedCallback(callback), + handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0), + socket(s) { + + s.setNonblocking(); +} + +AsynchAcceptor::~AsynchAcceptor() { + handle.stopWatch(); +} + +void AsynchAcceptor::start(Poller::shared_ptr poller) { + handle.startWatch(poller); +} + +/* + * We keep on accepting as long as there is something to accept + */ +void AsynchAcceptor::readable(DispatchHandle& h) { + Socket* s; + do { + errno = 0; + // TODO: Currently we ignore the peers address, perhaps we should + // log it or use it for connection acceptance. + try { + s = socket.accept(); + if (s) { + acceptedCallback(*s); + } else { + break; + } + } catch (const std::exception& e) { + QPID_LOG(error, "Could not accept socket: " << e.what()); + break; + } + } while (true); + + h.rewatch(); +} + +/* + * POSIX version of AsynchIO TCP socket connector. + * + * The class is implemented in terms of DispatchHandle to allow it to be + * deleted by deleting the contained DispatchHandle. + */ +class AsynchConnector : public qpid::sys::AsynchConnector, + private DispatchHandle { + +private: + void connComplete(DispatchHandle& handle); + +private: + ConnectedCallback connCallback; + FailedCallback failCallback; + const Socket& socket; + +public: + AsynchConnector(const Socket& socket, + const std::string& hostname, + const std::string& port, + ConnectedCallback connCb, + FailedCallback failCb); + void start(Poller::shared_ptr poller); + void stop(); +}; + +AsynchConnector::AsynchConnector(const Socket& s, + const std::string& hostname, + const std::string& port, + ConnectedCallback connCb, + FailedCallback failCb) : + DispatchHandle(s, + 0, + boost::bind(&AsynchConnector::connComplete, this, _1), + boost::bind(&AsynchConnector::connComplete, this, _1)), + connCallback(connCb), + failCallback(failCb), + socket(s) +{ + socket.setNonblocking(); + SocketAddress sa(hostname, port); + // Note, not catching any exceptions here, also has effect of destructing + socket.connect(sa); +} + +void AsynchConnector::start(Poller::shared_ptr poller) +{ + startWatch(poller); +} + +void AsynchConnector::stop() +{ + stopWatch(); +} + +void AsynchConnector::connComplete(DispatchHandle& h) +{ + h.stopWatch(); + int errCode = socket.getError(); + if (errCode == 0) { + connCallback(socket); + } else { + failCallback(socket, errCode, strError(errCode)); + } + DispatchHandle::doDelete(); +} + +/* + * POSIX version of AsynchIO reader/writer + * + * The class is implemented in terms of DispatchHandle to allow it to be + * deleted by deleting the contained DispatchHandle. + */ +class AsynchIO : public qpid::sys::AsynchIO, private DispatchHandle { + +public: + AsynchIO(const Socket& s, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb = 0, + BuffersEmptyCallback eCb = 0, + IdleCallback iCb = 0); + + // Methods inherited from qpid::sys::AsynchIO + + virtual void queueForDeletion(); + + virtual void start(Poller::shared_ptr poller); + virtual void queueReadBuffer(BufferBase* buff); + virtual void unread(BufferBase* buff); + virtual void queueWrite(BufferBase* buff); + virtual void notifyPendingWrite(); + virtual void queueWriteClose(); + virtual bool writeQueueEmpty(); + virtual void startReading(); + virtual void stopReading(); + virtual void requestCallback(RequestCallback); + virtual BufferBase* getQueuedBuffer(); + +private: + ~AsynchIO(); + + // Methods that are callback targets from Dispatcher. + void readable(DispatchHandle& handle); + void writeable(DispatchHandle& handle); + void disconnected(DispatchHandle& handle); + void requestedCall(RequestCallback); + void close(DispatchHandle& handle); + +private: + ReadCallback readCallback; + EofCallback eofCallback; + DisconnectCallback disCallback; + ClosedCallback closedCallback; + BuffersEmptyCallback emptyCallback; + IdleCallback idleCallback; + const Socket& socket; + std::deque<BufferBase*> bufferQueue; + std::deque<BufferBase*> writeQueue; + bool queuedClose; + /** + * This flag is used to detect and handle concurrency between + * calls to notifyPendingWrite() (which can be made from any thread) and + * the execution of the writeable() method (which is always on the + * thread processing this handle. + */ + volatile bool writePending; + /** + * This records whether we've been reading is flow controlled: + * it's safe as a simple boolean as the only way to be stopped + * is in calls only allowed in the callback context, the only calls + * checking it are also in calls only allowed in callback context. + */ + volatile bool readingStopped; +}; + +AsynchIO::AsynchIO(const Socket& s, + ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, + ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) : + + DispatchHandle(s, + boost::bind(&AsynchIO::readable, this, _1), + boost::bind(&AsynchIO::writeable, this, _1), + boost::bind(&AsynchIO::disconnected, this, _1)), + readCallback(rCb), + eofCallback(eofCb), + disCallback(disCb), + closedCallback(cCb), + emptyCallback(eCb), + idleCallback(iCb), + socket(s), + queuedClose(false), + writePending(false), + readingStopped(false) { + + s.setNonblocking(); +} + +struct deleter +{ + template <typename T> + void operator()(T *ptr){ delete ptr;} +}; + +AsynchIO::~AsynchIO() { + std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter()); + std::for_each( writeQueue.begin(), writeQueue.end(), deleter()); +} + +void AsynchIO::queueForDeletion() { + DispatchHandle::doDelete(); +} + +void AsynchIO::start(Poller::shared_ptr poller) { + DispatchHandle::startWatch(poller); +} + +void AsynchIO::queueReadBuffer(BufferBase* buff) { + assert(buff); + buff->dataStart = 0; + buff->dataCount = 0; + + bool queueWasEmpty = bufferQueue.empty(); + bufferQueue.push_back(buff); + if (queueWasEmpty && !readingStopped) + DispatchHandle::rewatchRead(); +} + +void AsynchIO::unread(BufferBase* buff) { + assert(buff); + buff->squish(); + + bool queueWasEmpty = bufferQueue.empty(); + bufferQueue.push_front(buff); + if (queueWasEmpty && !readingStopped) + DispatchHandle::rewatchRead(); +} + +void AsynchIO::queueWrite(BufferBase* buff) { + assert(buff); + // If we've already closed the socket then throw the write away + if (queuedClose) { + queueReadBuffer(buff); + return; + } else { + writeQueue.push_front(buff); + } + writePending = false; + DispatchHandle::rewatchWrite(); +} + +// This can happen outside the callback context +void AsynchIO::notifyPendingWrite() { + writePending = true; + DispatchHandle::rewatchWrite(); +} + +void AsynchIO::queueWriteClose() { + queuedClose = true; + DispatchHandle::rewatchWrite(); +} + +bool AsynchIO::writeQueueEmpty() { + return writeQueue.empty(); +} + +// This can happen outside the callback context +void AsynchIO::startReading() { + readingStopped = false; + DispatchHandle::rewatchRead(); +} + +void AsynchIO::stopReading() { + readingStopped = true; + DispatchHandle::unwatchRead(); +} + +void AsynchIO::requestCallback(RequestCallback callback) { + // TODO creating a function object every time isn't all that + // efficient - if this becomes heavily used do something better (what?) + assert(callback); + DispatchHandle::call(boost::bind(&AsynchIO::requestedCall, this, callback)); +} + +void AsynchIO::requestedCall(RequestCallback callback) { + assert(callback); + callback(*this); +} + +/** Return a queued buffer if there are enough + * to spare + */ +AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() { + // Always keep at least one buffer (it might have data that was "unread" in it) + if (bufferQueue.size()<=1) + return 0; + BufferBase* buff = bufferQueue.back(); + assert(buff); + buff->dataStart = 0; + buff->dataCount = 0; + bufferQueue.pop_back(); + return buff; +} + +/* + * We keep on reading as long as we have something to read, a buffer + * to put it in and reading is not stopped by flow control. + */ +void AsynchIO::readable(DispatchHandle& h) { + if (readingStopped) { + // We have been flow controlled. + return; + } + int readTotal = 0; + AbsTime readStartTime = AbsTime::now(); + do { + // (Try to) get a buffer + if (!bufferQueue.empty()) { + // Read into buffer + BufferBase* buff = bufferQueue.front(); + assert(buff); + bufferQueue.pop_front(); + errno = 0; + int readCount = buff->byteCount-buff->dataCount; + int rc = socket.read(buff->bytes + buff->dataCount, readCount); + if (rc > 0) { + buff->dataCount += rc; + threadReadTotal += rc; + readTotal += rc; + + readCallback(*this, buff); + if (readingStopped) { + // We have been flow controlled. + break; + } + + if (rc != readCount) { + // If we didn't fill the read buffer then time to stop reading + break; + } + + // Stop reading if we've overrun our timeslot + if (Duration(readStartTime, AbsTime::now()) > threadMaxReadTimeNs) { + break; + } + + } else { + // Put buffer back (at front so it doesn't interfere with unread buffers) + bufferQueue.push_front(buff); + assert(buff); + + // Eof or other side has gone away + if (rc == 0 || errno == ECONNRESET) { + eofCallback(*this); + h.unwatchRead(); + break; + } else if (errno == EAGAIN) { + // We have just put a buffer back so we know + // we can carry on watching for reads + break; + } else { + // Report error then just treat as a socket disconnect + QPID_LOG(error, "Error reading socket: " << qpid::sys::strError(errno) << "(" << errno << ")" ); + eofCallback(*this); + h.unwatchRead(); + break; + } + } + } else { + // Something to read but no buffer + if (emptyCallback) { + emptyCallback(*this); + } + // If we still have no buffers we can't do anything more + if (bufferQueue.empty()) { + h.unwatchRead(); + break; + } + + } + } while (true); + + ++threadReadCount; + threadMaxRead = std::max(threadMaxRead, readTotal); + return; +} + +/* + * We carry on writing whilst we have data to write and we can write + */ +void AsynchIO::writeable(DispatchHandle& h) { + int writeTotal = 0; + do { + // See if we've got something to write + if (!writeQueue.empty()) { + // Write buffer + BufferBase* buff = writeQueue.back(); + writeQueue.pop_back(); + errno = 0; + assert(buff->dataStart+buff->dataCount <= buff->byteCount); + int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount); + if (rc >= 0) { + threadWriteTotal += rc; + writeTotal += rc; + + // If we didn't write full buffer put rest back + if (rc != buff->dataCount) { + buff->dataStart += rc; + buff->dataCount -= rc; + writeQueue.push_back(buff); + break; + } + + // Recycle the buffer + queueReadBuffer(buff); + + // If we've already written more than the max for reading then stop + // (this is to stop writes dominating reads) + if (writeTotal > threadMaxRead) + break; + } else { + // Put buffer back + writeQueue.push_back(buff); + if (errno == ECONNRESET || errno == EPIPE) { + // Just stop watching for write here - we'll get a + // disconnect callback soon enough + h.unwatchWrite(); + break; + } else if (errno == EAGAIN) { + // We have just put a buffer back so we know + // we can carry on watching for writes + break; + } else { + // Report error then just treat as a socket disconnect + QPID_LOG(error, "Error writing socket: " << qpid::sys::strError(errno) << "(" << errno << ")" ); + h.unwatchWrite(); + break; + } + } + } else { + // If we're waiting to close the socket then can do it now as there is nothing to write + if (queuedClose) { + close(h); + break; + } + // Fd is writable, but nothing to write + if (idleCallback) { + writePending = false; + idleCallback(*this); + } + // If we still have no buffers to write we can't do anything more + if (writeQueue.empty() && !writePending && !queuedClose) { + h.unwatchWrite(); + // The following handles the case where writePending is + // set to true after the test above; in this case its + // possible that the unwatchWrite overwrites the + // desired rewatchWrite so we correct that here + if (writePending) + h.rewatchWrite(); + break; + } + } + } while (true); + + ++threadWriteCount; + return; +} + +void AsynchIO::disconnected(DispatchHandle& h) { + // If we have not already queued close then call disconnected callback before closing + if (!queuedClose && disCallback) disCallback(*this); + close(h); +} + +/* + * Close the socket and callback to say we've done it + */ +void AsynchIO::close(DispatchHandle& h) { + h.stopWatch(); + socket.close(); + if (closedCallback) { + closedCallback(*this, socket); + } +} + +} // namespace posix + +AsynchAcceptor* AsynchAcceptor::create(const Socket& s, + Callback callback) +{ + return new posix::AsynchAcceptor(s, callback); +} + +AsynchConnector* AsynchConnector::create(const Socket& s, + const std::string& hostname, + const std::string& port, + ConnectedCallback connCb, + FailedCallback failCb) +{ + return new posix::AsynchConnector(s, hostname, port, connCb, failCb); +} + +AsynchIO* AsynchIO::create(const Socket& s, + AsynchIO::ReadCallback rCb, + AsynchIO::EofCallback eofCb, + AsynchIO::DisconnectCallback disCb, + AsynchIO::ClosedCallback cCb, + AsynchIO::BuffersEmptyCallback eCb, + AsynchIO::IdleCallback iCb) +{ + return new posix::AsynchIO(s, rCb, eofCb, disCb, cCb, eCb, iCb); +} + +}} // namespace qpid::sys |