summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2012-03-22 13:39:14 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2012-03-22 13:39:14 +0000
commit3c2954782cc6d27bacf8865cfaea9c71c2bfec2b (patch)
tree2f319b97e0c6d9e0052a8606f0e29d2b643834c2 /qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
parentb51489127365952db902579438294a587a7acf47 (diff)
downloadqpid-python-3c2954782cc6d27bacf8865cfaea9c71c2bfec2b.tar.gz
QPID-3890: resync this branch to latest trunkqpid-3890
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3890@1303774 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp')
-rw-r--r--qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp25
1 files changed, 21 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
index 30378d4c5f..fb8df5ddf8 100644
--- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -295,6 +295,8 @@ private:
volatile bool queuedDelete;
// Socket close requested, but there are operations in progress.
volatile bool queuedClose;
+ // Most recent asynch read request
+ volatile AsynchReadResult* pendingRead;
private:
// Dispatch events that have completed.
@@ -374,6 +376,7 @@ AsynchIO::AsynchIO(const Socket& s,
writeInProgress(false),
queuedDelete(false),
queuedClose(false),
+ pendingRead(0),
working(false) {
}
@@ -504,6 +507,7 @@ void AsynchIO::startReading() {
}
}
// On status 0 or WSA_IO_PENDING, completion will handle the rest.
+ pendingRead = result;
}
else {
notifyBuffersEmpty();
@@ -617,16 +621,17 @@ void AsynchIO::readComplete(AsynchReadResult *result) {
int status = result->getStatus();
size_t bytes = result->getTransferred();
if (status == 0 && bytes > 0) {
- bool restartRead = true; // May not if receiver doesn't want more
if (readCallback)
readCallback(*this, result->getBuff());
- if (restartRead)
- startReading();
+ startReading();
}
else {
// No data read, so put the buffer back. It may be partially filled,
// so "unread" it back to the front of the queue.
unread(result->getBuff());
+ if (queuedClose && status == ERROR_OPERATION_ABORTED) {
+ return; // Expected reap from CancelIoEx
+ }
notifyEof();
if (status != 0)
{
@@ -697,8 +702,11 @@ void AsynchIO::completion(AsynchIoResult *result) {
{
ScopedUnlock<Mutex> ul(completionLock);
AsynchReadResult *r = dynamic_cast<AsynchReadResult*>(result);
- if (r != 0)
+ if (r != 0) {
readComplete(r);
+ // Set pendingRead to 0 if it's still pointing to (newly completed) r
+ InterlockedCompareExchangePointer((void * volatile *)&pendingRead, 0, r);
+ }
else {
AsynchWriteResult *w =
dynamic_cast<AsynchWriteResult*>(result);
@@ -732,6 +740,15 @@ void AsynchIO::completion(AsynchIoResult *result) {
else if (queuedDelete)
delete this;
}
+ else {
+ if (queuedClose && pendingRead) {
+ // Force outstanding read to completion. Layer above will
+ // call back.
+ CancelIoEx((HANDLE)toSocketHandle(socket),
+ ((AsynchReadResult *)pendingRead)->overlapped());
+ pendingRead = 0;
+ }
+ }
}
} // namespace windows