diff options
author | Stephen D. Huston <shuston@apache.org> | 2008-12-10 23:37:01 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2008-12-10 23:37:01 +0000 |
commit | b80ddfbfca160c20df9fc6d6f94806c013a43b03 (patch) | |
tree | bb25af1b12be42e08204db264cba665e1c95515f /cpp/src/qpid/sys/windows | |
parent | 6f03c29cfec851d9f63e7d8b0bbb98791dfc06db (diff) | |
download | qpid-python-b80ddfbfca160c20df9fc6d6f94806c013a43b03.tar.gz |
Add startReading() method required by sys::AsynchIO. Fixes QPID-1525.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@725486 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/windows')
-rw-r--r-- | cpp/src/qpid/sys/windows/AsynchIO.cpp | 116 |
1 files changed, 56 insertions, 60 deletions
diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp index e5efc874aa..ca56efd8dd 100644 --- a/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -185,8 +185,8 @@ void AsynchAcceptResult::success(size_t /*bytesTransferred*/) { } void AsynchAcceptResult::failure(int status) { - if (status != WSA_OPERATION_ABORTED) - ; + //if (status != WSA_OPERATION_ABORTED) + // Can there be anything else? ; delete this; } @@ -283,6 +283,7 @@ public: virtual void notifyPendingWrite(); virtual void queueWriteClose(); virtual bool writeQueueEmpty(); + virtual void startReading(); /** * getQueuedBuffer returns a buffer from the buffer queue, if one is @@ -320,7 +321,6 @@ private: private: // Dispatch events that have completed. - void dispatchReadComplete(AsynchIO::BufferBase *buffer); void notifyEof(void); void notifyDisconnect(void); void notifyClosed(void); @@ -328,12 +328,6 @@ private: void notifyIdle(void); /** - * Initiate a read operation. AsynchIO::dispatchReadComplete() will be - * called when the read is complete and data is available. - */ - void startRead(void); - - /** * Initiate a write of the specified buffer. There's no callback for * write completion to the AsynchIO object. */ @@ -431,7 +425,7 @@ void AsynchIO::start(Poller::shared_ptr poller0) { poller->addFd(PollerHandle(socket), Poller::INPUT); if (writeQueue.size() > 0) // Already have data queued for write notifyPendingWrite(); - startRead(); + startReading(); } void AsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) { @@ -487,56 +481,11 @@ bool AsynchIO::writeQueueEmpty() { return writeQueue.size() == 0; } -/** - * Return a queued buffer if there are enough to spare. - */ -AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() { - QLock l(bufferQueueLock); - // Always keep at least one buffer (it might have data that was - // "unread" in it). - if (bufferQueue.size() <= 1) - return 0; - BufferBase* buff = bufferQueue.back(); - assert(buff); - bufferQueue.pop_back(); - return buff; -} - -void AsynchIO::dispatchReadComplete(AsynchIO::BufferBase *buffer) { - if (readCallback) - readCallback(*this, buffer); -} - -void AsynchIO::notifyEof(void) { - if (eofCallback) - eofCallback(*this); -} - -void AsynchIO::notifyDisconnect(void) { - if (disCallback) - disCallback(*this); -} - -void AsynchIO::notifyClosed(void) { - if (closedCallback) - closedCallback(*this, socket); -} - -void AsynchIO::notifyBuffersEmpty(void) { - if (emptyCallback) - emptyCallback(*this); -} - -void AsynchIO::notifyIdle(void) { - if (idleCallback) - idleCallback(*this); -} - /* - * Asynch reader/writer using overlapped I/O + * Initiate a read operation. AsynchIO::readComplete() will be + * called when the read is complete and data is available. */ - -void AsynchIO::startRead(void) { +void AsynchIO::startReading() { if (queuedDelete) return; @@ -582,6 +531,50 @@ void AsynchIO::startRead(void) { return; } +/** + * Return a queued buffer if there are enough to spare. + */ +AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() { + QLock l(bufferQueueLock); + // Always keep at least one buffer (it might have data that was + // "unread" in it). + if (bufferQueue.size() <= 1) + return 0; + BufferBase* buff = bufferQueue.back(); + assert(buff); + bufferQueue.pop_back(); + return buff; +} + +void AsynchIO::notifyEof(void) { + if (eofCallback) + eofCallback(*this); +} + +void AsynchIO::notifyDisconnect(void) { + if (disCallback) + disCallback(*this); +} + +void AsynchIO::notifyClosed(void) { + if (closedCallback) + closedCallback(*this, socket); +} + +void AsynchIO::notifyBuffersEmpty(void) { + if (emptyCallback) + emptyCallback(*this); +} + +void AsynchIO::notifyIdle(void) { + if (idleCallback) + idleCallback(*this); +} + +/* + * Asynch reader/writer using overlapped I/O + */ + void AsynchIO::startWrite(AsynchIO::BufferBase* buff) { writeInProgress = true; InterlockedIncrement(&opsInProgress); @@ -622,9 +615,12 @@ void AsynchIO::readComplete(AsynchReadResult *result) { int status = result->getStatus(); size_t bytes = result->getTransferred(); if (status == 0 && bytes > 0) { + bool restartRead = true; // May not if receiver doesn't want more threadReadTotal += bytes; - dispatchReadComplete(result->getBuff()); - startRead(); + if (readCallback) + restartRead = readCallback(*this, result->getBuff()); + if (restartRead) + startReading(); } else { // No data read, so put the buffer back. It may be partially filled, |