summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorClifford Allan Jansen <cliffjansen@apache.org>2012-03-19 23:24:23 +0000
committerClifford Allan Jansen <cliffjansen@apache.org>2012-03-19 23:24:23 +0000
commit1d2363b6dccc3139ca4cbda3f1127d40adff1d3e (patch)
tree42830939ad97dc28d4e3d169b40f1b5666277af9
parente9a23033bb075f50b0a46c9366012e30538a4e54 (diff)
downloadqpid-python-1d2363b6dccc3139ca4cbda3f1127d40adff1d3e.tar.gz
QPID-3759 hang on heartbeat connection close
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1302718 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp25
1 files changed, 21 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
index 30378d4c5f..fb8df5ddf8 100644
--- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -295,6 +295,8 @@ private:
volatile bool queuedDelete;
// Socket close requested, but there are operations in progress.
volatile bool queuedClose;
+ // Most recent asynch read request
+ volatile AsynchReadResult* pendingRead;
private:
// Dispatch events that have completed.
@@ -374,6 +376,7 @@ AsynchIO::AsynchIO(const Socket& s,
writeInProgress(false),
queuedDelete(false),
queuedClose(false),
+ pendingRead(0),
working(false) {
}
@@ -504,6 +507,7 @@ void AsynchIO::startReading() {
}
}
// On status 0 or WSA_IO_PENDING, completion will handle the rest.
+ pendingRead = result;
}
else {
notifyBuffersEmpty();
@@ -617,16 +621,17 @@ void AsynchIO::readComplete(AsynchReadResult *result) {
int status = result->getStatus();
size_t bytes = result->getTransferred();
if (status == 0 && bytes > 0) {
- bool restartRead = true; // May not if receiver doesn't want more
if (readCallback)
readCallback(*this, result->getBuff());
- if (restartRead)
- startReading();
+ startReading();
}
else {
// No data read, so put the buffer back. It may be partially filled,
// so "unread" it back to the front of the queue.
unread(result->getBuff());
+ if (queuedClose && status == ERROR_OPERATION_ABORTED) {
+ return; // Expected reap from CancelIoEx
+ }
notifyEof();
if (status != 0)
{
@@ -697,8 +702,11 @@ void AsynchIO::completion(AsynchIoResult *result) {
{
ScopedUnlock<Mutex> ul(completionLock);
AsynchReadResult *r = dynamic_cast<AsynchReadResult*>(result);
- if (r != 0)
+ if (r != 0) {
readComplete(r);
+ // Set pendingRead to 0 if it's still pointing to (newly completed) r
+ InterlockedCompareExchangePointer((void * volatile *)&pendingRead, 0, r);
+ }
else {
AsynchWriteResult *w =
dynamic_cast<AsynchWriteResult*>(result);
@@ -732,6 +740,15 @@ void AsynchIO::completion(AsynchIoResult *result) {
else if (queuedDelete)
delete this;
}
+ else {
+ if (queuedClose && pendingRead) {
+ // Force outstanding read to completion. Layer above will
+ // call back.
+ CancelIoEx((HANDLE)toSocketHandle(socket),
+ ((AsynchReadResult *)pendingRead)->overlapped());
+ pendingRead = 0;
+ }
+ }
}
} // namespace windows