diff options
-rw-r--r-- | cpp/include/qpid/sys/IOHandle.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/AsynchIO.cpp | 56 | ||||
-rwxr-xr-x | cpp/src/qpid/sys/windows/AsynchIoResult.h | 32 | ||||
-rwxr-xr-x | cpp/src/qpid/sys/windows/IoHandlePrivate.h | 5 | ||||
-rwxr-xr-x | cpp/src/qpid/sys/windows/IocpPoller.cpp | 19 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/PollableCondition.cpp | 6 | ||||
-rwxr-xr-x | cpp/src/qpid/sys/windows/Socket.cpp | 28 |
8 files changed, 79 insertions, 77 deletions
diff --git a/cpp/include/qpid/sys/IOHandle.h b/cpp/include/qpid/sys/IOHandle.h index 283e021162..68e9d92a04 100644 --- a/cpp/include/qpid/sys/IOHandle.h +++ b/cpp/include/qpid/sys/IOHandle.h @@ -35,6 +35,8 @@ namespace sys { class AsynchAcceptorPrivate; class AsynchAcceptResult; namespace windows { + class AsynchAcceptor; + class AsynchAcceptResult; class AsynchIO; } @@ -43,8 +45,8 @@ class PollerHandle; class IOHandlePrivate; class IOHandle { - friend class AsynchAcceptorPrivate; - friend class AsynchAcceptResult; + friend class windows::AsynchAcceptResult; + friend class windows::AsynchAcceptor; friend class windows::AsynchIO; friend class PollerHandle; @@ -52,7 +54,7 @@ class IOHandle { protected: IOHandlePrivate* const impl; - IOHandle(IOHandlePrivate*); + IOHandle(IOHandlePrivate*); QPID_COMMON_EXTERN virtual ~IOHandle(); }; diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 5ca01e0867..ed21270334 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -201,7 +201,7 @@ public: void registerProtocolFactory(const std::string& name, boost::shared_ptr<sys::ProtocolFactory>); /** Accept connections */ - void accept(); + QPID_BROKER_EXTERN void accept(); /** Create a connection to another broker. */ void connect(const std::string& host, uint16_t port, diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp index 971f0bb665..c4f67ddf70 100644 --- a/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -74,6 +74,7 @@ void lookUpAcceptEx() { namespace qpid { namespace sys { +namespace windows { /* * Asynch Acceptor @@ -88,13 +89,13 @@ namespace sys { * and status of each accept operation outstanding. */ -class AsynchAcceptorPrivate { +class AsynchAcceptor : public qpid::sys::AsynchAcceptor { friend class AsynchAcceptResult; public: - AsynchAcceptorPrivate(const Socket& s, AsynchAcceptor::Callback callback); - ~AsynchAcceptorPrivate(); + AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback); + ~AsynchAcceptor(); void start(Poller::shared_ptr poller); private: @@ -104,19 +105,7 @@ private: const Socket& socket; }; -AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) : - impl(new AsynchAcceptorPrivate(s, callback)) -{} - -AsynchAcceptor::~AsynchAcceptor() -{ delete impl; } - -void AsynchAcceptor::start(Poller::shared_ptr poller) { - impl->start(poller); -} - -AsynchAcceptorPrivate::AsynchAcceptorPrivate(const Socket& s, - AsynchAcceptor::Callback callback) +AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) : acceptedCallback(callback), socket(s) { @@ -128,16 +117,17 @@ AsynchAcceptorPrivate::AsynchAcceptorPrivate(const Socket& s, #endif } -AsynchAcceptorPrivate::~AsynchAcceptorPrivate(void) { +AsynchAcceptor::~AsynchAcceptor() +{ socket.close(); } -void AsynchAcceptorPrivate::start(Poller::shared_ptr poller) { +void AsynchAcceptor::start(Poller::shared_ptr poller) { poller->monitorHandle(PollerHandle(socket), Poller::INPUT); restart (); } -void AsynchAcceptorPrivate::restart(void) { +void AsynchAcceptor::restart(void) { DWORD bytesReceived = 0; // Not used, needed for AcceptEx API AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback, this, @@ -156,7 +146,7 @@ void AsynchAcceptorPrivate::restart(void) { AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb, - AsynchAcceptorPrivate *acceptor, + AsynchAcceptor *acceptor, SOCKET listener) : callback(cb), acceptor(acceptor), listener(listener) { newSocket.reset (new Socket()); @@ -174,13 +164,11 @@ void AsynchAcceptResult::success(size_t /*bytesTransferred*/) { } void AsynchAcceptResult::failure(int status) { - //if (status != WSA_OPERATION_ABORTED) - // Can there be anything else? ; - delete this; + //if (status != WSA_OPERATION_ABORTED) + // Can there be anything else? ; + delete this; } -namespace windows { - /* * AsynchConnector does synchronous connects for now... to do asynch the * IocpPoller will need some extension to register an event handle as a @@ -224,6 +212,12 @@ AsynchConnector::AsynchConnector(const Socket& sock, } // namespace windows +AsynchAcceptor* AsynchAcceptor::create(const Socket& s, + Callback callback) +{ + return new windows::AsynchAcceptor(s, callback); +} + AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s, Poller::shared_ptr poller, std::string hostname, @@ -231,12 +225,12 @@ AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s, ConnectedCallback connCb, FailedCallback failCb) { - return new qpid::sys::windows::AsynchConnector(s, - poller, - hostname, - port, - connCb, - failCb); + return new windows::AsynchConnector(s, + poller, + hostname, + port, + connCb, + failCb); } diff --git a/cpp/src/qpid/sys/windows/AsynchIoResult.h b/cpp/src/qpid/sys/windows/AsynchIoResult.h index f47607d08c..c9cdebce16 100755 --- a/cpp/src/qpid/sys/windows/AsynchIoResult.h +++ b/cpp/src/qpid/sys/windows/AsynchIoResult.h @@ -30,6 +30,7 @@ namespace qpid { namespace sys { +namespace windows { /* * AsynchIoResult defines the class that receives the result of an @@ -73,14 +74,13 @@ protected: int status; }; -class AsynchAcceptorPrivate; class AsynchAcceptResult : public AsynchResult { - friend class AsynchAcceptorPrivate; + friend class AsynchAcceptor; public: - AsynchAcceptResult(AsynchAcceptor::Callback cb, - AsynchAcceptorPrivate *acceptor, + AsynchAcceptResult(qpid::sys::AsynchAcceptor::Callback cb, + AsynchAcceptor *acceptor, SOCKET listener); virtual void success (size_t bytesTransferred); virtual void failure (int error); @@ -89,8 +89,8 @@ private: virtual void complete(void) {} // No-op for this class. std::auto_ptr<qpid::sys::Socket> newSocket; - AsynchAcceptor::Callback callback; - AsynchAcceptorPrivate *acceptor; + qpid::sys::AsynchAcceptor::Callback callback; + AsynchAcceptor *acceptor; SOCKET listener; // AcceptEx needs a place to write the local and remote addresses @@ -106,16 +106,16 @@ public: typedef boost::function1<void, AsynchIoResult *> Completer; virtual ~AsynchIoResult() {} - AsynchIO::BufferBase *getBuff(void) const { return iobuff; } + qpid::sys::AsynchIO::BufferBase *getBuff(void) const { return iobuff; } size_t getRequested(void) const { return requested; } const WSABUF *getWSABUF(void) const { return &wsabuf; } protected: - void setBuff (AsynchIO::BufferBase *buffer) { iobuff = buffer; } + void setBuff (qpid::sys::AsynchIO::BufferBase *buffer) { iobuff = buffer; } protected: AsynchIoResult(Completer cb, - AsynchIO::BufferBase *buff, size_t length) + qpid::sys::AsynchIO::BufferBase *buff, size_t length) : completionCallback(cb), iobuff(buff), requested(length) {} virtual void complete(void) = 0; @@ -123,7 +123,7 @@ protected: Completer completionCallback; private: - AsynchIO::BufferBase *iobuff; + qpid::sys::AsynchIO::BufferBase *iobuff; size_t requested; // Number of bytes in original I/O request }; @@ -137,7 +137,7 @@ class AsynchReadResult : public AsynchIoResult { public: AsynchReadResult(AsynchIoResult::Completer cb, - AsynchIO::BufferBase *buff, + qpid::sys::AsynchIO::BufferBase *buff, size_t length) : AsynchIoResult(cb, buff, length) { wsabuf.buf = buff->bytes + buff->dataCount; @@ -149,7 +149,7 @@ class AsynchWriteResult : public AsynchIoResult { // complete() updates buffer then does completion callback. virtual void complete(void) { - AsynchIO::BufferBase *b = getBuff(); + qpid::sys::AsynchIO::BufferBase *b = getBuff(); b->dataStart += bytes; b->dataCount -= bytes; completionCallback(this); @@ -157,7 +157,7 @@ class AsynchWriteResult : public AsynchIoResult { public: AsynchWriteResult(AsynchIoResult::Completer cb, - AsynchIO::BufferBase *buff, + qpid::sys::AsynchIO::BufferBase *buff, size_t length) : AsynchIoResult(cb, buff, length) { wsabuf.buf = buff ? buff->bytes : 0; @@ -188,15 +188,15 @@ class AsynchCallbackRequest : public AsynchIoResult { public: AsynchCallbackRequest(AsynchIoResult::Completer cb, - AsynchIO::RequestCallback reqCb) + qpid::sys::AsynchIO::RequestCallback reqCb) : AsynchIoResult(cb, 0, 0), reqCallback(reqCb) { wsabuf.buf = 0; wsabuf.len = 0; } - AsynchIO::RequestCallback reqCallback; + qpid::sys::AsynchIO::RequestCallback reqCallback; }; -}} +}}} // qpid::sys::windows #endif /*!_windows_asynchIoResult_h*/ diff --git a/cpp/src/qpid/sys/windows/IoHandlePrivate.h b/cpp/src/qpid/sys/windows/IoHandlePrivate.h index 1bb0f7aa2e..ffe539aab2 100755 --- a/cpp/src/qpid/sys/windows/IoHandlePrivate.h +++ b/cpp/src/qpid/sys/windows/IoHandlePrivate.h @@ -22,6 +22,7 @@ * */ +#include "qpid/sys/AsynchIO.h" #include "qpid/sys/windows/AsynchIoResult.h" #include "qpid/CommonImportExport.h" @@ -40,13 +41,13 @@ namespace sys { class IOHandlePrivate { public: IOHandlePrivate(SOCKET f = INVALID_SOCKET, - AsynchIoResult::Completer cb = 0, + windows::AsynchIoResult::Completer cb = 0, AsynchIO::RequestCallback reqCallback = 0) : fd(f), event(cb), cbRequest(reqCallback) {} SOCKET fd; - AsynchIoResult::Completer event; + windows::AsynchIoResult::Completer event; AsynchIO::RequestCallback cbRequest; }; diff --git a/cpp/src/qpid/sys/windows/IocpPoller.cpp b/cpp/src/qpid/sys/windows/IocpPoller.cpp index 3943fd82cd..b18e41b3a7 100755 --- a/cpp/src/qpid/sys/windows/IocpPoller.cpp +++ b/cpp/src/qpid/sys/windows/IocpPoller.cpp @@ -42,11 +42,11 @@ class PollerHandlePrivate { friend class PollerHandle; SOCKET fd; - AsynchIoResult::Completer cb; + windows::AsynchIoResult::Completer cb; AsynchIO::RequestCallback cbRequest; PollerHandlePrivate(SOCKET f, - AsynchIoResult::Completer cb0 = 0, + windows::AsynchIoResult::Completer cb0 = 0, AsynchIO::RequestCallback rcb = 0) : fd(f), cb(cb0), cbRequest(rcb) { @@ -133,13 +133,14 @@ void Poller::monitorHandle(PollerHandle& handle, Direction dir) { assert(dir == Poller::INPUT || dir == Poller::OUTPUT); if (dir == Poller::OUTPUT) { - AsynchWriteWanted *result = new AsynchWriteWanted(handle.impl->cb); + windows::AsynchWriteWanted *result = + new windows::AsynchWriteWanted(handle.impl->cb); PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped()); } else { - AsynchCallbackRequest *result = - new AsynchCallbackRequest(handle.impl->cb, - handle.impl->cbRequest); + windows::AsynchCallbackRequest *result = + new windows::AsynchCallbackRequest(handle.impl->cb, + handle.impl->cbRequest); PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped()); } } @@ -155,7 +156,7 @@ Poller::Event Poller::wait(Duration timeout) { DWORD numTransferred = 0; ULONG_PTR completionKey = 0; OVERLAPPED *overlapped = 0; - AsynchResult *result = 0; + windows::AsynchResult *result = 0; // Wait for either an I/O operation to finish (thus signaling the // IOCP handle) or a shutdown request to be made (thus signaling the @@ -185,7 +186,7 @@ Poller::Event Poller::wait(Duration timeout) { return Event(0, SHUTDOWN); } - result = AsynchResult::from_overlapped(overlapped); + result = windows::AsynchResult::from_overlapped(overlapped); result->success (static_cast<size_t>(numTransferred)); } else { @@ -193,7 +194,7 @@ Poller::Event Poller::wait(Duration timeout) { // Dequeued a completion for a failed operation. Downcast back // to the result object and inform it that the operation failed. DWORD status = ::GetLastError(); - result = AsynchResult::from_overlapped(overlapped); + result = windows::AsynchResult::from_overlapped(overlapped); result->failure (static_cast<int>(status)); } } diff --git a/cpp/src/qpid/sys/windows/PollableCondition.cpp b/cpp/src/qpid/sys/windows/PollableCondition.cpp index e49b6ceb0c..5ccc136bd1 100644 --- a/cpp/src/qpid/sys/windows/PollableCondition.cpp +++ b/cpp/src/qpid/sys/windows/PollableCondition.cpp @@ -47,8 +47,8 @@ private: const boost::shared_ptr<sys::Poller>& poller); ~PollableConditionPrivate(); - void poke(); - void dispatch(AsynchIoResult *result); + void poke(); + void dispatch(windows::AsynchIoResult *result); private: PollableCondition::Callback cb; @@ -82,7 +82,7 @@ void PollableConditionPrivate::poke() poller->monitorHandle(ph, Poller::INPUT); } -void PollableConditionPrivate::dispatch(AsynchIoResult *result) +void PollableConditionPrivate::dispatch(windows::AsynchIoResult *result) { delete result; // Poller::monitorHandle() allocates this cb(parent); diff --git a/cpp/src/qpid/sys/windows/Socket.cpp b/cpp/src/qpid/sys/windows/Socket.cpp index 8e6233bbf8..e2ef195040 100755 --- a/cpp/src/qpid/sys/windows/Socket.cpp +++ b/cpp/src/qpid/sys/windows/Socket.cpp @@ -135,7 +135,9 @@ std::string getService(SOCKET fd, bool local) } // namespace Socket::Socket() : - IOHandle(new IOHandlePrivate) + IOHandle(new IOHandlePrivate), + nonblocking(false), + nodelay(false) { SOCKET& socket = impl->fd; if (socket != INVALID_SOCKET) Socket::close(); @@ -145,7 +147,9 @@ Socket::Socket() : } Socket::Socket(IOHandlePrivate* h) : - IOHandle(h) + IOHandle(h), + nonblocking(false), + nodelay(false) {} void @@ -162,6 +166,7 @@ Socket::createSocket(const SocketAddress& sa) const try { if (nonblocking) setNonblocking(); + if (nodelay) setTcpNoDelay(); } catch (std::exception&) { closesocket(s); socket = INVALID_SOCKET; @@ -313,17 +318,16 @@ int Socket::getError() const return result; } -void Socket::setTcpNoDelay(bool nodelay) const +void Socket::setTcpNoDelay() const { - if (nodelay) { - int flag = 1; - int result = setsockopt(impl->fd, - IPPROTO_TCP, - TCP_NODELAY, - (char *)&flag, - sizeof(flag)); - QPID_WINSOCK_CHECK(result); - } + int flag = 1; + int result = setsockopt(impl->fd, + IPPROTO_TCP, + TCP_NODELAY, + (char *)&flag, + sizeof(flag)); + QPID_WINSOCK_CHECK(result); + nodelay = true; } }} // namespace qpid::sys |