summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp')
-rw-r--r--qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp31
1 files changed, 18 insertions, 13 deletions
diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 8545ebd9cb..c042dcef01 100644
--- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -21,6 +21,7 @@
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/Poller.h"
#include "qpid/sys/DispatchHandle.h"
#include "qpid/sys/Time.h"
@@ -37,6 +38,7 @@
#include <string.h>
#include <boost/bind.hpp>
+#include <boost/lexical_cast.hpp>
using namespace qpid::sys;
@@ -161,11 +163,12 @@ class AsynchConnector : public qpid::sys::AsynchConnector,
private:
void connComplete(DispatchHandle& handle);
- void failure(int, std::string);
+ void failure(int, const std::string&);
private:
ConnectedCallback connCallback;
FailedCallback failCallback;
+ std::string errMsg;
const Socket& socket;
public:
@@ -174,7 +177,7 @@ public:
std::string hostname,
uint16_t port,
ConnectedCallback connCb,
- FailedCallback failCb = 0);
+ FailedCallback failCb);
};
AsynchConnector::AsynchConnector(const Socket& s,
@@ -192,12 +195,17 @@ AsynchConnector::AsynchConnector(const Socket& s,
socket(s)
{
socket.setNonblocking();
+ SocketAddress sa(hostname, boost::lexical_cast<std::string>(port));
try {
- socket.connect(hostname, port);
- startWatch(poller);
+ socket.connect(sa);
} catch(std::exception& e) {
- failure(-1, std::string(e.what()));
+ // Defer reporting failure
+ startWatch(poller);
+ errMsg = e.what();
+ DispatchHandle::call(boost::bind(&AsynchConnector::failure, this, -1, errMsg));
+ return;
}
+ startWatch(poller);
}
void AsynchConnector::connComplete(DispatchHandle& h)
@@ -209,17 +217,13 @@ void AsynchConnector::connComplete(DispatchHandle& h)
connCallback(socket);
DispatchHandle::doDelete();
} else {
- failure(errCode, std::string(strError(errCode)));
+ failure(errCode, strError(errCode));
}
}
-void AsynchConnector::failure(int errCode, std::string message)
+void AsynchConnector::failure(int errCode, const std::string& message)
{
- if (failCallback)
- failCallback(errCode, message);
-
- socket.close();
- delete &socket;
+ failCallback(socket, errCode, message);
DispatchHandle::doDelete();
}
@@ -467,7 +471,8 @@ void AsynchIO::readable(DispatchHandle& h) {
threadReadTotal += rc;
readTotal += rc;
- if (!readCallback(*this, buff)) {
+ readCallback(*this, buff);
+ if (readingStopped) {
// We have been flow controlled.
break;
}