diff options
Diffstat (limited to 'cpp/src/qpid/sys/posix/AsynchIO.cpp')
-rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 67 |
1 files changed, 32 insertions, 35 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index 31355627cd..353a55f50c 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -143,6 +143,7 @@ class AsynchConnector : public qpid::sys::AsynchConnector, private: void connComplete(DispatchHandle& handle); + void requestedCall(RequestCallback rCb); private: ConnectedCallback connCallback; @@ -158,6 +159,7 @@ public: FailedCallback failCb); void start(Poller::shared_ptr poller); void stop(); + void requestCallback(RequestCallback rCb); }; AsynchConnector::AsynchConnector(const Socket& s, @@ -191,11 +193,30 @@ void AsynchConnector::stop() stopWatch(); } +void AsynchConnector::requestCallback(RequestCallback callback) { + // TODO creating a function object every time isn't all that + // efficient - if this becomes heavily used do something better (what?) + assert(callback); + DispatchHandle::call(boost::bind(&AsynchConnector::requestedCall, this, callback)); +} + +void AsynchConnector::requestedCall(RequestCallback callback) { + assert(callback); + callback(*this); +} + void AsynchConnector::connComplete(DispatchHandle& h) { int errCode = socket.getError(); if (errCode == 0) { h.stopWatch(); + try { + socket.finishConnect(sa); + } catch (const std::exception& e) { + failCallback(socket, 0, e.what()); + DispatchHandle::doDelete(); + return; + } connCallback(socket); } else { // Retry while we cause an immediate exception @@ -247,10 +268,9 @@ public: virtual void notifyPendingWrite(); virtual void queueWriteClose(); virtual bool writeQueueEmpty(); - virtual void startReading(); - virtual void stopReading(); virtual void requestCallback(RequestCallback); virtual BufferBase* getQueuedBuffer(); + virtual SecuritySettings getSecuritySettings(); private: ~AsynchIO(); @@ -282,13 +302,6 @@ private: * thread processing this handle. */ volatile bool writePending; - /** - * This records whether we've been reading is flow controlled: - * it's safe as a simple boolean as the only way to be stopped - * is in calls only allowed in the callback context, the only calls - * checking it are also in calls only allowed in callback context. - */ - volatile bool readingStopped; }; AsynchIO::AsynchIO(const Socket& s, @@ -307,8 +320,7 @@ AsynchIO::AsynchIO(const Socket& s, idleCallback(iCb), socket(s), queuedClose(false), - writePending(false), - readingStopped(false) { + writePending(false) { s.setNonblocking(); } @@ -344,7 +356,7 @@ void AsynchIO::queueReadBuffer(BufferBase* buff) { bool queueWasEmpty = bufferQueue.empty(); bufferQueue.push_back(buff); - if (queueWasEmpty && !readingStopped) + if (queueWasEmpty) DispatchHandle::rewatchRead(); } @@ -354,7 +366,7 @@ void AsynchIO::unread(BufferBase* buff) { bool queueWasEmpty = bufferQueue.empty(); bufferQueue.push_front(buff); - if (queueWasEmpty && !readingStopped) + if (queueWasEmpty) DispatchHandle::rewatchRead(); } @@ -386,17 +398,6 @@ bool AsynchIO::writeQueueEmpty() { return writeQueue.empty(); } -// This can happen outside the callback context -void AsynchIO::startReading() { - readingStopped = false; - DispatchHandle::rewatchRead(); -} - -void AsynchIO::stopReading() { - readingStopped = true; - DispatchHandle::unwatchRead(); -} - void AsynchIO::requestCallback(RequestCallback callback) { // TODO creating a function object every time isn't all that // efficient - if this becomes heavily used do something better (what?) @@ -429,11 +430,6 @@ AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() { * to put it in and reading is not stopped by flow control. */ void AsynchIO::readable(DispatchHandle& h) { - if (readingStopped) { - // We have been flow controlled. - QPID_PROBE1(asynchio_read_flowcontrolled, &h); - return; - } AbsTime readStartTime = AbsTime::now(); size_t total = 0; int readCalls = 0; @@ -455,12 +451,6 @@ void AsynchIO::readable(DispatchHandle& h) { total += rc; readCallback(*this, buff); - if (readingStopped) { - // We have been flow controlled. - QPID_PROBE4(asynchio_read_finished_flowcontrolled, &h, duration, total, readCalls); - break; - } - if (rc != readCount) { // If we didn't fill the read buffer then time to stop reading QPID_PROBE4(asynchio_read_finished_done, &h, duration, total, readCalls); @@ -626,6 +616,13 @@ void AsynchIO::close(DispatchHandle& h) { } } +SecuritySettings AsynchIO::getSecuritySettings() { + SecuritySettings settings; + settings.ssf = socket.getKeyLen(); + settings.authid = socket.getClientAuthId(); + return settings; +} + } // namespace posix AsynchAcceptor* AsynchAcceptor::create(const Socket& s, |