summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/windows/AsynchIO.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/windows/AsynchIO.cpp')
-rw-r--r--cpp/src/qpid/sys/windows/AsynchIO.cpp25
1 files changed, 22 insertions, 3 deletions
diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp
index 409480dbdc..5ae9a4bfef 100644
--- a/cpp/src/qpid/sys/windows/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -534,9 +534,22 @@ void AsynchIO::startReading() {
return;
}
-// TODO: This needs to arrange for a callback that is serialised with
-// the other IO callbacks for this AsynchIO
+// Queue the specified callback for invocation from an I/O thread.
void AsynchIO::requestCallback(RequestCallback callback) {
+ // This method is generally called from a processing thread; transfer
+ // work on this to an I/O thread. Much of the upper layer code assumes
+ // that all I/O-related things happen in an I/O thread.
+ if (poller == 0) // Not really going yet...
+ return;
+
+ InterlockedIncrement(&opsInProgress);
+ IOHandlePrivate *hp =
+ new IOHandlePrivate (INVALID_SOCKET,
+ boost::bind(&AsynchIO::completion, this, _1),
+ callback);
+ IOHandle h(hp);
+ PollerHandle ph(h);
+ poller->addFd(ph, Poller::INPUT);
}
/**
@@ -714,7 +727,13 @@ void AsynchIO::completion(AsynchIoResult *result) {
else {
AsynchWriteResult *w =
dynamic_cast<AsynchWriteResult*>(result);
- writeComplete(w);
+ if (w != 0)
+ writeComplete(w);
+ else {
+ AsynchCallbackRequest *req =
+ dynamic_cast<AsynchCallbackRequest*>(result);
+ req->reqCallback(*this);
+ }
}
delete result;
result = 0;