summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2009-01-12 23:31:49 +0000
committerStephen D. Huston <shuston@apache.org>2009-01-12 23:31:49 +0000
commit968d7fd6a2edb1f99ffab980d8130abbd854adc5 (patch)
treef3ccf11c71900cd2c2dc1a27592bc9e71c7b9486 /cpp/src
parent081b523538d7731c4c0128f90a6393720a7d4ecc (diff)
downloadqpid-python-968d7fd6a2edb1f99ffab980d8130abbd854adc5.tar.gz
Add support for AsynchIO::RequestCallback processing
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@733966 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/sys/windows/AsynchIO.cpp25
-rwxr-xr-xcpp/src/qpid/sys/windows/AsynchIoResult.h17
-rwxr-xr-xcpp/src/qpid/sys/windows/IoHandlePrivate.h10
-rwxr-xr-xcpp/src/qpid/sys/windows/IocpPoller.cpp24
4 files changed, 65 insertions, 11 deletions
diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp
index 409480dbdc..5ae9a4bfef 100644
--- a/cpp/src/qpid/sys/windows/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -534,9 +534,22 @@ void AsynchIO::startReading() {
return;
}
-// TODO: This needs to arrange for a callback that is serialised with
-// the other IO callbacks for this AsynchIO
+// Queue the specified callback for invocation from an I/O thread.
void AsynchIO::requestCallback(RequestCallback callback) {
+ // This method is generally called from a processing thread; transfer
+ // work on this to an I/O thread. Much of the upper layer code assumes
+ // that all I/O-related things happen in an I/O thread.
+ if (poller == 0) // Not really going yet...
+ return;
+
+ InterlockedIncrement(&opsInProgress);
+ IOHandlePrivate *hp =
+ new IOHandlePrivate (INVALID_SOCKET,
+ boost::bind(&AsynchIO::completion, this, _1),
+ callback);
+ IOHandle h(hp);
+ PollerHandle ph(h);
+ poller->addFd(ph, Poller::INPUT);
}
/**
@@ -714,7 +727,13 @@ void AsynchIO::completion(AsynchIoResult *result) {
else {
AsynchWriteResult *w =
dynamic_cast<AsynchWriteResult*>(result);
- writeComplete(w);
+ if (w != 0)
+ writeComplete(w);
+ else {
+ AsynchCallbackRequest *req =
+ dynamic_cast<AsynchCallbackRequest*>(result);
+ req->reqCallback(*this);
+ }
}
delete result;
result = 0;
diff --git a/cpp/src/qpid/sys/windows/AsynchIoResult.h b/cpp/src/qpid/sys/windows/AsynchIoResult.h
index 7db4e9c331..f47607d08c 100755
--- a/cpp/src/qpid/sys/windows/AsynchIoResult.h
+++ b/cpp/src/qpid/sys/windows/AsynchIoResult.h
@@ -180,6 +180,23 @@ public:
}
};
+class AsynchCallbackRequest : public AsynchIoResult {
+ // complete() needs to simply call the completionCallback; no buffers.
+ virtual void complete(void) {
+ completionCallback(this);
+ }
+
+public:
+ AsynchCallbackRequest(AsynchIoResult::Completer cb,
+ AsynchIO::RequestCallback reqCb)
+ : AsynchIoResult(cb, 0, 0), reqCallback(reqCb) {
+ wsabuf.buf = 0;
+ wsabuf.len = 0;
+ }
+
+ AsynchIO::RequestCallback reqCallback;
+};
+
}}
#endif /*!_windows_asynchIoResult_h*/
diff --git a/cpp/src/qpid/sys/windows/IoHandlePrivate.h b/cpp/src/qpid/sys/windows/IoHandlePrivate.h
index 18e75047ed..067c0fe6b7 100755
--- a/cpp/src/qpid/sys/windows/IoHandlePrivate.h
+++ b/cpp/src/qpid/sys/windows/IoHandlePrivate.h
@@ -33,16 +33,20 @@ namespace sys {
// There should be either a valid socket handle or a completer callback.
// Handle is used to associate with poller's iocp; completer is used to
// inject a completion that will very quickly trigger a callback to the
-// completer from an I/O thread.
+// completer from an I/O thread. If the callback mechanism is used, there
+// can be a RequestCallback set - this carries the callback object through
+// from AsynchIO::requestCallback() through to the I/O completion processing.
class IOHandlePrivate {
public:
IOHandlePrivate(SOCKET f = INVALID_SOCKET,
- AsynchIoResult::Completer cb = 0) :
- fd(f), event(cb)
+ AsynchIoResult::Completer cb = 0,
+ AsynchIO::RequestCallback reqCallback = 0) :
+ fd(f), event(cb), cbRequest(reqCallback)
{}
SOCKET fd;
AsynchIoResult::Completer event;
+ AsynchIO::RequestCallback cbRequest;
};
SOCKET toFd(const IOHandlePrivate* h);
diff --git a/cpp/src/qpid/sys/windows/IocpPoller.cpp b/cpp/src/qpid/sys/windows/IocpPoller.cpp
index d23dafcd6e..1e24adfb46 100755
--- a/cpp/src/qpid/sys/windows/IocpPoller.cpp
+++ b/cpp/src/qpid/sys/windows/IocpPoller.cpp
@@ -43,16 +43,19 @@ class PollerHandlePrivate {
SOCKET fd;
AsynchIoResult::Completer cb;
+ AsynchIO::RequestCallback cbRequest;
- PollerHandlePrivate(SOCKET f, AsynchIoResult::Completer cb0 = 0) :
- fd(f), cb(cb0)
+ PollerHandlePrivate(SOCKET f,
+ AsynchIoResult::Completer cb0 = 0,
+ AsynchIO::RequestCallback rcb = 0)
+ : fd(f), cb(cb0), cbRequest(rcb)
{
}
};
PollerHandle::PollerHandle(const IOHandle& h) :
- impl(new PollerHandlePrivate(toFd(h.impl), h.impl->event))
+ impl(new PollerHandlePrivate(toFd(h.impl), h.impl->event, h.impl->cbRequest))
{}
PollerHandle::~PollerHandle() {
@@ -114,8 +117,19 @@ void Poller::addFd(PollerHandle& handle, Direction dir) {
QPID_WINDOWS_CHECK_NULL(iocpHandle);
}
else {
- AsynchWriteWanted *result = new AsynchWriteWanted(handle.impl->cb);
- PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped());
+ // INPUT is used to request a callback; OUTPUT to request a write
+ assert(dir == Poller::INPUT || dir == Poller::OUTPUT);
+
+ if (dir == Poller::OUTPUT) {
+ AsynchWriteWanted *result = new AsynchWriteWanted(handle.impl->cb);
+ PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped());
+ }
+ else {
+ AsynchCallbackRequest *result =
+ new AsynchCallbackRequest(handle.impl->cb,
+ handle.impl->cbRequest);
+ PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped());
+ }
}
}