diff options
author | Stephen D. Huston <shuston@apache.org> | 2009-01-12 23:31:49 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2009-01-12 23:31:49 +0000 |
commit | 968d7fd6a2edb1f99ffab980d8130abbd854adc5 (patch) | |
tree | f3ccf11c71900cd2c2dc1a27592bc9e71c7b9486 /cpp/src | |
parent | 081b523538d7731c4c0128f90a6393720a7d4ecc (diff) | |
download | qpid-python-968d7fd6a2edb1f99ffab980d8130abbd854adc5.tar.gz |
Add support for AsynchIO::RequestCallback processing
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@733966 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/sys/windows/AsynchIO.cpp | 25 | ||||
-rwxr-xr-x | cpp/src/qpid/sys/windows/AsynchIoResult.h | 17 | ||||
-rwxr-xr-x | cpp/src/qpid/sys/windows/IoHandlePrivate.h | 10 | ||||
-rwxr-xr-x | cpp/src/qpid/sys/windows/IocpPoller.cpp | 24 |
4 files changed, 65 insertions, 11 deletions
diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp index 409480dbdc..5ae9a4bfef 100644 --- a/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -534,9 +534,22 @@ void AsynchIO::startReading() { return; } -// TODO: This needs to arrange for a callback that is serialised with -// the other IO callbacks for this AsynchIO +// Queue the specified callback for invocation from an I/O thread. void AsynchIO::requestCallback(RequestCallback callback) { + // This method is generally called from a processing thread; transfer + // work on this to an I/O thread. Much of the upper layer code assumes + // that all I/O-related things happen in an I/O thread. + if (poller == 0) // Not really going yet... + return; + + InterlockedIncrement(&opsInProgress); + IOHandlePrivate *hp = + new IOHandlePrivate (INVALID_SOCKET, + boost::bind(&AsynchIO::completion, this, _1), + callback); + IOHandle h(hp); + PollerHandle ph(h); + poller->addFd(ph, Poller::INPUT); } /** @@ -714,7 +727,13 @@ void AsynchIO::completion(AsynchIoResult *result) { else { AsynchWriteResult *w = dynamic_cast<AsynchWriteResult*>(result); - writeComplete(w); + if (w != 0) + writeComplete(w); + else { + AsynchCallbackRequest *req = + dynamic_cast<AsynchCallbackRequest*>(result); + req->reqCallback(*this); + } } delete result; result = 0; diff --git a/cpp/src/qpid/sys/windows/AsynchIoResult.h b/cpp/src/qpid/sys/windows/AsynchIoResult.h index 7db4e9c331..f47607d08c 100755 --- a/cpp/src/qpid/sys/windows/AsynchIoResult.h +++ b/cpp/src/qpid/sys/windows/AsynchIoResult.h @@ -180,6 +180,23 @@ public: } }; +class AsynchCallbackRequest : public AsynchIoResult { + // complete() needs to simply call the completionCallback; no buffers. + virtual void complete(void) { + completionCallback(this); + } + +public: + AsynchCallbackRequest(AsynchIoResult::Completer cb, + AsynchIO::RequestCallback reqCb) + : AsynchIoResult(cb, 0, 0), reqCallback(reqCb) { + wsabuf.buf = 0; + wsabuf.len = 0; + } + + AsynchIO::RequestCallback reqCallback; +}; + }} #endif /*!_windows_asynchIoResult_h*/ diff --git a/cpp/src/qpid/sys/windows/IoHandlePrivate.h b/cpp/src/qpid/sys/windows/IoHandlePrivate.h index 18e75047ed..067c0fe6b7 100755 --- a/cpp/src/qpid/sys/windows/IoHandlePrivate.h +++ b/cpp/src/qpid/sys/windows/IoHandlePrivate.h @@ -33,16 +33,20 @@ namespace sys { // There should be either a valid socket handle or a completer callback. // Handle is used to associate with poller's iocp; completer is used to // inject a completion that will very quickly trigger a callback to the -// completer from an I/O thread. +// completer from an I/O thread. If the callback mechanism is used, there +// can be a RequestCallback set - this carries the callback object through +// from AsynchIO::requestCallback() through to the I/O completion processing. class IOHandlePrivate { public: IOHandlePrivate(SOCKET f = INVALID_SOCKET, - AsynchIoResult::Completer cb = 0) : - fd(f), event(cb) + AsynchIoResult::Completer cb = 0, + AsynchIO::RequestCallback reqCallback = 0) : + fd(f), event(cb), cbRequest(reqCallback) {} SOCKET fd; AsynchIoResult::Completer event; + AsynchIO::RequestCallback cbRequest; }; SOCKET toFd(const IOHandlePrivate* h); diff --git a/cpp/src/qpid/sys/windows/IocpPoller.cpp b/cpp/src/qpid/sys/windows/IocpPoller.cpp index d23dafcd6e..1e24adfb46 100755 --- a/cpp/src/qpid/sys/windows/IocpPoller.cpp +++ b/cpp/src/qpid/sys/windows/IocpPoller.cpp @@ -43,16 +43,19 @@ class PollerHandlePrivate { SOCKET fd; AsynchIoResult::Completer cb; + AsynchIO::RequestCallback cbRequest; - PollerHandlePrivate(SOCKET f, AsynchIoResult::Completer cb0 = 0) : - fd(f), cb(cb0) + PollerHandlePrivate(SOCKET f, + AsynchIoResult::Completer cb0 = 0, + AsynchIO::RequestCallback rcb = 0) + : fd(f), cb(cb0), cbRequest(rcb) { } }; PollerHandle::PollerHandle(const IOHandle& h) : - impl(new PollerHandlePrivate(toFd(h.impl), h.impl->event)) + impl(new PollerHandlePrivate(toFd(h.impl), h.impl->event, h.impl->cbRequest)) {} PollerHandle::~PollerHandle() { @@ -114,8 +117,19 @@ void Poller::addFd(PollerHandle& handle, Direction dir) { QPID_WINDOWS_CHECK_NULL(iocpHandle); } else { - AsynchWriteWanted *result = new AsynchWriteWanted(handle.impl->cb); - PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped()); + // INPUT is used to request a callback; OUTPUT to request a write + assert(dir == Poller::INPUT || dir == Poller::OUTPUT); + + if (dir == Poller::OUTPUT) { + AsynchWriteWanted *result = new AsynchWriteWanted(handle.impl->cb); + PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped()); + } + else { + AsynchCallbackRequest *result = + new AsynchCallbackRequest(handle.impl->cb, + handle.impl->cbRequest); + PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped()); + } } } |