summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/windows/AsynchIO.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/windows/AsynchIO.cpp')
-rw-r--r--cpp/src/qpid/sys/windows/AsynchIO.cpp100
1 files changed, 73 insertions, 27 deletions
diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp
index 30378d4c5f..ae53414e52 100644
--- a/cpp/src/qpid/sys/windows/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -291,6 +291,8 @@ private:
volatile LONG opsInProgress;
// Is there a write in progress?
volatile bool writeInProgress;
+ // Or a read?
+ volatile bool readInProgress;
// Deletion requested, but there are callbacks in progress.
volatile bool queuedDelete;
// Socket close requested, but there are operations in progress.
@@ -344,6 +346,11 @@ private:
* Called when there's a completion to process.
*/
void completion(AsynchIoResult *result);
+
+ /**
+ * Helper function to facilitate the close operation
+ */
+ void cancelRead();
};
// This is used to encapsulate pure callbacks into a handle
@@ -372,6 +379,7 @@ AsynchIO::AsynchIO(const Socket& s,
socket(s),
opsInProgress(0),
writeInProgress(false),
+ readInProgress(false),
queuedDelete(false),
queuedClose(false),
working(false) {
@@ -389,21 +397,24 @@ AsynchIO::~AsynchIO() {
}
void AsynchIO::queueForDeletion() {
- queuedDelete = true;
- if (opsInProgress > 0) {
- QPID_LOG(info, "Delete AsynchIO queued; ops in progress");
- // AsynchIOHandler calls this then deletes itself; don't do any more
- // callbacks.
- readCallback = 0;
- eofCallback = 0;
- disCallback = 0;
- closedCallback = 0;
- emptyCallback = 0;
- idleCallback = 0;
- }
- else {
- delete this;
+ {
+ ScopedLock<Mutex> l(completionLock);
+ assert(!queuedDelete);
+ queuedDelete = true;
+ if (working || opsInProgress > 0) {
+ QPID_LOG(info, "Delete AsynchIO queued; ops in progress");
+ // AsynchIOHandler calls this then deletes itself; don't do any more
+ // callbacks.
+ readCallback = 0;
+ eofCallback = 0;
+ disCallback = 0;
+ closedCallback = 0;
+ emptyCallback = 0;
+ idleCallback = 0;
+ return;
+ }
}
+ delete this;
}
void AsynchIO::start(Poller::shared_ptr poller0) {
@@ -451,9 +462,14 @@ void AsynchIO::notifyPendingWrite() {
}
void AsynchIO::queueWriteClose() {
- queuedClose = true;
- if (!writeInProgress)
- notifyPendingWrite();
+ {
+ ScopedLock<Mutex> l(completionLock);
+ queuedClose = true;
+ if (working || writeInProgress)
+ // no need to summon an IO thread
+ return;
+ }
+ notifyPendingWrite();
}
bool AsynchIO::writeQueueEmpty() {
@@ -466,7 +482,7 @@ bool AsynchIO::writeQueueEmpty() {
* called when the read is complete and data is available.
*/
void AsynchIO::startReading() {
- if (queuedDelete)
+ if (queuedDelete || queuedClose)
return;
// (Try to) get a buffer; look on the front since there may be an
@@ -489,6 +505,7 @@ void AsynchIO::startReading() {
readCount);
DWORD bytesReceived = 0, flags = 0;
InterlockedIncrement(&opsInProgress);
+ readInProgress = true;
int status = WSARecv(toSocketHandle(socket),
const_cast<LPWSABUF>(result->getWSABUF()), 1,
&bytesReceived,
@@ -616,17 +633,19 @@ void AsynchIO::close(void) {
void AsynchIO::readComplete(AsynchReadResult *result) {
int status = result->getStatus();
size_t bytes = result->getTransferred();
+ readInProgress = false;
if (status == 0 && bytes > 0) {
- bool restartRead = true; // May not if receiver doesn't want more
if (readCallback)
readCallback(*this, result->getBuff());
- if (restartRead)
- startReading();
+ startReading();
}
else {
// No data read, so put the buffer back. It may be partially filled,
// so "unread" it back to the front of the queue.
unread(result->getBuff());
+ if (queuedClose) {
+ return; // Expected from cancelRead()
+ }
notifyEof();
if (status != 0)
{
@@ -682,6 +701,8 @@ void AsynchIO::writeComplete(AsynchWriteResult *result) {
}
void AsynchIO::completion(AsynchIoResult *result) {
+ bool closing = false;
+ bool deleting = false;
{
ScopedLock<Mutex> l(completionLock);
if (working) {
@@ -713,6 +734,8 @@ void AsynchIO::completion(AsynchIoResult *result) {
delete result;
result = 0;
InterlockedDecrement(&opsInProgress);
+ if (queuedClose && opsInProgress == 1 && readInProgress)
+ cancelRead();
}
// Lock is held again.
if (completionQueue.empty())
@@ -721,17 +744,40 @@ void AsynchIO::completion(AsynchIoResult *result) {
completionQueue.pop();
}
working = false;
+ if (opsInProgress == 0) {
+ closing = queuedClose;
+ deleting = queuedDelete;
+ }
}
// Lock released; ok to close if ops are done and close requested.
// Layer above will call back to queueForDeletion() if it hasn't
// already been done. If it already has, go ahead and delete.
- if (opsInProgress == 0) {
- if (queuedClose)
- // close() may cause a delete; don't trust 'this' on return
- close();
- else if (queuedDelete)
- delete this;
+ if (deleting)
+ delete this;
+ else if (closing)
+ // close() may cause a delete; don't trust 'this' on return
+ close();
+}
+
+/*
+ * NOTE - this method must be called in the same context as other completions,
+ * so that the resulting readComplete, and final AsynchIO::close() is serialized
+ * after this method returns.
+ */
+void AsynchIO::cancelRead() {
+ if (queuedDelete)
+ return; // socket already deleted
+ else {
+ ScopedLock<Mutex> l(completionLock);;
+ if (!completionQueue.empty())
+ return; // process it; come back later if necessary
}
+ // Cancel outstanding read and force to completion. Otherwise, on a faulty
+ // physical link, the pending read can remain uncompleted indefinitely.
+ // Draining the pending read will result in the official close (and
+ // notifyClosed). CancelIoEX() is the natural choice, but not available in
+ // XP, so we make do with closesocket().
+ socket.close();
}
} // namespace windows