diff options
author | Stephen D. Huston <shuston@apache.org> | 2008-10-21 18:29:44 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2008-10-21 18:29:44 +0000 |
commit | 65db1dc07a33aa419b842a34e61fa5781841b0bf (patch) | |
tree | f6cc891ab98850fe1fb6662de7e24797363e67e4 | |
parent | 14b74662ca1e5d113baff0553a01dd88a0ce6f5a (diff) | |
download | qpid-python-65db1dc07a33aa419b842a34e61fa5781841b0bf.tar.gz |
Refactor sys::AsynchIO class to allow reimplementing on other platforms without affecting upper level usage. Resolves QPID-1377 and supplies Windows AsynchIO.cpp
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@706709 13f79535-47bb-0310-9956-ffa450edef68
23 files changed, 2371 insertions, 600 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 6ca12dc189..9248ea4b06 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -283,6 +283,7 @@ libqpidcommon_la_SOURCES = \ qpid/sys/AggregateOutput.cpp \ qpid/sys/AsynchIOHandler.cpp \ qpid/sys/Dispatcher.cpp \ + qpid/sys/DispatchHandle.cpp \ qpid/sys/PollableCondition.h \ qpid/sys/PollableQueue.h \ qpid/sys/Runnable.cpp \ @@ -609,6 +610,7 @@ nobase_include_HEADERS = \ qpid/sys/ConnectionOutputHandlerPtr.h \ qpid/sys/DeletionManager.h \ qpid/sys/Dispatcher.h \ + qpid/sys/DispatchHandle.h \ qpid/sys/FileSysDir.h \ qpid/sys/IntegerTypes.h \ qpid/sys/IOHandle.h \ diff --git a/qpid/cpp/src/qpid/client/ConnectionSettings.cpp b/qpid/cpp/src/qpid/client/ConnectionSettings.cpp index 751f867c1e..f5fc62dad2 100644 --- a/qpid/cpp/src/qpid/client/ConnectionSettings.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionSettings.cpp @@ -22,7 +22,6 @@ #include "qpid/log/Logger.h" #include "qpid/sys/Socket.h" -#include <sys/socket.h> namespace qpid { namespace client { diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp index e674a738c2..67e9239224 100644 --- a/qpid/cpp/src/qpid/client/Connector.cpp +++ b/qpid/cpp/src/qpid/client/Connector.cpp @@ -200,7 +200,7 @@ void TCPConnector::connect(const std::string& host, int port){ identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); closed = false; poller = Poller::shared_ptr(new Poller); - aio = new AsynchIO(socket, + aio = AsynchIO::create(socket, boost::bind(&TCPConnector::readbuff, this, _1, _2), boost::bind(&TCPConnector::eof, this, _1), boost::bind(&TCPConnector::eof, this, _1), diff --git a/qpid/cpp/src/qpid/log/Selector.h b/qpid/cpp/src/qpid/log/Selector.h index 7c98bc6f8f..2acef4687a 100644 --- a/qpid/cpp/src/qpid/log/Selector.h +++ b/qpid/cpp/src/qpid/log/Selector.h @@ -24,7 +24,7 @@ namespace qpid { namespace log { -class Options; +struct Options; /** * A selector identifies the set of log messages to enable. diff --git a/qpid/cpp/src/qpid/sys/AsynchIO.h b/qpid/cpp/src/qpid/sys/AsynchIO.h index ff7823e00d..f5c4607992 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIO.h +++ b/qpid/cpp/src/qpid/sys/AsynchIO.h @@ -21,7 +21,8 @@ * */ -#include "Dispatcher.h" +// @@TODO: TAKE THIS OUT... SHould be in posix version. +#include "DispatchHandle.h" #include <boost/function.hpp> #include <deque> @@ -35,48 +36,45 @@ class Socket; * Asynchronous acceptor: accepts connections then does a callback with the * accepted fd */ +class AsynchAcceptorPrivate; class AsynchAcceptor { public: typedef boost::function1<void, const Socket&> Callback; private: - Callback acceptedCallback; - DispatchHandle handle; - const Socket& socket; + AsynchAcceptorPrivate* impl; public: AsynchAcceptor(const Socket& s, Callback callback); + ~AsynchAcceptor(); void start(Poller::shared_ptr poller); - -private: - void readable(DispatchHandle& handle); }; /* * Asynchronous connector: starts the process of initiating a connection and * invokes a callback when completed or failed. */ -class AsynchConnector : private DispatchHandle { +class AsynchConnector { public: typedef boost::function1<void, const Socket&> ConnectedCallback; typedef boost::function2<void, int, std::string> FailedCallback; -private: - ConnectedCallback connCallback; - FailedCallback failCallback; - const Socket& socket; - -public: - AsynchConnector(const Socket& socket, - Poller::shared_ptr poller, - std::string hostname, - uint16_t port, - ConnectedCallback connCb, - FailedCallback failCb = 0); - -private: - void connComplete(DispatchHandle& handle); - void failure(int, std::string); + // Call create() to allocate a new AsynchConnector object with the + // specified poller, addressing, and callbacks. + // This method is implemented in platform-specific code to + // create a correctly typed object. The platform code also manages + // deletes. To correctly manage heaps when needed, the allocate and + // delete should both be done from the same class/library. + static AsynchConnector* create(const Socket& s, + Poller::shared_ptr poller, + std::string hostname, + uint16_t port, + ConnectedCallback connCb, + FailedCallback failCb = 0); + +protected: + AsynchConnector() {} + virtual ~AsynchConnector() {} }; struct AsynchIOBufferBase { @@ -99,16 +97,14 @@ struct AsynchIOBufferBase { /* * Asychronous reader/writer: * Reader accepts buffers to read into; reads into the provided buffers - * and then does a callback with the buffer and amount read. Optionally it can callback - * when there is something to read but no buffer to read it into. + * and then does a callback with the buffer and amount read. Optionally it + * can callback when there is something to read but no buffer to read it into. * * Writer accepts a buffer and queues it for writing; can also be given - * a callback for when writing is "idle" (ie fd is writable, but nothing to write) - * - * The class is implemented in terms of DispatchHandle to allow it to be deleted by deleting - * the contained DispatchHandle + * a callback for when writing is "idle" (ie fd is writable, but nothing + * to write). */ -class AsynchIO : private DispatchHandle { +class AsynchIO { public: typedef AsynchIOBufferBase BufferBase; @@ -119,46 +115,35 @@ public: typedef boost::function1<void, AsynchIO&> BuffersEmptyCallback; typedef boost::function1<void, AsynchIO&> IdleCallback; -private: - ReadCallback readCallback; - EofCallback eofCallback; - DisconnectCallback disCallback; - ClosedCallback closedCallback; - BuffersEmptyCallback emptyCallback; - IdleCallback idleCallback; - const Socket& socket; - std::deque<BufferBase*> bufferQueue; - std::deque<BufferBase*> writeQueue; - bool queuedClose; - /** - * This flag is used to detect and handle concurrency between - * calls to notifyPendingWrite() (which can be made from any thread) and - * the execution of the writeable() method (which is always on the - * thread processing this handle. - */ - volatile bool writePending; - + // Call create() to allocate a new AsynchIO object with the specified + // callbacks. This method is implemented in platform-specific code to + // create a correctly typed object. The platform code also manages + // deletes. To correctly manage heaps when needed, the allocate and + // delete should both be done from the same class/library. + static AsynchIO* create(const Socket& s, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb = 0, + BuffersEmptyCallback eCb = 0, + IdleCallback iCb = 0); public: - AsynchIO(const Socket& s, - ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, - ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0); - void queueForDeletion(); - - void start(Poller::shared_ptr poller); - void queueReadBuffer(BufferBase* buff); - void unread(BufferBase* buff); - void queueWrite(BufferBase* buff); - void notifyPendingWrite(); - void queueWriteClose(); - bool writeQueueEmpty() { return writeQueue.empty(); } - BufferBase* getQueuedBuffer(); - -private: - ~AsynchIO(); - void readable(DispatchHandle& handle); - void writeable(DispatchHandle& handle); - void disconnected(DispatchHandle& handle); - void close(DispatchHandle& handle); + virtual void queueForDeletion() = 0; + + virtual void start(Poller::shared_ptr poller) = 0; + virtual void queueReadBuffer(BufferBase* buff) = 0; + virtual void unread(BufferBase* buff) = 0; + virtual void queueWrite(BufferBase* buff) = 0; + virtual void notifyPendingWrite() = 0; + virtual void queueWriteClose() = 0; + virtual bool writeQueueEmpty() = 0; + virtual BufferBase* getQueuedBuffer() = 0; + +protected: + // Derived class manages lifetime; must be constructed using the + // static create() method. Deletes not allowed from outside. + AsynchIO() {} + virtual ~AsynchIO() {} }; }} diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.h b/qpid/cpp/src/qpid/sys/AsynchIOHandler.h index 26e2cf4c5c..c281c27d14 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.h +++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.h @@ -33,7 +33,7 @@ namespace framing { namespace sys { class AsynchIO; -class AsynchIOBufferBase; +struct AsynchIOBufferBase; class Socket; class AsynchIOHandler : public OutputControl { diff --git a/qpid/cpp/src/qpid/sys/DispatchHandle.cpp b/qpid/cpp/src/qpid/sys/DispatchHandle.cpp new file mode 100644 index 0000000000..4722fc0b8b --- /dev/null +++ b/qpid/cpp/src/qpid/sys/DispatchHandle.cpp @@ -0,0 +1,409 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "DispatchHandle.h" + +#include <boost/cast.hpp> + +#include <assert.h> + +namespace qpid { +namespace sys { + +DispatchHandle::~DispatchHandle() { + stopWatch(); +} + +void DispatchHandle::startWatch(Poller::shared_ptr poller0) { + bool r = readableCallback; + bool w = writableCallback; + + ScopedLock<Mutex> lock(stateLock); + assert(state == IDLE); + + // If no callbacks set then do nothing (that is what we were asked to do!) + // TODO: Maybe this should be an assert instead + if (!r && !w) { + state = INACTIVE; + return; + } + + Poller::Direction d = r ? + (w ? Poller::INOUT : Poller::INPUT) : + Poller::OUTPUT; + + poller = poller0; + poller->addFd(*this, d); + + state = r ? + (w ? ACTIVE_RW : ACTIVE_R) : + ACTIVE_W; +} + +void DispatchHandle::rewatch() { + bool r = readableCallback; + bool w = writableCallback; + + ScopedLock<Mutex> lock(stateLock); + switch(state) { + case IDLE: + case DELAYED_IDLE: + break; + case DELAYED_R: + case DELAYED_W: + case DELAYED_INACTIVE: + state = r ? + (w ? DELAYED_RW : DELAYED_R) : + DELAYED_W; + break; + case DELAYED_DELETE: + break; + case INACTIVE: + case ACTIVE_R: + case ACTIVE_W: { + assert(poller); + Poller::Direction d = r ? + (w ? Poller::INOUT : Poller::INPUT) : + Poller::OUTPUT; + poller->modFd(*this, d); + state = r ? + (w ? ACTIVE_RW : ACTIVE_R) : + ACTIVE_W; + break; + } + case DELAYED_RW: + case ACTIVE_RW: + // Don't need to do anything already waiting for readable/writable + break; + } +} + +void DispatchHandle::rewatchRead() { + if (!readableCallback) { + return; + } + + ScopedLock<Mutex> lock(stateLock); + switch(state) { + case IDLE: + case DELAYED_IDLE: + break; + case DELAYED_R: + case DELAYED_RW: + case DELAYED_DELETE: + break; + case DELAYED_W: + state = DELAYED_RW; + break; + case DELAYED_INACTIVE: + state = DELAYED_R; + break; + case ACTIVE_R: + case ACTIVE_RW: + // Nothing to do: already waiting for readable + break; + case INACTIVE: + assert(poller); + poller->modFd(*this, Poller::INPUT); + state = ACTIVE_R; + break; + case ACTIVE_W: + assert(poller); + poller->modFd(*this, Poller::INOUT); + state = ACTIVE_RW; + break; + } +} + +void DispatchHandle::rewatchWrite() { + if (!writableCallback) { + return; + } + + ScopedLock<Mutex> lock(stateLock); + switch(state) { + case IDLE: + case DELAYED_IDLE: + break; + case DELAYED_W: + case DELAYED_RW: + case DELAYED_DELETE: + break; + case DELAYED_R: + state = DELAYED_RW; + break; + case DELAYED_INACTIVE: + state = DELAYED_W; + break; + case INACTIVE: + assert(poller); + poller->modFd(*this, Poller::OUTPUT); + state = ACTIVE_W; + break; + case ACTIVE_R: + assert(poller); + poller->modFd(*this, Poller::INOUT); + state = ACTIVE_RW; + break; + case ACTIVE_W: + case ACTIVE_RW: + // Nothing to do: already waiting for writable + break; + } +} + +void DispatchHandle::unwatchRead() { + if (!readableCallback) { + return; + } + + ScopedLock<Mutex> lock(stateLock); + switch(state) { + case IDLE: + case DELAYED_IDLE: + break; + case DELAYED_R: + state = DELAYED_INACTIVE; + break; + case DELAYED_RW: + state = DELAYED_W; + break; + case DELAYED_W: + case DELAYED_INACTIVE: + case DELAYED_DELETE: + break; + case ACTIVE_R: + assert(poller); + poller->modFd(*this, Poller::NONE); + state = INACTIVE; + break; + case ACTIVE_RW: + assert(poller); + poller->modFd(*this, Poller::OUTPUT); + state = ACTIVE_W; + break; + case ACTIVE_W: + case INACTIVE: + break; + } +} + +void DispatchHandle::unwatchWrite() { + if (!writableCallback) { + return; + } + + ScopedLock<Mutex> lock(stateLock); + switch(state) { + case IDLE: + case DELAYED_IDLE: + break; + case DELAYED_W: + state = DELAYED_INACTIVE; + break; + case DELAYED_RW: + state = DELAYED_R; + break; + case DELAYED_R: + case DELAYED_INACTIVE: + case DELAYED_DELETE: + break; + case ACTIVE_W: + assert(poller); + poller->modFd(*this, Poller::NONE); + state = INACTIVE; + break; + case ACTIVE_RW: + assert(poller); + poller->modFd(*this, Poller::INPUT); + state = ACTIVE_R; + break; + case ACTIVE_R: + case INACTIVE: + break; + } +} + +void DispatchHandle::unwatch() { + ScopedLock<Mutex> lock(stateLock); + switch (state) { + case IDLE: + case DELAYED_IDLE: + break; + case DELAYED_R: + case DELAYED_W: + case DELAYED_RW: + case DELAYED_INACTIVE: + state = DELAYED_INACTIVE; + break; + case DELAYED_DELETE: + break; + default: + assert(poller); + poller->modFd(*this, Poller::NONE); + state = INACTIVE; + break; + } +} + +void DispatchHandle::stopWatch() { + ScopedLock<Mutex> lock(stateLock); + switch (state) { + case IDLE: + case DELAYED_IDLE: + case DELAYED_DELETE: + return; + case DELAYED_R: + case DELAYED_W: + case DELAYED_RW: + case DELAYED_INACTIVE: + state = DELAYED_IDLE; + break; + default: + state = IDLE; + break; + } + assert(poller); + poller->delFd(*this); + poller.reset(); +} + +// The slightly strange switch structure +// is to ensure that the lock is released before +// we do the delete +void DispatchHandle::doDelete() { + // Ensure that we're no longer watching anything + stopWatch(); + + // If we're in the middle of a callback defer the delete + { + ScopedLock<Mutex> lock(stateLock); + switch (state) { + case DELAYED_IDLE: + case DELAYED_DELETE: + state = DELAYED_DELETE; + return; + case IDLE: + break; + default: + // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states + assert(false); + } + } + // If we're not then do it right away + delete this; +} + +void DispatchHandle::processEvent(Poller::EventType type) { + // Note that we are now doing the callbacks + { + ScopedLock<Mutex> lock(stateLock); + + // Set up to wait for same events next time unless reset + switch(state) { + case ACTIVE_R: + state = DELAYED_R; + break; + case ACTIVE_W: + state = DELAYED_W; + break; + case ACTIVE_RW: + state = DELAYED_RW; + break; + // Can only get here in a DELAYED_* state in the rare case + // that we're already here for reading and we get activated for + // writing and we can write (it might be possible the other way + // round too). In this case we're already processing the handle + // in a different thread in this function so return right away + case DELAYED_R: + case DELAYED_W: + case DELAYED_RW: + case DELAYED_INACTIVE: + case DELAYED_IDLE: + case DELAYED_DELETE: + return; + default: + assert(false); + } + } + + // Do callbacks - whilst we are doing the callbacks we are prevented from processing + // the same handle until we re-enable it. To avoid rentering the callbacks for a single + // handle re-enabling in the callbacks is actually deferred until they are complete. + switch (type) { + case Poller::READABLE: + readableCallback(*this); + break; + case Poller::WRITABLE: + writableCallback(*this); + break; + case Poller::READ_WRITABLE: + readableCallback(*this); + writableCallback(*this); + break; + case Poller::DISCONNECTED: + { + ScopedLock<Mutex> lock(stateLock); + state = DELAYED_INACTIVE; + } + if (disconnectedCallback) { + disconnectedCallback(*this); + } + break; + default: + assert(false); + } + + // If any of the callbacks re-enabled reading/writing then actually + // do it now + { + ScopedLock<Mutex> lock(stateLock); + switch (state) { + case DELAYED_R: + poller->modFd(*this, Poller::INPUT); + state = ACTIVE_R; + return; + case DELAYED_W: + poller->modFd(*this, Poller::OUTPUT); + state = ACTIVE_W; + return; + case DELAYED_RW: + poller->modFd(*this, Poller::INOUT); + state = ACTIVE_RW; + return; + case DELAYED_INACTIVE: + state = INACTIVE; + return; + case DELAYED_IDLE: + state = IDLE; + return; + default: + // This should be impossible + assert(false); + return; + case DELAYED_DELETE: + break; + } + } + delete this; +} + +}} diff --git a/qpid/cpp/src/qpid/sys/DispatchHandle.h b/qpid/cpp/src/qpid/sys/DispatchHandle.h new file mode 100644 index 0000000000..219f2c53d6 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/DispatchHandle.h @@ -0,0 +1,146 @@ +#ifndef _sys_DispatchHandle_h +#define _sys_DispatchHandle_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Poller.h" +#include "Mutex.h" + +#include <boost/function.hpp> + + +namespace qpid { +namespace sys { + +class DispatchHandleRef; +/** + * In order to have your own handle (file descriptor on Unix) watched by the poller + * you need to: + * + * - Subclass IOHandle, in the constructor supply an appropriate + * IOHandlerPrivate object for the platform. + * + * - Construct a DispatchHandle passing it your IOHandle and + * callback functions for read, write and disconnect events. + * + * - Ensure the DispatchHandle is not deleted until the poller is no longer using it. + * TODO: astitcher document DispatchHandleRef to simplify this. + * + * When an event occurs on the handle, the poller calls the relevant callback and + * stops watching that handle. Your callback can call rewatch() or related functions + * to re-enable polling. + */ +class DispatchHandle : public PollerHandle { + friend class DispatchHandleRef; +public: + typedef boost::function1<void, DispatchHandle&> Callback; + +private: + Callback readableCallback; + Callback writableCallback; + Callback disconnectedCallback; + Poller::shared_ptr poller; + Mutex stateLock; + enum { + IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW, + DELAYED_IDLE, DELAYED_INACTIVE, DELAYED_R, DELAYED_W, DELAYED_RW, + DELAYED_DELETE + } state; + +public: + /** + * Provide a handle to poll and a set of callbacks. Note + * callbacks can be 0, meaning you are not interested in that + * event. + * + *@param h: the handle to watch. The IOHandle encapsulates a + * platfrom-specific handle to an IO object (e.g. a file descriptor + * on Unix.) + *@param rCb Callback called when the handle is readable. + *@param wCb Callback called when the handle is writable. + *@param dCb Callback called when the handle is disconnected. + */ + DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) : + PollerHandle(h), + readableCallback(rCb), + writableCallback(wCb), + disconnectedCallback(dCb), + state(IDLE) + {} + + ~DispatchHandle(); + + /** Add this DispatchHandle to the poller to be watched. */ + void startWatch(Poller::shared_ptr poller); + + /** Resume watchingn for all non-0 callbacks. */ + void rewatch(); + /** Resume watchingn for read only. */ + void rewatchRead(); + + /** Resume watchingn for write only. */ + void rewatchWrite(); + + /** Stop watching temporarily. The DispatchHandle remains + associated with the poller and can be re-activated using + rewatch. */ + void unwatch(); + /** Stop watching for read */ + void unwatchRead(); + /** Stop watching for write */ + void unwatchWrite(); + + /** Stop watching permanently. Disassociates from the poller. */ + void stopWatch(); + +protected: + /** Override to get extra processing done when the DispatchHandle is deleted. */ + void doDelete(); + +private: + void processEvent(Poller::EventType dir); +}; + +class DispatchHandleRef { + DispatchHandle* ref; + +public: + typedef boost::function1<void, DispatchHandle&> Callback; + DispatchHandleRef(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) : + ref(new DispatchHandle(h, rCb, wCb, dCb)) + {} + + ~DispatchHandleRef() { ref->doDelete(); } + + void startWatch(Poller::shared_ptr poller) { ref->startWatch(poller); } + void rewatch() { ref->rewatch(); } + void rewatchRead() { ref->rewatchRead(); } + void rewatchWrite() { ref->rewatchWrite(); } + void unwatch() { ref->unwatch(); } + void unwatchRead() { ref->unwatchRead(); } + void unwatchWrite() { ref->unwatchWrite(); } + void stopWatch() { ref->stopWatch(); } +}; + +}} + +#endif // _sys_DispatchHandle_h diff --git a/qpid/cpp/src/qpid/sys/Dispatcher.cpp b/qpid/cpp/src/qpid/sys/Dispatcher.cpp index 02a62c8e66..8d1d1b79f5 100644 --- a/qpid/cpp/src/qpid/sys/Dispatcher.cpp +++ b/qpid/cpp/src/qpid/sys/Dispatcher.cpp @@ -21,8 +21,6 @@ #include "Dispatcher.h" -#include <boost/cast.hpp> - #include <assert.h> namespace qpid { @@ -58,382 +56,4 @@ dispatcher_shutdown: ; } -DispatchHandle::~DispatchHandle() { - stopWatch(); -} - -void DispatchHandle::startWatch(Poller::shared_ptr poller0) { - bool r = readableCallback; - bool w = writableCallback; - - ScopedLock<Mutex> lock(stateLock); - assert(state == IDLE); - - // If no callbacks set then do nothing (that is what we were asked to do!) - // TODO: Maybe this should be an assert instead - if (!r && !w) { - state = INACTIVE; - return; - } - - Poller::Direction d = r ? - (w ? Poller::INOUT : Poller::IN) : - Poller::OUT; - - poller = poller0; - poller->addFd(*this, d); - - state = r ? - (w ? ACTIVE_RW : ACTIVE_R) : - ACTIVE_W; -} - -void DispatchHandle::rewatch() { - bool r = readableCallback; - bool w = writableCallback; - - ScopedLock<Mutex> lock(stateLock); - switch(state) { - case IDLE: - case DELAYED_IDLE: - break; - case DELAYED_R: - case DELAYED_W: - case DELAYED_INACTIVE: - state = r ? - (w ? DELAYED_RW : DELAYED_R) : - DELAYED_W; - break; - case DELAYED_DELETE: - break; - case INACTIVE: - case ACTIVE_R: - case ACTIVE_W: { - assert(poller); - Poller::Direction d = r ? - (w ? Poller::INOUT : Poller::IN) : - Poller::OUT; - poller->modFd(*this, d); - state = r ? - (w ? ACTIVE_RW : ACTIVE_R) : - ACTIVE_W; - break; - } - case DELAYED_RW: - case ACTIVE_RW: - // Don't need to do anything already waiting for readable/writable - break; - } -} - -void DispatchHandle::rewatchRead() { - if (!readableCallback) { - return; - } - - ScopedLock<Mutex> lock(stateLock); - switch(state) { - case IDLE: - case DELAYED_IDLE: - break; - case DELAYED_R: - case DELAYED_RW: - case DELAYED_DELETE: - break; - case DELAYED_W: - state = DELAYED_RW; - break; - case DELAYED_INACTIVE: - state = DELAYED_R; - break; - case ACTIVE_R: - case ACTIVE_RW: - // Nothing to do: already waiting for readable - break; - case INACTIVE: - assert(poller); - poller->modFd(*this, Poller::IN); - state = ACTIVE_R; - break; - case ACTIVE_W: - assert(poller); - poller->modFd(*this, Poller::INOUT); - state = ACTIVE_RW; - break; - } -} - -void DispatchHandle::rewatchWrite() { - if (!writableCallback) { - return; - } - - ScopedLock<Mutex> lock(stateLock); - switch(state) { - case IDLE: - case DELAYED_IDLE: - break; - case DELAYED_W: - case DELAYED_RW: - case DELAYED_DELETE: - break; - case DELAYED_R: - state = DELAYED_RW; - break; - case DELAYED_INACTIVE: - state = DELAYED_W; - break; - case INACTIVE: - assert(poller); - poller->modFd(*this, Poller::OUT); - state = ACTIVE_W; - break; - case ACTIVE_R: - assert(poller); - poller->modFd(*this, Poller::INOUT); - state = ACTIVE_RW; - break; - case ACTIVE_W: - case ACTIVE_RW: - // Nothing to do: already waiting for writable - break; - } -} - -void DispatchHandle::unwatchRead() { - if (!readableCallback) { - return; - } - - ScopedLock<Mutex> lock(stateLock); - switch(state) { - case IDLE: - case DELAYED_IDLE: - break; - case DELAYED_R: - state = DELAYED_INACTIVE; - break; - case DELAYED_RW: - state = DELAYED_W; - break; - case DELAYED_W: - case DELAYED_INACTIVE: - case DELAYED_DELETE: - break; - case ACTIVE_R: - assert(poller); - poller->modFd(*this, Poller::NONE); - state = INACTIVE; - break; - case ACTIVE_RW: - assert(poller); - poller->modFd(*this, Poller::OUT); - state = ACTIVE_W; - break; - case ACTIVE_W: - case INACTIVE: - break; - } -} - -void DispatchHandle::unwatchWrite() { - if (!writableCallback) { - return; - } - - ScopedLock<Mutex> lock(stateLock); - switch(state) { - case IDLE: - case DELAYED_IDLE: - break; - case DELAYED_W: - state = DELAYED_INACTIVE; - break; - case DELAYED_RW: - state = DELAYED_R; - break; - case DELAYED_R: - case DELAYED_INACTIVE: - case DELAYED_DELETE: - break; - case ACTIVE_W: - assert(poller); - poller->modFd(*this, Poller::NONE); - state = INACTIVE; - break; - case ACTIVE_RW: - assert(poller); - poller->modFd(*this, Poller::IN); - state = ACTIVE_R; - break; - case ACTIVE_R: - case INACTIVE: - break; - } -} - -void DispatchHandle::unwatch() { - ScopedLock<Mutex> lock(stateLock); - switch (state) { - case IDLE: - case DELAYED_IDLE: - break; - case DELAYED_R: - case DELAYED_W: - case DELAYED_RW: - case DELAYED_INACTIVE: - state = DELAYED_INACTIVE; - break; - case DELAYED_DELETE: - break; - default: - assert(poller); - poller->modFd(*this, Poller::NONE); - state = INACTIVE; - break; - } -} - -void DispatchHandle::stopWatch() { - ScopedLock<Mutex> lock(stateLock); - switch (state) { - case IDLE: - case DELAYED_IDLE: - case DELAYED_DELETE: - return; - case DELAYED_R: - case DELAYED_W: - case DELAYED_RW: - case DELAYED_INACTIVE: - state = DELAYED_IDLE; - break; - default: - state = IDLE; - break; - } - assert(poller); - poller->delFd(*this); - poller.reset(); -} - -// The slightly strange switch structure -// is to ensure that the lock is released before -// we do the delete -void DispatchHandle::doDelete() { - // Ensure that we're no longer watching anything - stopWatch(); - - // If we're in the middle of a callback defer the delete - { - ScopedLock<Mutex> lock(stateLock); - switch (state) { - case DELAYED_IDLE: - case DELAYED_DELETE: - state = DELAYED_DELETE; - return; - case IDLE: - break; - default: - // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states - assert(false); - } - } - // If we're not then do it right away - delete this; -} - -void DispatchHandle::processEvent(Poller::EventType type) { - // Note that we are now doing the callbacks - { - ScopedLock<Mutex> lock(stateLock); - - // Set up to wait for same events next time unless reset - switch(state) { - case ACTIVE_R: - state = DELAYED_R; - break; - case ACTIVE_W: - state = DELAYED_W; - break; - case ACTIVE_RW: - state = DELAYED_RW; - break; - // Can only get here in a DELAYED_* state in the rare case - // that we're already here for reading and we get activated for - // writing and we can write (it might be possible the other way - // round too). In this case we're already processing the handle - // in a different thread in this function so return right away - case DELAYED_R: - case DELAYED_W: - case DELAYED_RW: - case DELAYED_INACTIVE: - case DELAYED_IDLE: - case DELAYED_DELETE: - return; - default: - assert(false); - } - } - - // Do callbacks - whilst we are doing the callbacks we are prevented from processing - // the same handle until we re-enable it. To avoid rentering the callbacks for a single - // handle re-enabling in the callbacks is actually deferred until they are complete. - switch (type) { - case Poller::READABLE: - readableCallback(*this); - break; - case Poller::WRITABLE: - writableCallback(*this); - break; - case Poller::READ_WRITABLE: - readableCallback(*this); - writableCallback(*this); - break; - case Poller::DISCONNECTED: - { - ScopedLock<Mutex> lock(stateLock); - state = DELAYED_INACTIVE; - } - if (disconnectedCallback) { - disconnectedCallback(*this); - } - break; - default: - assert(false); - } - - // If any of the callbacks re-enabled reading/writing then actually - // do it now - { - ScopedLock<Mutex> lock(stateLock); - switch (state) { - case DELAYED_R: - poller->modFd(*this, Poller::IN); - state = ACTIVE_R; - return; - case DELAYED_W: - poller->modFd(*this, Poller::OUT); - state = ACTIVE_W; - return; - case DELAYED_RW: - poller->modFd(*this, Poller::INOUT); - state = ACTIVE_RW; - return; - case DELAYED_INACTIVE: - state = INACTIVE; - return; - case DELAYED_IDLE: - state = IDLE; - return; - default: - // This should be impossible - assert(false); - return; - case DELAYED_DELETE: - break; - } - } - delete this; -} - }} diff --git a/qpid/cpp/src/qpid/sys/Dispatcher.h b/qpid/cpp/src/qpid/sys/Dispatcher.h index 8e34354f9e..f7c9e8d731 100644 --- a/qpid/cpp/src/qpid/sys/Dispatcher.h +++ b/qpid/cpp/src/qpid/sys/Dispatcher.h @@ -24,129 +24,10 @@ #include "Poller.h" #include "Runnable.h" -#include "Mutex.h" - -#include <memory> -#include <queue> -#include <boost/function.hpp> - -#include <assert.h> - namespace qpid { namespace sys { -class DispatchHandleRef; -/** - * In order to have your own handle (file descriptor on Unix) watched by the poller - * you need to: - * - * - Subclass IOHandle, in the constructor supply an appropriate - * IOHandlerPrivate object for the platform. - * - * - Construct a DispatchHandle passing it your IOHandle and - * callback functions for read, write and disconnect events. - * - * - Ensure the DispatchHandle is not deleted until the poller is no longer using it. - * TODO: astitcher document DispatchHandleRef to simplify this. - * - * When an event occurs on the handle, the poller calls the relevant callback and - * stops watching that handle. Your callback can call rewatch() or related functions - * to re-enable polling. - */ -class DispatchHandle : public PollerHandle { - friend class DispatchHandleRef; -public: - typedef boost::function1<void, DispatchHandle&> Callback; - -private: - Callback readableCallback; - Callback writableCallback; - Callback disconnectedCallback; - Poller::shared_ptr poller; - Mutex stateLock; - enum { - IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW, - DELAYED_IDLE, DELAYED_INACTIVE, DELAYED_R, DELAYED_W, DELAYED_RW, - DELAYED_DELETE - } state; - -public: - /** - * Provide a handle to poll and a set of callbacks. Note - * callbacks can be 0, meaning you are not interested in that - * event. - * - *@param h: the handle to watch. The IOHandle encapsulates a - * platfrom-specific handle to an IO object (e.g. a file descriptor - * on Unix.) - *@param rCb Callback called when the handle is readable. - *@param wCb Callback called when the handle is writable. - *@param dCb Callback called when the handle is disconnected. - */ - DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) : - PollerHandle(h), - readableCallback(rCb), - writableCallback(wCb), - disconnectedCallback(dCb), - state(IDLE) - {} - - ~DispatchHandle(); - - /** Add this DispatchHandle to the poller to be watched. */ - void startWatch(Poller::shared_ptr poller); - - /** Resume watchingn for all non-0 callbacks. */ - void rewatch(); - /** Resume watchingn for read only. */ - void rewatchRead(); - - /** Resume watchingn for write only. */ - void rewatchWrite(); - - /** Stop watching temporarily. The DispatchHandle remains - associated with the poller and can be re-activated using - rewatch. */ - void unwatch(); - /** Stop watching for read */ - void unwatchRead(); - /** Stop watching for write */ - void unwatchWrite(); - - /** Stop watching permanently. Disassociates from the poller. */ - void stopWatch(); - -protected: - /** Override to get extra processing done when the DispatchHandle is deleted. */ - void doDelete(); - -private: - void processEvent(Poller::EventType dir); -}; - -class DispatchHandleRef { - DispatchHandle* ref; - -public: - typedef boost::function1<void, DispatchHandle&> Callback; - DispatchHandleRef(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) : - ref(new DispatchHandle(h, rCb, wCb, dCb)) - {} - - ~DispatchHandleRef() { ref->doDelete(); } - - void startWatch(Poller::shared_ptr poller) { ref->startWatch(poller); } - void rewatch() { ref->rewatch(); } - void rewatchRead() { ref->rewatchRead(); } - void rewatchWrite() { ref->rewatchWrite(); } - void unwatch() { ref->unwatch(); } - void unwatchRead() { ref->unwatchRead(); } - void unwatchWrite() { ref->unwatchWrite(); } - void stopWatch() { ref->stopWatch(); } -}; - - class Dispatcher : public Runnable { const Poller::shared_ptr poller; diff --git a/qpid/cpp/src/qpid/sys/Poller.h b/qpid/cpp/src/qpid/sys/Poller.h index e39528bdb5..6b7f4d818e 100644 --- a/qpid/cpp/src/qpid/sys/Poller.h +++ b/qpid/cpp/src/qpid/sys/Poller.h @@ -45,8 +45,8 @@ public: enum Direction { NONE = 0, - IN, - OUT, + INPUT, + OUTPUT, INOUT }; diff --git a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp index c4bc99837e..a66aca1849 100644 --- a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -92,13 +92,14 @@ void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socke if (isClient) async->setClient(); - AsynchIO* aio = new AsynchIO(s, - boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), - boost::bind(&AsynchIOHandler::eof, async, _1), - boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), - boost::bind(&AsynchIOHandler::nobuffs, async, _1), - boost::bind(&AsynchIOHandler::idle, async, _1)); + AsynchIO* aio = AsynchIO::create + (s, + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&AsynchIOHandler::eof, async, _1), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); async->init(aio, 4); aio->start(poller); @@ -133,9 +134,13 @@ void AsynchIOProtocolFactory::connect( // is no longer needed. Socket* socket = new Socket(); - new AsynchConnector (*socket, poller, host, port, - boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, true), - failed); + AsynchConnector::create (*socket, + poller, + host, + port, + boost::bind(&AsynchIOProtocolFactory::established, + this, poller, _1, fact, true), + failed); } }} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp index 5e20e312e0..d4627ec721 100644 --- a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp +++ b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -162,9 +162,9 @@ class PollerPrivate { static ::__uint32_t directionToEpollEvent(Poller::Direction dir) { switch (dir) { - case Poller::IN: return ::EPOLLIN; - case Poller::OUT: return ::EPOLLOUT; - case Poller::INOUT: return ::EPOLLIN | ::EPOLLOUT; + case Poller::INPUT: return ::EPOLLIN; + case Poller::OUTPUT: return ::EPOLLOUT; + case Poller::INOUT: return ::EPOLLIN | ::EPOLLOUT; default: return 0; } } diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp index 7598eefe83..c8a8b3d0f1 100644 --- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -26,8 +26,8 @@ #include "check.h" -// TODO The basic algorithm here is not really POSIX specific and with a bit more abstraction -// could (should) be promoted to be platform portable +// TODO The basic algorithm here is not really POSIX specific and with a +// bit more abstraction could (should) be promoted to be platform portable #include <unistd.h> #include <sys/socket.h> #include <signal.h> @@ -65,24 +65,55 @@ __thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms /* * Asynch Acceptor */ +namespace qpid { +namespace sys { + +class AsynchAcceptorPrivate { +public: + AsynchAcceptorPrivate(const Socket& s, AsynchAcceptor::Callback callback); + void start(Poller::shared_ptr poller); + +private: + void readable(DispatchHandle& handle); + +private: + AsynchAcceptor::Callback acceptedCallback; + DispatchHandle handle; + const Socket& socket; + +}; + +}} // namespace qpid::sys AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) : + impl(new AsynchAcceptorPrivate(s, callback)) +{} + +AsynchAcceptor::~AsynchAcceptor() +{ delete impl;} + +void AsynchAcceptor::start(Poller::shared_ptr poller) { + impl->start(poller); +} + +AsynchAcceptorPrivate::AsynchAcceptorPrivate(const Socket& s, + AsynchAcceptor::Callback callback) : acceptedCallback(callback), - handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0), + handle(s, boost::bind(&AsynchAcceptorPrivate::readable, this, _1), 0, 0), socket(s) { s.setNonblocking(); ignoreSigpipe(); } -void AsynchAcceptor::start(Poller::shared_ptr poller) { +void AsynchAcceptorPrivate::start(Poller::shared_ptr poller) { handle.startWatch(poller); } /* * We keep on accepting as long as there is something to accept */ -void AsynchAcceptor::readable(DispatchHandle& h) { +void AsynchAcceptorPrivate::readable(DispatchHandle& h) { Socket* s; do { errno = 0; @@ -106,6 +137,36 @@ void AsynchAcceptor::readable(DispatchHandle& h) { /* * Asynch Connector */ +namespace qpid { +namespace sys { +namespace posix { + +/* + * POSIX version of AsynchIO TCP socket connector. + * + * The class is implemented in terms of DispatchHandle to allow it to be + * deleted by deleting the contained DispatchHandle. + */ +class AsynchConnector : public qpid::sys::AsynchConnector, + private DispatchHandle { + +private: + void connComplete(DispatchHandle& handle); + void failure(int, std::string); + +private: + ConnectedCallback connCallback; + FailedCallback failCallback; + const Socket& socket; + +public: + AsynchConnector(const Socket& socket, + Poller::shared_ptr poller, + std::string hostname, + uint16_t port, + ConnectedCallback connCb, + FailedCallback failCb = 0); +}; AsynchConnector::AsynchConnector(const Socket& s, Poller::shared_ptr poller, @@ -155,9 +216,85 @@ void AsynchConnector::failure(int errCode, std::string message) DispatchHandle::doDelete(); } +} // namespace posix + + +AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s, + Poller::shared_ptr poller, + std::string hostname, + uint16_t port, + ConnectedCallback connCb, + FailedCallback failCb) +{ + return new qpid::sys::posix::AsynchConnector(s, + poller, + hostname, + port, + connCb, + failCb); +} + /* - * Asynch reader/writer + * POSIX version of AsynchIO reader/writer + * + * The class is implemented in terms of DispatchHandle to allow it to be + * deleted by deleting the contained DispatchHandle. */ +namespace posix { + +class AsynchIO : public qpid::sys::AsynchIO, private DispatchHandle { + +public: + AsynchIO(const Socket& s, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb = 0, + BuffersEmptyCallback eCb = 0, + IdleCallback iCb = 0); + + // Methods inherited from qpid::sys::AsynchIO + + virtual void queueForDeletion(); + + virtual void start(Poller::shared_ptr poller); + virtual void queueReadBuffer(BufferBase* buff); + virtual void unread(BufferBase* buff); + virtual void queueWrite(BufferBase* buff); + virtual void notifyPendingWrite(); + virtual void queueWriteClose(); + virtual bool writeQueueEmpty(); + virtual BufferBase* getQueuedBuffer(); + +private: + ~AsynchIO(); + + // Methods that are callback targets from Dispatcher. + void readable(DispatchHandle& handle); + void writeable(DispatchHandle& handle); + void disconnected(DispatchHandle& handle); + void close(DispatchHandle& handle); + +private: + ReadCallback readCallback; + EofCallback eofCallback; + DisconnectCallback disCallback; + ClosedCallback closedCallback; + BuffersEmptyCallback emptyCallback; + IdleCallback idleCallback; + const Socket& socket; + std::deque<BufferBase*> bufferQueue; + std::deque<BufferBase*> writeQueue; + bool queuedClose; + /** + * This flag is used to detect and handle concurrency between + * calls to notifyPendingWrite() (which can be made from any thread) and + * the execution of the writeable() method (which is always on the + * thread processing this handle. + */ + volatile bool writePending; +}; + AsynchIO::AsynchIO(const Socket& s, ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) : @@ -239,6 +376,10 @@ void AsynchIO::queueWriteClose() { DispatchHandle::rewatchWrite(); } +bool AsynchIO::writeQueueEmpty() { + return writeQueue.empty(); +} + /** Return a queued buffer if there are enough * to spare */ @@ -427,3 +568,17 @@ void AsynchIO::close(DispatchHandle& h) { } } +} // namespace posix + +AsynchIO* qpid::sys::AsynchIO::create(const Socket& s, + AsynchIO::ReadCallback rCb, + AsynchIO::EofCallback eofCb, + AsynchIO::DisconnectCallback disCb, + AsynchIO::ClosedCallback cCb, + AsynchIO::BuffersEmptyCallback eCb, + AsynchIO::IdleCallback iCb) +{ + return new qpid::sys::posix::AsynchIO(s, rCb, eofCb, disCb, cCb, eCb, iCb); +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp b/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp index fefa21dae1..783f84576b 100644 --- a/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp +++ b/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp @@ -129,9 +129,9 @@ class PollerPrivate { static uint32_t directionToPollEvent(Poller::Direction dir) { switch (dir) { - case Poller::IN: return POLLIN; - case Poller::OUT: return POLLOUT; - case Poller::INOUT: return POLLIN | POLLOUT; + case Poller::INPUT: return POLLIN; + case Poller::OUTPUT: return POLLOUT; + case Poller::INOUT: return POLLIN | POLLOUT; default: return 0; } } diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp new file mode 100644 index 0000000000..bde1213131 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -0,0 +1,729 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "AsynchIoResult.h" +#include "IoHandlePrivate.h" +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Socket.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Time.h" +#include "qpid/log/Statement.h" + +#include "check.h" + +#include <boost/thread/once.hpp> + +#include <winsock2.h> +#include <mswsock.h> +#include <windows.h> + +#include <boost/bind.hpp> + +namespace { + + typedef qpid::sys::ScopedLock<qpid::sys::Mutex> QLock; + +/* + * We keep per thread state to avoid locking overhead. The assumption is that + * on average all the connections are serviced by all the threads so the state + * recorded in each thread is about the same. If this turns out not to be the + * case we could rebalance the info occasionally. + */ +QPID_TSS int threadReadTotal = 0; +QPID_TSS int threadMaxRead = 0; +QPID_TSS int threadReadCount = 0; +QPID_TSS int threadWriteTotal = 0; +QPID_TSS int threadWriteCount = 0; +QPID_TSS int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms + +/* + * The function pointers for AcceptEx and ConnectEx need to be looked up + * at run time. Make sure this is done only once. + */ +boost::once_flag lookUpAcceptExOnce = BOOST_ONCE_INIT; +LPFN_ACCEPTEX fnAcceptEx = 0; +typedef void (*lookUpFunc)(const qpid::sys::Socket &); + +void lookUpAcceptEx() { + SOCKET h = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + GUID guidAcceptEx = WSAID_ACCEPTEX; + DWORD dwBytes = 0; + WSAIoctl(h, + SIO_GET_EXTENSION_FUNCTION_POINTER, + &guidAcceptEx, + sizeof(guidAcceptEx), + &fnAcceptEx, + sizeof(fnAcceptEx), + &dwBytes, + NULL, + NULL); + closesocket(h); + if (fnAcceptEx == 0) + throw qpid::Exception(QPID_MSG("Failed to look up AcceptEx")); +} + +} + +namespace qpid { +namespace sys { + +/* + * Asynch Acceptor + * + * This implementation uses knowledge that the DispatchHandle handle member + * is derived from PollerHandle, which has a reference to the Socket. + * No dispatching features of DispatchHandle are used - we just use the + * conduit to the Socket. + * + * AsynchAcceptor uses an AsynchAcceptResult object to track completion + * and status of each accept operation outstanding. + */ + +class AsynchAcceptorPrivate { + + friend class AsynchAcceptResult; + +public: + AsynchAcceptorPrivate(const Socket& s, AsynchAcceptor::Callback callback); + ~AsynchAcceptorPrivate(); + void start(Poller::shared_ptr poller); + +private: + void restart(void); + + AsynchAcceptor::Callback acceptedCallback; + const Socket& socket; +}; + +AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) : + impl(new AsynchAcceptorPrivate(s, callback)) +{} + +AsynchAcceptor::~AsynchAcceptor() +{ delete impl; } + +void AsynchAcceptor::start(Poller::shared_ptr poller) { + impl->start(poller); +} + +AsynchAcceptorPrivate::AsynchAcceptorPrivate(const Socket& s, + AsynchAcceptor::Callback callback) + : acceptedCallback(callback), + socket(s) { + + s.setNonblocking(); +#if (BOOST_VERSION >= 103500) /* boost 1.35 or later reversed the args */ + boost::call_once(lookUpAcceptExOnce, lookUpAcceptEx); +#else + boost::call_once(lookUpAcceptEx, lookUpAcceptExOnce); +#endif +} + +AsynchAcceptorPrivate::~AsynchAcceptorPrivate(void) { + socket.close(); +} + +void AsynchAcceptorPrivate::start(Poller::shared_ptr poller) { + poller->addFd(PollerHandle(socket), Poller::INPUT); + restart (); +} + +void AsynchAcceptor::restart(void) { + DWORD bytesReceived = 0; // Not used, needed for AcceptEx API + AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback, + this, + toFd(socket.impl)); + BOOL status; + status = ::fnAcceptEx(toFd(socket.impl), + toFd(result->newSocket->impl), + result->addressBuffer, + 0, + AsynchAcceptResult::SOCKADDRMAXLEN, + AsynchAcceptResult::SOCKADDRMAXLEN, + &bytesReceived, + result->overlapped()); + QPID_WINDOWS_CHECK_ASYNC_START(status); +} + + +AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb, + AsynchAcceptor *acceptor, + SOCKET listener) + : callback(cb), acceptor(acceptor), listener(listener) { + newSocket.reset (new Socket()); +} + +void AsynchAcceptResult::success(size_t /*bytesTransferred*/) { + ::setsockopt (toFd(newSocket->impl), + SOL_SOCKET, + SO_UPDATE_ACCEPT_CONTEXT, + (char*)&listener, + sizeof (listener)); + callback(*(newSocket.release())); + acceptor->restart (); + delete this; +} + +void AsynchAcceptResult::failure(int status) { + if (status != WSA_OPERATION_ABORTED) + ; + delete this; +} + +namespace windows { + +/* + * AsynchConnector does synchronous connects for now... to do asynch the + * IocpPoller will need some extension to register an event handle as a + * CONNECT-type "direction", the connect completion/result will need an + * event handle to associate with the connecting handle. But there's no + * time for that right now... + */ +class AsynchConnector : public qpid::sys::AsynchConnector { +private: + ConnectedCallback connCallback; + FailedCallback failCallback; + const Socket& socket; + +public: + AsynchConnector(const Socket& socket, + Poller::shared_ptr poller, + std::string hostname, + uint16_t port, + ConnectedCallback connCb, + FailedCallback failCb = 0); +}; + +AsynchConnector::AsynchConnector(const Socket& sock, + Poller::shared_ptr poller, + std::string hostname, + uint16_t port, + ConnectedCallback connCb, + FailedCallback failCb) + : connCallback(connCb), failCallback(failCb), socket(sock) { + socket.setNonblocking(); + try { + socket.connect(hostname, port); + connCallback(socket); + } catch(std::exception& e) { + if (failCallback) + failCallback(-1, std::string(e.what())); + socket.close(); + delete &socket; + } +} + +} // namespace windows + +AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s, + Poller::shared_ptr poller, + std::string hostname, + uint16_t port, + ConnectedCallback connCb, + FailedCallback failCb) +{ + return new qpid::sys::windows::AsynchConnector(s, + poller, + hostname, + port, + connCb, + failCb); +} + + +/* + * Asynch reader/writer + */ + +namespace windows { + +class AsynchIO : public qpid::sys::AsynchIO { +public: + AsynchIO(const Socket& s, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb = 0, + BuffersEmptyCallback eCb = 0, + IdleCallback iCb = 0); + ~AsynchIO(); + + // Methods inherited from qpid::sys::AsynchIO + + /** + * Notify the object is should delete itself as soon as possible. + */ + virtual void queueForDeletion(); + + /// Take any actions needed to prepare for working with the poller. + virtual void start(Poller::shared_ptr poller); + virtual void queueReadBuffer(BufferBase* buff); + virtual void unread(BufferBase* buff); + virtual void queueWrite(BufferBase* buff); + virtual void notifyPendingWrite(); + virtual void queueWriteClose(); + virtual bool writeQueueEmpty(); + + /** + * getQueuedBuffer returns a buffer from the buffer queue, if one is + * available. + * + * @retval Pointer to BufferBase buffer; 0 if none is available. + */ + virtual BufferBase* getQueuedBuffer(); + +private: + ReadCallback readCallback; + EofCallback eofCallback; + DisconnectCallback disCallback; + ClosedCallback closedCallback; + BuffersEmptyCallback emptyCallback; + IdleCallback idleCallback; + const Socket& socket; + Poller::shared_ptr poller; + + std::deque<BufferBase*> bufferQueue; + std::deque<BufferBase*> writeQueue; + /* The MSVC-supplied deque is not thread-safe; keep locks to serialize + * access to the buffer queue and write queue. + */ + Mutex bufferQueueLock; + + // Number of outstanding I/O operations. + volatile LONG opsInProgress; + // Is there a write in progress? + volatile bool writeInProgress; + // Deletion requested, but there are callbacks in progress. + volatile bool queuedDelete; + // Socket close requested, but there are operations in progress. + volatile bool queuedClose; + +private: + void close(void); + + /** + * Initiate a read operation. AsynchIO::dispatchReadComplete() will be + * called when the read is complete and data is available. + */ + virtual void startRead(void); + + /** + * Initiate a write of the specified buffer. There's no callback for + * write completion to the AsynchIO object. + */ + virtual void startWrite(AsynchIO::BufferBase* buff); + + virtual bool writesNotComplete(); + + /** + * readComplete is called when a read request is complete. + * + * @param result Results of the operation. + */ + void readComplete(AsynchReadResult *result); + + /** + * writeComplete is called when a write request is complete. + * + * @param result Results of the operation. + */ + void writeComplete(AsynchWriteResult *result); + + /** + * Queue of completions to run. This queue enforces the requirement + * from upper layers that only one thread at a time is allowed to act + * on any given connection. Once a thread is busy processing a completion + * on this object, other threads that dispatch completions queue the + * completions here for the in-progress thread to handle when done. + * Thus, any threads can dispatch a completion from the IocpPoller, but + * this class ensures that actual processing at the connection level is + * only on one thread at a time. + */ + std::queue<AsynchIoResult *> completionQueue; + volatile bool working; + Mutex completionLock; + + /** + * Called when there's a completion to process. + */ + void completion(AsynchIoResult *result); +}; + +AsynchIO::AsynchIO(const Socket& s, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb, + BuffersEmptyCallback eCb, + IdleCallback iCb) : + + readCallback(rCb), + eofCallback(eofCb), + disCallback(disCb), + closedCallback(cCb), + emptyCallback(eCb), + idleCallback(iCb), + socket(s), + opsInProgress(0), + writeInProgress(false), + queuedDelete(false), + queuedClose(false), + working(false) { +} + +struct deleter +{ + template <typename T> + void operator()(T *ptr){ delete ptr;} +}; + +AsynchIO::~AsynchIO() { + std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter()); + std::for_each( writeQueue.begin(), writeQueue.end(), deleter()); +} + +void AsynchIO::queueForDeletion() { + queuedDelete = true; + if (opsInProgress > 0) { + QPID_LOG(info, "Delete AsynchIO queued; ops in progress"); + // AsynchIOHandler calls this then deletes itself; don't do any more + // callbacks. + readCallback = 0; + eofCallback = 0; + disCallback = 0; + closedCallback = 0; + emptyCallback = 0; + idleCallback = 0; + } + else { + delete this; + } +} + +void AsynchIO::start(Poller::shared_ptr poller0) { + poller = poller0; + poller->addFd(PollerHandle(socket), Poller::INPUT); + if (writeQueue.size() > 0) // Already have data queued for write + notifyPendingWrite(); + startRead(); +} + +void AsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) { + assert(buff); + buff->dataStart = 0; + buff->dataCount = 0; + QLock l(bufferQueueLock); + bufferQueue.push_back(buff); +} + +void AsynchIO::unread(AsynchIO::BufferBase* buff) { + assert(buff); + if (buff->dataStart != 0) { + memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount); + buff->dataStart = 0; + } + QLock l(bufferQueueLock); + bufferQueue.push_front(buff); +} + +void AsynchIO::queueWrite(AsynchIO::BufferBase* buff) { + assert(buff); + QLock l(bufferQueueLock); + writeQueue.push_back(buff); + if (!writeInProgress) + notifyPendingWrite(); +} + +void AsynchIO::notifyPendingWrite() { + // This method is generally called from a processing thread; transfer + // work on this to an I/O thread. Much of the upper layer code assumes + // that all I/O-related things happen in an I/O thread. + if (poller == 0) // Not really going yet... + return; + + InterlockedIncrement(&opsInProgress); + IOHandlePrivate *hp = + new IOHandlePrivate (INVALID_SOCKET, + boost::bind(&AsynchIO::completion, this, _1)); + IOHandle h(hp); + PollerHandle ph(h); + poller->addFd(ph, Poller::OUTPUT); +} + +void AsynchIO::queueWriteClose() { + queuedClose = true; + if (!writeInProgress) + notifyPendingWrite(); +} + +bool AsynchIO::writeQueueEmpty() { + QLock l(bufferQueueLock); + return writeQueue.size() == 0; +} + +/** + * Return a queued buffer if there are enough to spare. + */ +AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() { + QLock l(bufferQueueLock); + // Always keep at least one buffer (it might have data that was + // "unread" in it). + if (bufferQueue.size() <= 1) + return 0; + BufferBase* buff = bufferQueue.back(); + assert(buff); + bufferQueue.pop_back(); + return buff; +} + +void AsynchIO::dispatchReadComplete(AsynchIO::BufferBase *buffer) { + if (readCallback) + readCallback(*this, buffer); +} + +void AsynchIO::notifyEof(void) { + if (eofCallback) + eofCallback(*this); +} + +void AsynchIO::notifyDisconnect(void) { + if (disCallback) + disCallback(*this); +} + +void AsynchIO::notifyClosed(void) { + if (closedCallback) + closedCallback(*this, socket); +} + +void AsynchIO::notifyBuffersEmpty(void) { + if (emptyCallback) + emptyCallback(*this); +} + +void AsynchIO::notifyIdle(void) { + if (idleCallback) + idleCallback(*this); +} + +/* + * Asynch reader/writer using overlapped I/O + */ + +void AsynchIO::startRead(void) { + if (queuedDelete) + return; + + // (Try to) get a buffer; look on the front since there may be an + // "unread" one there with data remaining from last time. + AsynchIO::BufferBase *buff = 0; + { + QLock l(bufferQueueLock); + + if (!bufferQueue.empty()) { + buff = bufferQueue.front(); + assert(buff); + bufferQueue.pop_front(); + } + } + if (buff != 0) { + int readCount = buff->byteCount - buff->dataCount; + AsynchReadResult *result = + new AsynchReadResult(boost::bind(&AsynchIO::completion, this, _1), + buff, + readCount); + DWORD bytesReceived = 0, flags = 0; + InterlockedIncrement(&opsInProgress); + int status = WSARecv(toFd(socket.impl), + const_cast<LPWSABUF>(result->getWSABUF()), 1, + &bytesReceived, + &flags, + result->overlapped(), + 0); + if (status != 0) { + int error = WSAGetLastError(); + if (error != WSA_IO_PENDING) { + result->failure(error); + result = 0; // result is invalid here + return; + } + } + // On status 0 or WSA_IO_PENDING, completion will handle the rest. + } + else { + notifyBuffersEmpty(); + } + return; +} + +void AsynchIO::startWrite(AsynchIO::BufferBase* buff) { + writeInProgress = true; + InterlockedIncrement(&opsInProgress); + int writeCount = buff->byteCount-buff->dataCount; + AsynchWriteResult *result = + new AsynchWriteResult(boost::bind(&AsynchIO::completion, this, _1), + buff, + buff->dataCount); + DWORD bytesSent = 0; + int status = WSASend(toFd(socket.impl), + const_cast<LPWSABUF>(result->getWSABUF()), 1, + &bytesSent, + 0, + result->overlapped(), + 0); + if (status != 0) { + int error = WSAGetLastError(); + if (error != WSA_IO_PENDING) { + result->failure(error); // Also decrements in-progress count + result = 0; // result is invalid here + return; + } + } + // On status 0 or WSA_IO_PENDING, completion will handle the rest. + return; +} + +bool AsynchIO::writesNotComplete() { + return writeInProgress; +} + +/* + * Close the socket and callback to say we've done it + */ +void AsynchIO::close(void) { + socket.close(); + notifyClosed(); +} + +void AsynchIO::readComplete(AsynchReadResult *result) { + ++threadReadCount; + int status = result->getStatus(); + size_t bytes = result->getTransferred(); + if (status == 0 && bytes > 0) { + threadReadTotal += bytes; + dispatchReadComplete(result->getBuff()); + startRead(); + } + else { + // No data read, so put the buffer back. It may be partially filled, + // so "unread" it back to the front of the queue. + unread(result->getBuff()); + if (status == 0) + notifyEof(); + else + notifyDisconnect(); + } +} + +/* + * NOTE - this completion is called for completed writes and also when + * a write is desired. The difference is in the buff - if a write is desired + * the buff is 0. + */ +void AsynchIO::writeComplete(AsynchWriteResult *result) { + int status = result->getStatus(); + size_t bytes = result->getTransferred(); + AsynchIO::BufferBase *buff = result->getBuff(); + if (buff != 0) { + ++threadWriteCount; + writeInProgress = false; + if (status == 0 && bytes > 0) { + threadWriteTotal += bytes; + if (bytes < result->getRequested()) // Still more to go; resubmit + startWrite(buff); + else + queueReadBuffer(buff); // All done; back to the pool + } + else { + // An error... if it's a connection close, ignore it - it will be + // noticed and handled on a read completion any moment now. + // What to do with real error??? Save the Buffer? + } + } + + // If there are no writes outstanding, the priority is to write any + // remaining buffers first (either queued or via idle), then close the + // socket if that's queued. + // opsInProgress handled in completion() + if (!writeInProgress) { + bool writing = false; + { + QLock l(bufferQueueLock); + if (writeQueue.size() > 0) { + buff = writeQueue.front(); + assert(buff); + writeQueue.pop_front(); + startWrite(buff); + writing = true; + } + } + if (!writing) { + if (queuedClose) + close(); + else + notifyIdle(); + } + } + return; +} + +void AsynchIO::completion(AsynchIoResult *result) { + { + ScopedLock<Mutex> l(completionLock); + if (working) { + completionQueue.push(result); + return; + } + + // First thread in with something to do; note we're working then keep + // handling completions. + working = true; + while (result != 0) { + // New scope to unlock temporarily. + { + ScopedUnlock<Mutex> ul(completionLock); + AsynchReadResult *r = dynamic_cast<AsynchReadResult*>(result); + if (r != 0) + readComplete(r); + else { + AsynchWriteResult *w = + dynamic_cast<AsynchWriteResult*>(result); + writeComplete(w); + } + delete result; + result = 0; + InterlockedDecrement(&opsInProgress); + } + // Lock is held again. + if (completionQueue.empty()) + continue; + result = completionQueue.front(); + completionQueue.pop(); + } + working = false; + } + // Lock released; ok to delete if all is done. + if (opsInProgress == 0 && queuedDelete) + delete this; +} + +}} // namespace qpid::windows diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h b/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h new file mode 100755 index 0000000000..9efdaebda8 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h @@ -0,0 +1,187 @@ +#ifndef _windows_asynchIoResult_h +#define _windows_asynchIoResult_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "AsynchIO.h" +#include "qpid/sys/Socket.h" +#include <memory.h> +#include <winsock2.h> +#include <ws2tcpip.h> + +namespace qpid { +namespace sys { + +/* + * AsynchIoResult defines the class that receives the result of an + * asynchronous I/O operation, either send/recv or accept/connect. + * + * Operation factories should set one of these up before beginning the + * operation. Poller knows how to dispatch completion to this class. + * This class must be subclassed for needed operations; this class provides + * an interface only and cannot be instantiated. + * + * This class is tied to Windows; it inherits from OVERLAPPED so that the + * IocpPoller can cast OVERLAPPED pointers back to AsynchIoResult and call + * the completion handler. + */ +class AsynchResult : private OVERLAPPED { +public: + LPOVERLAPPED overlapped(void) { return this; } + static AsynchResult* from_overlapped(LPOVERLAPPED ol) { + return static_cast<AsynchResult*>(ol); + } + virtual void success (size_t bytesTransferred) { + bytes = bytesTransferred; + status = 0; + complete(); + } + virtual void failure (int error) { + bytes = 0; + status = error; + complete(); + } + size_t getTransferred(void) const { return bytes; } + int getStatus(void) const { return status; } + +protected: + AsynchResult() : bytes(0), status(0) + { memset(overlapped(), 0, sizeof(OVERLAPPED)); } + ~AsynchResult() {} + virtual void complete(void) = 0; + + size_t bytes; + int status; +}; + +class AsynchAcceptorPrivate; +class AsynchAcceptResult : public AsynchResult { + + friend class AsynchAcceptorPrivate; + +public: + AsynchAcceptResult(AsynchAcceptor::Callback cb, + AsynchAcceptorPrivate *acceptor, + SOCKET listener); + virtual void success (size_t bytesTransferred); + virtual void failure (int error); + +private: + virtual void complete(void) {} // No-op for this class. + + std::auto_ptr<qpid::sys::Socket> newSocket; + AsynchAcceptor::Callback callback; + AsynchAcceptorPrivate *acceptor; + SOCKET listener; + + // AcceptEx needs a place to write the local and remote addresses + // when accepting the connection. Place those here; get enough for + // IPv6 addresses, even if the socket is IPv4. + enum { SOCKADDRMAXLEN = sizeof sockaddr_in6 + 16, + SOCKADDRBUFLEN = 2 * SOCKADDRMAXLEN }; + char addressBuffer[SOCKADDRBUFLEN]; +}; + +class AsynchIO; + +class AsynchIoResult : public AsynchResult { +public: + typedef boost::function1<void, AsynchIoResult *> Completer; + + virtual ~AsynchIoResult() {} + AsynchIO::BufferBase *getBuff(void) const { return iobuff; } + size_t getRequested(void) const { return requested; } + const WSABUF *getWSABUF(void) const { return &wsabuf; } + +protected: + void setBuff (AsynchIO::BufferBase *buffer) { iobuff = buffer; } + +protected: + AsynchIoResult(Completer cb, + AsynchIO::BufferBase *buff, size_t length) + : completionCallback(cb), iobuff(buff), requested(length) {} + + virtual void complete(void) = 0; + WSABUF wsabuf; + Completer completionCallback; + +private: + AsynchIO::BufferBase *iobuff; + size_t requested; // Number of bytes in original I/O request +}; + +class AsynchReadResult : public AsynchIoResult { + + // complete() updates buffer then does completion callback. + virtual void complete(void) { + getBuff()->dataCount += bytes; + completionCallback(this); + } + +public: + AsynchReadResult(AsynchIoResult::Completer cb, + AsynchIO::BufferBase *buff, + size_t length) + : AsynchIoResult(cb, buff, length) { + wsabuf.buf = buff->bytes + buff->dataCount; + wsabuf.len = length; + } +}; + +class AsynchWriteResult : public AsynchIoResult { + + // complete() updates buffer then does completion callback. + virtual void complete(void) { + AsynchIO::BufferBase *b = getBuff(); + b->dataStart += bytes; + b->dataCount -= bytes; + completionCallback(this); + } + +public: + AsynchWriteResult(AsynchIoResult::Completer cb, + AsynchIO::BufferBase *buff, + size_t length) + : AsynchIoResult(cb, buff, length) { + wsabuf.buf = buff ? buff->bytes : 0; + wsabuf.len = length; + } +}; + +class AsynchWriteWanted : public AsynchWriteResult { + + // complete() just does completion callback; no buffers used. + virtual void complete(void) { + completionCallback(this); + } + +public: + AsynchWriteWanted(AsynchIoResult::Completer cb) + : AsynchWriteResult(cb, 0, 0) { + wsabuf.buf = 0; + wsabuf.len = 0; + } +}; + +}} + +#endif /*!_windows_asynchIoResult_h*/ diff --git a/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp b/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp new file mode 100755 index 0000000000..ba544c8c90 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/IOHandle.h" +#include "IoHandlePrivate.h" +#include <windows.h> + +namespace qpid { +namespace sys { + +SOCKET toFd(const IOHandlePrivate* h) +{ + return h->fd; +} + +IOHandle::IOHandle(IOHandlePrivate* h) : + impl(h) +{} + +IOHandle::~IOHandle() { + delete impl; +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h b/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h new file mode 100755 index 0000000000..18e75047ed --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h @@ -0,0 +1,52 @@ +#ifndef _sys_windows_IoHandlePrivate_h +#define _sys_windows_IoHandlePrivate_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "AsynchIoResult.h" + +#include <winsock2.h> + +namespace qpid { +namespace sys { + +// Private fd related implementation details +// There should be either a valid socket handle or a completer callback. +// Handle is used to associate with poller's iocp; completer is used to +// inject a completion that will very quickly trigger a callback to the +// completer from an I/O thread. +class IOHandlePrivate { +public: + IOHandlePrivate(SOCKET f = INVALID_SOCKET, + AsynchIoResult::Completer cb = 0) : + fd(f), event(cb) + {} + + SOCKET fd; + AsynchIoResult::Completer event; +}; + +SOCKET toFd(const IOHandlePrivate* h); + +}} + +#endif /* _sys_windows_IoHandlePrivate_h */ diff --git a/qpid/cpp/src/qpid/sys/windows/IocpDispatcher.cpp b/qpid/cpp/src/qpid/sys/windows/IocpDispatcher.cpp new file mode 100755 index 0000000000..1a0f6ce927 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/IocpDispatcher.cpp @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Dispatcher.h" + +#include <assert.h> + +namespace qpid { +namespace sys { + +Dispatcher::Dispatcher(Poller::shared_ptr poller0) : + poller(poller0) { +} + +Dispatcher::~Dispatcher() { +} + +void Dispatcher::run() { + do { + Poller::Event event = poller->wait(); + + // Handle shutdown + switch (event.type) { + case Poller::SHUTDOWN: + return; + break; + case Poller::INVALID: // On any type of success or fail completion + break; + default: + // This should be impossible + assert(false); + } + } while (true); +} + +}} diff --git a/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp b/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp new file mode 100755 index 0000000000..44298ac8ea --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp @@ -0,0 +1,176 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Poller.h" +#include "qpid/sys/Mutex.h" + +#include "AsynchIoResult.h" +#include "IoHandlePrivate.h" +#include "check.h" + +#include <winsock2.h> +#include <windows.h> + +#include <assert.h> +#include <vector> +#include <exception> + +namespace qpid { +namespace sys { + +class PollerHandlePrivate { + friend class Poller; + friend class PollerHandle; + + SOCKET fd; + AsynchIoResult::Completer cb; + + PollerHandlePrivate(SOCKET f, AsynchIoResult::Completer cb0 = 0) : + fd(f), cb(cb0) + { + } + +}; + +PollerHandle::PollerHandle(const IOHandle& h) : + impl(new PollerHandlePrivate(toFd(h.impl), h.impl->event)) +{} + +PollerHandle::~PollerHandle() { + delete impl; +} + +/** + * Concrete implementation of Poller to use the Windows I/O Completion + * port (IOCP) facility. + */ +class PollerPrivate { + friend class Poller; + + const HANDLE iocp; + + // The number of threads running the event loop. + volatile LONG threadsRunning; + + // Shutdown request is handled by setting isShutdown and injecting a + // well-formed completion event into the iocp. + bool isShutdown; + + PollerPrivate() : + iocp(::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)), + threadsRunning(0), + isShutdown(false) { + QPID_WINDOWS_CHECK_NULL(iocp); + } + + ~PollerPrivate() { + // It's probably okay to ignore any errors here as there can't be + // data loss + ::CloseHandle(iocp); + } +}; + +void Poller::addFd(PollerHandle& handle, Direction dir) { + HANDLE h = (HANDLE)(handle.impl->fd); + if (h != INVALID_HANDLE_VALUE) { + HANDLE iocpHandle = ::CreateIoCompletionPort (h, impl->iocp, 0, 0); + QPID_WINDOWS_CHECK_NULL(iocpHandle); + } + else { + AsynchWriteWanted *result = new AsynchWriteWanted(handle.impl->cb); + PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped()); + } +} + +void Poller::shutdown() { + // Allow sloppy code to shut us down more than once. + if (impl->isShutdown) + return; + ULONG_PTR key = 1; // Tell wait() it's a shutdown, not I/O + PostQueuedCompletionStatus(impl->iocp, 0, key, 0); +} + +// All no-ops... +void Poller::delFd(PollerHandle& handle) {} +void Poller::modFd(PollerHandle& handle, Direction dir) {} +void Poller::rearmFd(PollerHandle& handle) {} + +Poller::Event Poller::wait(Duration timeout) { + DWORD timeoutMs = 0; + DWORD numTransferred = 0; + ULONG_PTR completionKey = 0; + OVERLAPPED *overlapped = 0; + AsynchResult *result = 0; + + // Wait for either an I/O operation to finish (thus signaling the + // IOCP handle) or a shutdown request to be made (thus signaling the + // shutdown event). + if (timeout == TIME_INFINITE) + timeoutMs = INFINITE; + else + timeoutMs = static_cast<DWORD>(timeout / TIME_MSEC); + + InterlockedIncrement(&impl->threadsRunning); + bool goodOp = ::GetQueuedCompletionStatus (impl->iocp, + &numTransferred, + &completionKey, + &overlapped, + timeoutMs); + LONG remainingThreads = InterlockedDecrement(&impl->threadsRunning); + if (goodOp) { + // Dequeued a successful completion. If it's a posted packet from + // shutdown() the overlapped ptr is 0 and key is 1. Else downcast + // the OVERLAPPED pointer to an AsynchIoResult and call the + // completion handler. + if (overlapped == 0 && completionKey == 1) { + // If there are other threads still running this wait, re-post + // the completion. + if (remainingThreads > 0) + PostQueuedCompletionStatus(impl->iocp, 0, completionKey, 0); + return Event(0, SHUTDOWN); + } + + result = AsynchResult::from_overlapped(overlapped); + result->success (static_cast<size_t>(numTransferred)); + } + else { + if (overlapped != 0) { + // Dequeued a completion for a failed operation. Downcast back + // to the result object and inform it that the operation failed. + DWORD status = ::GetLastError(); + result = AsynchResult::from_overlapped(overlapped); + result->failure (static_cast<int>(status)); + } + } + return Event(0, INVALID); // TODO - this may need to be changed. + +} + +// Concrete constructors +Poller::Poller() : + impl(new PollerPrivate()) +{} + +Poller::~Poller() { + delete impl; +} + +}} diff --git a/qpid/cpp/src/qpid/sys/windows/Socket.cpp b/qpid/cpp/src/qpid/sys/windows/Socket.cpp new file mode 100755 index 0000000000..a9959bf43e --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/Socket.cpp @@ -0,0 +1,329 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Socket.h" +#include "IoHandlePrivate.h" +#include "check.h" +#include "qpid/sys/Time.h" + +#include <cstdlib> +#include <string.h> +#include <iostream> +#include <memory.h> + +#include <winsock2.h> +#include <ws2tcpip.h> + +#include <boost/format.hpp> + +// Need to initialize WinSock. Ideally, this would be a singleton or embedded +// in some one-time initialization function. I tried boost singleton and could +// not get it to compile (and others located in google had the same problem). +// So, this simple static with an interlocked increment will do for known +// use cases at this time. Since this will only shut down winsock at process +// termination, there may be some problems with client programs that also +// expect to load and unload winsock, but we'll see... +// If someone does get an easy-to-use singleton sometime, converting to it +// may be preferable. + +namespace { + +static LONG volatile initialized = 0; + +class WinSockSetup { + // : public boost::details::pool::singleton_default<WinSockSetup> { + +public: + WinSockSetup() { + LONG timesEntered = InterlockedIncrement(&initialized); + if (timesEntered > 1) + return; + err = 0; + WORD wVersionRequested; + WSADATA wsaData; + + /* Request WinSock 2.2 */ + wVersionRequested = MAKEWORD(2, 2); + err = WSAStartup(wVersionRequested, &wsaData); + } + + ~WinSockSetup() { + WSACleanup(); + } + +public: + int error(void) const { return err; } + +protected: + DWORD err; +}; + +static WinSockSetup setup; + +} /* namespace */ + +namespace qpid { +namespace sys { + +namespace { + +std::string getName(SOCKET fd, bool local, bool includeService = false) +{ + sockaddr_in name; // big enough for any socket address + socklen_t namelen = sizeof(name); + if (local) { + QPID_WINSOCK_CHECK(::getsockname(fd, (sockaddr*)&name, &namelen)); + } else { + QPID_WINSOCK_CHECK(::getpeername(fd, (sockaddr*)&name, &namelen)); + } + + char servName[NI_MAXSERV]; + char dispName[NI_MAXHOST]; + if (includeService) { + if (int rc = ::getnameinfo((sockaddr*)&name, namelen, + dispName, sizeof(dispName), + servName, sizeof(servName), + NI_NUMERICHOST | NI_NUMERICSERV) != 0) + throw qpid::Exception(QPID_MSG(gai_strerror(rc))); + return std::string(dispName) + ":" + std::string(servName); + } else { + if (int rc = ::getnameinfo((sockaddr*)&name, namelen, + dispName, sizeof(dispName), + 0, 0, + NI_NUMERICHOST) != 0) + throw qpid::Exception(QPID_MSG(gai_strerror(rc))); + return dispName; + } +} + +std::string getService(SOCKET fd, bool local) +{ + sockaddr_in name; // big enough for any socket address + socklen_t namelen = sizeof(name); + + if (local) { + QPID_WINSOCK_CHECK(::getsockname(fd, (sockaddr*)&name, &namelen)); + } else { + QPID_WINSOCK_CHECK(::getpeername(fd, (sockaddr*)&name, &namelen)); + } + + char servName[NI_MAXSERV]; + if (int rc = ::getnameinfo((sockaddr*)&name, namelen, + 0, 0, + servName, sizeof(servName), + NI_NUMERICHOST | NI_NUMERICSERV) != 0) + throw qpid::Exception(QPID_MSG(gai_strerror(rc))); + return servName; +} +} // namespace + +Socket::Socket() : + IOHandle(new IOHandlePrivate) +{ + createTcp(); +} + +Socket::Socket(IOHandlePrivate* h) : + IOHandle(h) +{} + +void Socket::createTcp() const +{ + SOCKET& socket = impl->fd; + if (socket != INVALID_SOCKET) Socket::close(); + SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0); + if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); + socket = s; +} + +void Socket::setTimeout(const Duration& interval) const +{ + const SOCKET& socket = impl->fd; + int64_t nanosecs = interval; + nanosecs /= (1000 * 1000); // nsecs -> usec -> msec + int msec = 0; + if (nanosecs > std::numeric_limits<int>::max()) + msec = std::numeric_limits<int>::max(); + else + msec = static_cast<int>(nanosecs); + setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&msec, sizeof(msec)); + setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&msec, sizeof(msec)); +} + +void Socket::setNonblocking() const { + u_long nonblock = 1; + QPID_WINSOCK_CHECK(ioctlsocket(impl->fd, FIONBIO, &nonblock)); +} + +void Socket::connect(const std::string& host, uint16_t port) const +{ + std::stringstream portstream; + portstream << port << std::ends; + std::string portstr = portstream.str(); + std::stringstream namestream; + namestream << host << ":" << port; + connectname = namestream.str(); + + const SOCKET& socket = impl->fd; + // TODO: Be good to make this work for IPv6 as well as IPv4. Would require + // other changes, such as waiting to create the socket until after we + // have the address family. Maybe unbundle the translation of names here; + // use TcpAddress to resolve things and make this class take a TcpAddress + // and grab its address family to create the socket. + struct addrinfo hints; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_INET; // We always creating AF_INET-only sockets. + hints.ai_socktype = SOCK_STREAM; // We always do TCP + addrinfo *addrs; + int status = getaddrinfo(host.c_str(), portstr.c_str(), &hints, &addrs); + if (status != 0) + throw Exception(QPID_MSG("Cannot resolve " << host << ": " << + gai_strerror(status))); + addrinfo *addr = addrs; + int error = 0; + WSASetLastError(0); + while (addr != 0) { + if ((::connect(socket, addr->ai_addr, addr->ai_addrlen) == 0) || + (WSAGetLastError() == WSAEWOULDBLOCK)) + break; + // Error... save this error code and see if there are other address + // to try before throwing the exception. + error = WSAGetLastError(); + addr = addr->ai_next; + } + freeaddrinfo(addrs); + if (error) + throw qpid::Exception(QPID_MSG(strError(error) << ": " << connectname)); +} + +void +Socket::close() const +{ + SOCKET& socket = impl->fd; + if (socket == INVALID_SOCKET) return; + QPID_WINSOCK_CHECK(closesocket(socket)); + socket = INVALID_SOCKET; +} + + +int Socket::write(const void *buf, size_t count) const +{ + const SOCKET& socket = impl->fd; + int sent = ::send(socket, (const char *)buf, count, 0); + if (sent == SOCKET_ERROR) + return -1; + return sent; +} + +int Socket::read(void *buf, size_t count) const +{ + const SOCKET& socket = impl->fd; + int received = ::recv(socket, (char *)buf, count, 0); + if (received == SOCKET_ERROR) + return -1; + return received; +} + +int Socket::listen(uint16_t port, int backlog) const +{ + const SOCKET& socket = impl->fd; + BOOL yes=1; + QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, sizeof(yes))); + struct sockaddr_in name; + memset(&name, 0, sizeof(name)); + name.sin_family = AF_INET; + name.sin_port = htons(port); + name.sin_addr.s_addr = 0; + if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) == SOCKET_ERROR) + throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(WSAGetLastError()))); + if (::listen(socket, backlog) == SOCKET_ERROR) + throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(WSAGetLastError()))); + + socklen_t namelen = sizeof(name); + QPID_WINSOCK_CHECK(::getsockname(socket, (struct sockaddr*)&name, &namelen)); + return ntohs(name.sin_port); +} + +Socket* Socket::accept(struct sockaddr *addr, socklen_t *addrlen) const +{ + SOCKET afd = ::accept(impl->fd, addr, addrlen); + if (afd != INVALID_SOCKET) + return new Socket(new IOHandlePrivate(afd)); + else if (WSAGetLastError() == EAGAIN) + return 0; + else throw QPID_WINDOWS_ERROR(WSAGetLastError()); +} + +std::string Socket::getSockname() const +{ + return getName(impl->fd, true); +} + +std::string Socket::getPeername() const +{ + return getName(impl->fd, false); +} + +std::string Socket::getPeerAddress() const +{ + if (!connectname.empty()) + return std::string (connectname); + return getName(impl->fd, false, true); +} + +std::string Socket::getLocalAddress() const +{ + return getName(impl->fd, true, true); +} + +uint16_t Socket::getLocalPort() const +{ + return atoi(getService(impl->fd, true).c_str()); +} + +uint16_t Socket::getRemotePort() const +{ + return atoi(getService(impl->fd, true).c_str()); +} + +int Socket::getError() const +{ + int result; + socklen_t rSize = sizeof (result); + + QPID_WINSOCK_CHECK(::getsockopt(impl->fd, SOL_SOCKET, SO_ERROR, (char *)&result, &rSize)); + return result; +} + +void Socket::setTcpNoDelay(bool nodelay) const +{ + if (nodelay) { + int flag = 1; + int result = setsockopt(impl->fd, + IPPROTO_TCP, + TCP_NODELAY, + (char *)&flag, + sizeof(flag)); + QPID_WINSOCK_CHECK(result); + } +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h index a1a1351c7d..9722359d82 100644 --- a/qpid/cpp/src/tests/SocketProxy.h +++ b/qpid/cpp/src/tests/SocketProxy.h @@ -88,7 +88,7 @@ class SocketProxy : private qpid::sys::Runnable std::auto_ptr<qpid::sys::Socket> server; try { qpid::sys::PollerHandle listenerHandle(listener); - poller.addFd(listenerHandle, qpid::sys::Poller::IN); + poller.addFd(listenerHandle, qpid::sys::Poller::INPUT); qpid::sys::Poller::Event event = poller.wait(); throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "SocketProxy: Closed by close()"); throwIf(!(event.type == qpid::sys::Poller::READABLE && event.handle == &listenerHandle), "SocketProxy: Accept failed"); @@ -99,8 +99,8 @@ class SocketProxy : private qpid::sys::Runnable // Pump data between client & server sockets qpid::sys::PollerHandle clientHandle(client); qpid::sys::PollerHandle serverHandle(*server); - poller.addFd(clientHandle, qpid::sys::Poller::IN); - poller.addFd(serverHandle, qpid::sys::Poller::IN); + poller.addFd(clientHandle, qpid::sys::Poller::INPUT); + poller.addFd(serverHandle, qpid::sys::Poller::INPUT); char buffer[1024]; for (;;) { qpid::sys::Poller::Event event = poller.wait(); |