summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2012-05-05 20:23:48 +0000
committerStephen D. Huston <shuston@apache.org>2012-05-05 20:23:48 +0000
commit691b0b1849413e576d105803fd93adb2457796ac (patch)
treedba535d790260ca2865aceb2e77bff4c2d93693a
parentbfd07dc5e87c63803dcb45c492a4746d8a9fc391 (diff)
downloadqpid-python-0.8-release-candidates.tar.gz
Merge in QPID-3759 from trunk and fixes QPID-3982.0.8-release-candidates
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.8-release-candidates@1334484 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp158
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/IocpPoller.cpp5
2 files changed, 103 insertions, 60 deletions
diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
index c55d34e875..8f835e3f8c 100644
--- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -46,16 +46,13 @@ namespace {
/*
* The function pointers for AcceptEx and ConnectEx need to be looked up
- * at run time. Make sure this is done only once.
+ * at run time.
*/
-boost::once_flag lookUpAcceptExOnce = BOOST_ONCE_INIT;
-LPFN_ACCEPTEX fnAcceptEx = 0;
-typedef void (*lookUpFunc)(const qpid::sys::Socket &);
-
-void lookUpAcceptEx() {
- SOCKET h = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::Socket& s) {
+ SOCKET h = toSocketHandle(s);
GUID guidAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes = 0;
+ LPFN_ACCEPTEX fnAcceptEx;
WSAIoctl(h,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidAcceptEx,
@@ -65,9 +62,9 @@ void lookUpAcceptEx() {
&dwBytes,
NULL,
NULL);
- closesocket(h);
if (fnAcceptEx == 0)
throw qpid::Exception(QPID_MSG("Failed to look up AcceptEx"));
+ return fnAcceptEx;
}
}
@@ -94,18 +91,15 @@ private:
AsynchAcceptor::Callback acceptedCallback;
const Socket& socket;
+ const LPFN_ACCEPTEX fnAcceptEx;
};
AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback)
: acceptedCallback(callback),
- socket(s) {
+ socket(s),
+ fnAcceptEx(lookUpAcceptEx(s)) {
s.setNonblocking();
-#if (BOOST_VERSION >= 103500) /* boost 1.35 or later reversed the args */
- boost::call_once(lookUpAcceptExOnce, lookUpAcceptEx);
-#else
- boost::call_once(lookUpAcceptEx, lookUpAcceptExOnce);
-#endif
}
AsynchAcceptor::~AsynchAcceptor()
@@ -114,7 +108,8 @@ AsynchAcceptor::~AsynchAcceptor()
}
void AsynchAcceptor::start(Poller::shared_ptr poller) {
- poller->monitorHandle(PollerHandle(socket), Poller::INPUT);
+ PollerHandle ph = PollerHandle(socket);
+ poller->monitorHandle(ph, Poller::INPUT);
restart ();
}
@@ -124,14 +119,14 @@ void AsynchAcceptor::restart(void) {
this,
toSocketHandle(socket));
BOOL status;
- status = ::fnAcceptEx(toSocketHandle(socket),
- toSocketHandle(*result->newSocket),
- result->addressBuffer,
- 0,
- AsynchAcceptResult::SOCKADDRMAXLEN,
- AsynchAcceptResult::SOCKADDRMAXLEN,
- &bytesReceived,
- result->overlapped());
+ status = fnAcceptEx(toSocketHandle(socket),
+ toSocketHandle(*result->newSocket),
+ result->addressBuffer,
+ 0,
+ AsynchAcceptResult::SOCKADDRMAXLEN,
+ AsynchAcceptResult::SOCKADDRMAXLEN,
+ &bytesReceived,
+ result->overlapped());
QPID_WINDOWS_CHECK_ASYNC_START(status);
}
@@ -154,7 +149,7 @@ void AsynchAcceptResult::success(size_t /*bytesTransferred*/) {
delete this;
}
-void AsynchAcceptResult::failure(int status) {
+void AsynchAcceptResult::failure(int /*status*/) {
//if (status != WSA_OPERATION_ABORTED)
// Can there be anything else? ;
delete this;
@@ -177,7 +172,7 @@ private:
public:
AsynchConnector(const Socket& socket,
- std::string hostname,
+ std::string& hostname,
uint16_t port,
ConnectedCallback connCb,
FailedCallback failCb = 0);
@@ -185,7 +180,7 @@ public:
};
AsynchConnector::AsynchConnector(const Socket& sock,
- std::string hname,
+ std::string& hname,
uint16_t p,
ConnectedCallback connCb,
FailedCallback failCb) :
@@ -294,6 +289,8 @@ private:
volatile LONG opsInProgress;
// Is there a write in progress?
volatile bool writeInProgress;
+ // Or a read?
+ volatile bool readInProgress;
// Deletion requested, but there are callbacks in progress.
volatile bool queuedDelete;
// Socket close requested, but there are operations in progress.
@@ -347,6 +344,11 @@ private:
* Called when there's a completion to process.
*/
void completion(AsynchIoResult *result);
+
+ /**
+ * Helper function to facilitate the close operation
+ */
+ void cancelRead();
};
// This is used to encapsulate pure callbacks into a handle
@@ -375,6 +377,7 @@ AsynchIO::AsynchIO(const Socket& s,
socket(s),
opsInProgress(0),
writeInProgress(false),
+ readInProgress(false),
queuedDelete(false),
queuedClose(false),
working(false) {
@@ -392,26 +395,30 @@ AsynchIO::~AsynchIO() {
}
void AsynchIO::queueForDeletion() {
- queuedDelete = true;
- if (opsInProgress > 0) {
- QPID_LOG(info, "Delete AsynchIO queued; ops in progress");
- // AsynchIOHandler calls this then deletes itself; don't do any more
- // callbacks.
- readCallback = 0;
- eofCallback = 0;
- disCallback = 0;
- closedCallback = 0;
- emptyCallback = 0;
- idleCallback = 0;
- }
- else {
- delete this;
+ {
+ ScopedLock<Mutex> l(completionLock);
+ assert(!queuedDelete);
+ queuedDelete = true;
+ if (working || opsInProgress > 0) {
+ QPID_LOG(info, "Delete AsynchIO queued; ops in progress");
+ // AsynchIOHandler calls this then deletes itself; don't do any more
+ // callbacks.
+ readCallback = 0;
+ eofCallback = 0;
+ disCallback = 0;
+ closedCallback = 0;
+ emptyCallback = 0;
+ idleCallback = 0;
+ return;
+ }
}
+ delete this;
}
void AsynchIO::start(Poller::shared_ptr poller0) {
+ PollerHandle ph = PollerHandle(socket);
poller = poller0;
- poller->monitorHandle(PollerHandle(socket), Poller::INPUT);
+ poller->monitorHandle(ph, Poller::INPUT);
if (writeQueue.size() > 0) // Already have data queued for write
notifyPendingWrite();
startReading();
@@ -453,9 +460,14 @@ void AsynchIO::notifyPendingWrite() {
}
void AsynchIO::queueWriteClose() {
- queuedClose = true;
- if (!writeInProgress)
- notifyPendingWrite();
+ {
+ ScopedLock<Mutex> l(completionLock);
+ queuedClose = true;
+ if (working || writeInProgress)
+ // no need to summon an IO thread
+ return;
+ }
+ notifyPendingWrite();
}
bool AsynchIO::writeQueueEmpty() {
@@ -468,7 +480,7 @@ bool AsynchIO::writeQueueEmpty() {
* called when the read is complete and data is available.
*/
void AsynchIO::startReading() {
- if (queuedDelete)
+ if (queuedDelete || queuedClose)
return;
// (Try to) get a buffer; look on the front since there may be an
@@ -491,6 +503,7 @@ void AsynchIO::startReading() {
readCount);
DWORD bytesReceived = 0, flags = 0;
InterlockedIncrement(&opsInProgress);
+ readInProgress = true;
int status = WSARecv(toSocketHandle(socket),
const_cast<LPWSABUF>(result->getWSABUF()), 1,
&bytesReceived,
@@ -584,7 +597,6 @@ void AsynchIO::notifyIdle(void) {
void AsynchIO::startWrite(AsynchIO::BufferBase* buff) {
writeInProgress = true;
InterlockedIncrement(&opsInProgress);
- int writeCount = buff->byteCount-buff->dataCount;
AsynchWriteResult *result =
new AsynchWriteResult(boost::bind(&AsynchIO::completion, this, _1),
buff,
@@ -619,21 +631,24 @@ void AsynchIO::close(void) {
void AsynchIO::readComplete(AsynchReadResult *result) {
int status = result->getStatus();
size_t bytes = result->getTransferred();
+ readInProgress = false;
if (status == 0 && bytes > 0) {
- bool restartRead = true; // May not if receiver doesn't want more
if (readCallback)
readCallback(*this, result->getBuff());
- if (restartRead)
- startReading();
+ startReading();
}
else {
// No data read, so put the buffer back. It may be partially filled,
// so "unread" it back to the front of the queue.
unread(result->getBuff());
- if (status == 0)
- notifyEof();
- else
+ if (queuedClose) {
+ return; // Expected from cancelRead()
+ }
+ notifyEof();
+ if (status != 0)
+ {
notifyDisconnect();
+ }
}
}
@@ -684,6 +699,8 @@ void AsynchIO::writeComplete(AsynchWriteResult *result) {
}
void AsynchIO::completion(AsynchIoResult *result) {
+ bool closing = false;
+ bool deleting = false;
{
ScopedLock<Mutex> l(completionLock);
if (working) {
@@ -715,6 +732,8 @@ void AsynchIO::completion(AsynchIoResult *result) {
delete result;
result = 0;
InterlockedDecrement(&opsInProgress);
+ if (queuedClose && opsInProgress == 1 && readInProgress)
+ cancelRead();
}
// Lock is held again.
if (completionQueue.empty())
@@ -723,17 +742,40 @@ void AsynchIO::completion(AsynchIoResult *result) {
completionQueue.pop();
}
working = false;
+ if (opsInProgress == 0) {
+ closing = queuedClose;
+ deleting = queuedDelete;
+ }
}
// Lock released; ok to close if ops are done and close requested.
// Layer above will call back to queueForDeletion() if it hasn't
// already been done. If it already has, go ahead and delete.
- if (opsInProgress == 0) {
- if (queuedClose)
- // close() may cause a delete; don't trust 'this' on return
- close();
- else if (queuedDelete)
- delete this;
+ if (deleting)
+ delete this;
+ else if (closing)
+ // close() may cause a delete; don't trust 'this' on return
+ close();
+}
+
+/*
+ * NOTE - this method must be called in the same context as other completions,
+ * so that the resulting readComplete, and final AsynchIO::close() is serialized
+ * after this method returns.
+ */
+void AsynchIO::cancelRead() {
+ if (queuedDelete)
+ return; // socket already deleted
+ else {
+ ScopedLock<Mutex> l(completionLock);;
+ if (!completionQueue.empty())
+ return; // process it; come back later if necessary
}
+ // Cancel outstanding read and force to completion. Otherwise, on a faulty
+ // physical link, the pending read can remain uncompleted indefinitely.
+ // Draining the pending read will result in the official close (and
+ // notifyClosed). CancelIoEX() is the natural choice, but not available in
+ // XP, so we make do with closesocket().
+ socket.close();
}
} // namespace windows
diff --git a/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp b/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
index d326ab02ac..4c0b420eee 100755
--- a/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
@@ -96,6 +96,7 @@ void Poller::shutdown() {
// Allow sloppy code to shut us down more than once.
if (impl->isShutdown)
return;
+ impl->isShutdown = true;
ULONG_PTR key = 1; // Tell wait() it's a shutdown, not I/O
PostQueuedCompletionStatus(impl->iocp, 0, key, 0);
}
@@ -110,7 +111,7 @@ bool Poller::interrupt(PollerHandle&) {
}
void Poller::run() {
- do {
+ while (!impl->isShutdown) {
Poller::Event event = this->wait();
// Handle shutdown
@@ -124,7 +125,7 @@ void Poller::run() {
// This should be impossible
assert(false);
}
- } while (true);
+ }
}
void Poller::monitorHandle(PollerHandle& handle, Direction dir) {