summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/posix/AsynchIO.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/posix/AsynchIO.cpp')
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp67
1 files changed, 32 insertions, 35 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 31355627cd..353a55f50c 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -143,6 +143,7 @@ class AsynchConnector : public qpid::sys::AsynchConnector,
private:
void connComplete(DispatchHandle& handle);
+ void requestedCall(RequestCallback rCb);
private:
ConnectedCallback connCallback;
@@ -158,6 +159,7 @@ public:
FailedCallback failCb);
void start(Poller::shared_ptr poller);
void stop();
+ void requestCallback(RequestCallback rCb);
};
AsynchConnector::AsynchConnector(const Socket& s,
@@ -191,11 +193,30 @@ void AsynchConnector::stop()
stopWatch();
}
+void AsynchConnector::requestCallback(RequestCallback callback) {
+ // TODO creating a function object every time isn't all that
+ // efficient - if this becomes heavily used do something better (what?)
+ assert(callback);
+ DispatchHandle::call(boost::bind(&AsynchConnector::requestedCall, this, callback));
+}
+
+void AsynchConnector::requestedCall(RequestCallback callback) {
+ assert(callback);
+ callback(*this);
+}
+
void AsynchConnector::connComplete(DispatchHandle& h)
{
int errCode = socket.getError();
if (errCode == 0) {
h.stopWatch();
+ try {
+ socket.finishConnect(sa);
+ } catch (const std::exception& e) {
+ failCallback(socket, 0, e.what());
+ DispatchHandle::doDelete();
+ return;
+ }
connCallback(socket);
} else {
// Retry while we cause an immediate exception
@@ -247,10 +268,9 @@ public:
virtual void notifyPendingWrite();
virtual void queueWriteClose();
virtual bool writeQueueEmpty();
- virtual void startReading();
- virtual void stopReading();
virtual void requestCallback(RequestCallback);
virtual BufferBase* getQueuedBuffer();
+ virtual SecuritySettings getSecuritySettings();
private:
~AsynchIO();
@@ -282,13 +302,6 @@ private:
* thread processing this handle.
*/
volatile bool writePending;
- /**
- * This records whether we've been reading is flow controlled:
- * it's safe as a simple boolean as the only way to be stopped
- * is in calls only allowed in the callback context, the only calls
- * checking it are also in calls only allowed in callback context.
- */
- volatile bool readingStopped;
};
AsynchIO::AsynchIO(const Socket& s,
@@ -307,8 +320,7 @@ AsynchIO::AsynchIO(const Socket& s,
idleCallback(iCb),
socket(s),
queuedClose(false),
- writePending(false),
- readingStopped(false) {
+ writePending(false) {
s.setNonblocking();
}
@@ -344,7 +356,7 @@ void AsynchIO::queueReadBuffer(BufferBase* buff) {
bool queueWasEmpty = bufferQueue.empty();
bufferQueue.push_back(buff);
- if (queueWasEmpty && !readingStopped)
+ if (queueWasEmpty)
DispatchHandle::rewatchRead();
}
@@ -354,7 +366,7 @@ void AsynchIO::unread(BufferBase* buff) {
bool queueWasEmpty = bufferQueue.empty();
bufferQueue.push_front(buff);
- if (queueWasEmpty && !readingStopped)
+ if (queueWasEmpty)
DispatchHandle::rewatchRead();
}
@@ -386,17 +398,6 @@ bool AsynchIO::writeQueueEmpty() {
return writeQueue.empty();
}
-// This can happen outside the callback context
-void AsynchIO::startReading() {
- readingStopped = false;
- DispatchHandle::rewatchRead();
-}
-
-void AsynchIO::stopReading() {
- readingStopped = true;
- DispatchHandle::unwatchRead();
-}
-
void AsynchIO::requestCallback(RequestCallback callback) {
// TODO creating a function object every time isn't all that
// efficient - if this becomes heavily used do something better (what?)
@@ -429,11 +430,6 @@ AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() {
* to put it in and reading is not stopped by flow control.
*/
void AsynchIO::readable(DispatchHandle& h) {
- if (readingStopped) {
- // We have been flow controlled.
- QPID_PROBE1(asynchio_read_flowcontrolled, &h);
- return;
- }
AbsTime readStartTime = AbsTime::now();
size_t total = 0;
int readCalls = 0;
@@ -455,12 +451,6 @@ void AsynchIO::readable(DispatchHandle& h) {
total += rc;
readCallback(*this, buff);
- if (readingStopped) {
- // We have been flow controlled.
- QPID_PROBE4(asynchio_read_finished_flowcontrolled, &h, duration, total, readCalls);
- break;
- }
-
if (rc != readCount) {
// If we didn't fill the read buffer then time to stop reading
QPID_PROBE4(asynchio_read_finished_done, &h, duration, total, readCalls);
@@ -626,6 +616,13 @@ void AsynchIO::close(DispatchHandle& h) {
}
}
+SecuritySettings AsynchIO::getSecuritySettings() {
+ SecuritySettings settings;
+ settings.ssf = socket.getKeyLen();
+ settings.authid = socket.getClientAuthId();
+ return settings;
+}
+
} // namespace posix
AsynchAcceptor* AsynchAcceptor::create(const Socket& s,