summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/windows
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2008-12-10 23:37:01 +0000
committerStephen D. Huston <shuston@apache.org>2008-12-10 23:37:01 +0000
commitb80ddfbfca160c20df9fc6d6f94806c013a43b03 (patch)
treebb25af1b12be42e08204db264cba665e1c95515f /cpp/src/qpid/sys/windows
parent6f03c29cfec851d9f63e7d8b0bbb98791dfc06db (diff)
downloadqpid-python-b80ddfbfca160c20df9fc6d6f94806c013a43b03.tar.gz
Add startReading() method required by sys::AsynchIO. Fixes QPID-1525.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@725486 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/windows')
-rw-r--r--cpp/src/qpid/sys/windows/AsynchIO.cpp116
1 files changed, 56 insertions, 60 deletions
diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp
index e5efc874aa..ca56efd8dd 100644
--- a/cpp/src/qpid/sys/windows/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -185,8 +185,8 @@ void AsynchAcceptResult::success(size_t /*bytesTransferred*/) {
}
void AsynchAcceptResult::failure(int status) {
- if (status != WSA_OPERATION_ABORTED)
- ;
+ //if (status != WSA_OPERATION_ABORTED)
+ // Can there be anything else? ;
delete this;
}
@@ -283,6 +283,7 @@ public:
virtual void notifyPendingWrite();
virtual void queueWriteClose();
virtual bool writeQueueEmpty();
+ virtual void startReading();
/**
* getQueuedBuffer returns a buffer from the buffer queue, if one is
@@ -320,7 +321,6 @@ private:
private:
// Dispatch events that have completed.
- void dispatchReadComplete(AsynchIO::BufferBase *buffer);
void notifyEof(void);
void notifyDisconnect(void);
void notifyClosed(void);
@@ -328,12 +328,6 @@ private:
void notifyIdle(void);
/**
- * Initiate a read operation. AsynchIO::dispatchReadComplete() will be
- * called when the read is complete and data is available.
- */
- void startRead(void);
-
- /**
* Initiate a write of the specified buffer. There's no callback for
* write completion to the AsynchIO object.
*/
@@ -431,7 +425,7 @@ void AsynchIO::start(Poller::shared_ptr poller0) {
poller->addFd(PollerHandle(socket), Poller::INPUT);
if (writeQueue.size() > 0) // Already have data queued for write
notifyPendingWrite();
- startRead();
+ startReading();
}
void AsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) {
@@ -487,56 +481,11 @@ bool AsynchIO::writeQueueEmpty() {
return writeQueue.size() == 0;
}
-/**
- * Return a queued buffer if there are enough to spare.
- */
-AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() {
- QLock l(bufferQueueLock);
- // Always keep at least one buffer (it might have data that was
- // "unread" in it).
- if (bufferQueue.size() <= 1)
- return 0;
- BufferBase* buff = bufferQueue.back();
- assert(buff);
- bufferQueue.pop_back();
- return buff;
-}
-
-void AsynchIO::dispatchReadComplete(AsynchIO::BufferBase *buffer) {
- if (readCallback)
- readCallback(*this, buffer);
-}
-
-void AsynchIO::notifyEof(void) {
- if (eofCallback)
- eofCallback(*this);
-}
-
-void AsynchIO::notifyDisconnect(void) {
- if (disCallback)
- disCallback(*this);
-}
-
-void AsynchIO::notifyClosed(void) {
- if (closedCallback)
- closedCallback(*this, socket);
-}
-
-void AsynchIO::notifyBuffersEmpty(void) {
- if (emptyCallback)
- emptyCallback(*this);
-}
-
-void AsynchIO::notifyIdle(void) {
- if (idleCallback)
- idleCallback(*this);
-}
-
/*
- * Asynch reader/writer using overlapped I/O
+ * Initiate a read operation. AsynchIO::readComplete() will be
+ * called when the read is complete and data is available.
*/
-
-void AsynchIO::startRead(void) {
+void AsynchIO::startReading() {
if (queuedDelete)
return;
@@ -582,6 +531,50 @@ void AsynchIO::startRead(void) {
return;
}
+/**
+ * Return a queued buffer if there are enough to spare.
+ */
+AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() {
+ QLock l(bufferQueueLock);
+ // Always keep at least one buffer (it might have data that was
+ // "unread" in it).
+ if (bufferQueue.size() <= 1)
+ return 0;
+ BufferBase* buff = bufferQueue.back();
+ assert(buff);
+ bufferQueue.pop_back();
+ return buff;
+}
+
+void AsynchIO::notifyEof(void) {
+ if (eofCallback)
+ eofCallback(*this);
+}
+
+void AsynchIO::notifyDisconnect(void) {
+ if (disCallback)
+ disCallback(*this);
+}
+
+void AsynchIO::notifyClosed(void) {
+ if (closedCallback)
+ closedCallback(*this, socket);
+}
+
+void AsynchIO::notifyBuffersEmpty(void) {
+ if (emptyCallback)
+ emptyCallback(*this);
+}
+
+void AsynchIO::notifyIdle(void) {
+ if (idleCallback)
+ idleCallback(*this);
+}
+
+/*
+ * Asynch reader/writer using overlapped I/O
+ */
+
void AsynchIO::startWrite(AsynchIO::BufferBase* buff) {
writeInProgress = true;
InterlockedIncrement(&opsInProgress);
@@ -622,9 +615,12 @@ void AsynchIO::readComplete(AsynchReadResult *result) {
int status = result->getStatus();
size_t bytes = result->getTransferred();
if (status == 0 && bytes > 0) {
+ bool restartRead = true; // May not if receiver doesn't want more
threadReadTotal += bytes;
- dispatchReadComplete(result->getBuff());
- startRead();
+ if (readCallback)
+ restartRead = readCallback(*this, result->getBuff());
+ if (restartRead)
+ startReading();
}
else {
// No data read, so put the buffer back. It may be partially filled,