summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/sys
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/sys')
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIO.h6
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp39
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIOHandler.h2
-rw-r--r--qpid/cpp/src/qpid/sys/LatencyTracker.h157
-rw-r--r--qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp20
-rw-r--r--qpid/cpp/src/qpid/sys/Socket.h2
-rw-r--r--qpid/cpp/src/qpid/sys/SocketAddress.h51
-rw-r--r--qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp27
-rw-r--r--qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp31
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Socket.cpp55
-rw-r--r--qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp71
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp17
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp5
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/RdmaIO.h9
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp12
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h9
-rw-r--r--qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp2
17 files changed, 235 insertions, 280 deletions
diff --git a/qpid/cpp/src/qpid/sys/AsynchIO.h b/qpid/cpp/src/qpid/sys/AsynchIO.h
index fb02183359..419770568a 100644
--- a/qpid/cpp/src/qpid/sys/AsynchIO.h
+++ b/qpid/cpp/src/qpid/sys/AsynchIO.h
@@ -57,7 +57,7 @@ public:
class AsynchConnector {
public:
typedef boost::function1<void, const Socket&> ConnectedCallback;
- typedef boost::function2<void, int, std::string> FailedCallback;
+ typedef boost::function3<void, const Socket&, int, const std::string&> FailedCallback;
// Call create() to allocate a new AsynchConnector object with the
// specified poller, addressing, and callbacks.
@@ -70,7 +70,7 @@ public:
std::string hostname,
uint16_t port,
ConnectedCallback connCb,
- FailedCallback failCb = 0);
+ FailedCallback failCb);
protected:
AsynchConnector() {}
@@ -108,7 +108,7 @@ class AsynchIO {
public:
typedef AsynchIOBufferBase BufferBase;
- typedef boost::function2<bool, AsynchIO&, BufferBase*> ReadCallback;
+ typedef boost::function2<void, AsynchIO&, BufferBase*> ReadCallback;
typedef boost::function1<void, AsynchIO&> EofCallback;
typedef boost::function1<void, AsynchIO&> DisconnectCallback;
typedef boost::function2<void, AsynchIO&, const Socket&> ClosedCallback;
diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
index 8094abd43d..eb0f213547 100644
--- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
+++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
@@ -103,10 +103,31 @@ void AsynchIOHandler::giveReadCredit(int32_t credit) {
aio->startReading();
}
-bool AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
+void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
if (readError) {
- return false;
+ return;
+ }
+
+ // Check here for read credit
+ if (readCredit.get() != InfiniteCredit) {
+ if (readCredit.get() == 0) {
+ // FIXME aconway 2009-10-01: Workaround to avoid "false wakeups".
+ // readbuff is sometimes called with no credit.
+ // This should be fixed somewhere else to avoid such calls.
+ aio->unread(buff);
+ return;
+ }
+ // TODO In theory should be able to use an atomic operation before taking the lock
+ // but in practice there seems to be an unexplained race in that case
+ ScopedLock<Mutex> l(creditLock);
+ if (--readCredit == 0) {
+ assert(readCredit.get() >= 0);
+ if (readCredit.get() == 0) {
+ aio->stopReading();
+ }
+ }
}
+
size_t decoded = 0;
if (codec) { // Already initiated
try {
@@ -149,20 +170,6 @@ bool AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
// Give whole buffer back to aio subsystem
aio->queueReadBuffer(buff);
}
- // Check here for read credit
- if (readCredit.get() != InfiniteCredit) {
- // TODO In theory should be able to use an atomic operation before taking the lock
- // but in practice there seems to be an unexplained race in that case
- ScopedLock<Mutex> l(creditLock);
- if (--readCredit == 0) {
- assert(readCredit.get() >= 0);
- if (readCredit.get() == 0) {
- aio->stopReading();
- return false;
- }
- }
- }
- return true;
}
void AsynchIOHandler::eof(AsynchIO&) {
diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.h b/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
index 9785f445a4..e1885bac79 100644
--- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
+++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
@@ -65,7 +65,7 @@ class AsynchIOHandler : public OutputControl {
QPID_COMMON_EXTERN void giveReadCredit(int32_t credit);
// Input side
- QPID_COMMON_EXTERN bool readbuff(AsynchIO& aio, AsynchIOBufferBase* buff);
+ QPID_COMMON_EXTERN void readbuff(AsynchIO& aio, AsynchIOBufferBase* buff);
QPID_COMMON_EXTERN void eof(AsynchIO& aio);
QPID_COMMON_EXTERN void disconnect(AsynchIO& aio);
diff --git a/qpid/cpp/src/qpid/sys/LatencyTracker.h b/qpid/cpp/src/qpid/sys/LatencyTracker.h
deleted file mode 100644
index 3294528ff6..0000000000
--- a/qpid/cpp/src/qpid/sys/LatencyTracker.h
+++ /dev/null
@@ -1,157 +0,0 @@
-#ifndef QPID_SYS_LATENCYTRACKER_H
-#define QPID_SYS_LATENCYTRACKER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/Time.h"
-#include <string>
-#include <limits>
-#include <map>
-
-namespace qpid {
-namespace sys {
-
-/**@file Tools for measuring latency. NOT SUITABLE FOR PROUDCTION BUILDS.
- * Uses should be compiled only if QPID_LATENCY_TRACKER is defined.
- * See the convenience macros at the end of this file.
- */
-
-/** Used by LatencyCounter and LatencyTracker below */
-class LatencyStatistic {
- public:
- LatencyStatistic(std::string name_) : name(name_), count(0), total(0), min(std::numeric_limits<int64_t>::max()), max(0) {}
- ~LatencyStatistic() { print(); }
-
- void record(Duration d) {
- total += d;
- ++count;
- if (d > max) max=d;
- if (d < min) min=d;
- }
-
- void print() {
- if (count) {
- double meanMsec = (double(total)/count)/TIME_MSEC;
- printf("\n==== Latency metric %s: samples=%lu mean=%fms (%f-%f)\n", name.c_str(), count, meanMsec, double(min)/TIME_MSEC, double(max)/TIME_MSEC);
- }
- else
- printf("\n==== Latency metric %s: no samples.\n", name.c_str());
- }
-
- private:
- std::string name;
- unsigned long count;
- int64_t total, min, max;
-};
-
-/** Measure delay between seeing the same value at start and finish. */
-template <class T> class LatencyTracker {
- public:
- LatencyTracker(std::string name) : measuring(false), stat(name) {}
-
- void start(T value) {
- sys::Mutex::ScopedLock l(lock);
- if (!measuring) {
- measureAt = value;
- measuring = true;
- startTime = AbsTime::now();
- }
- }
-
- void finish(T value) {
- sys::Mutex::ScopedLock l(lock);
- if(measuring && measureAt == value) {
- stat.record(Duration(startTime, AbsTime::now()));
- measuring = false;
- }
- }
-
- private:
- sys::Mutex lock;
- bool measuring;
- T measureAt;
- AbsTime startTime;
- LatencyStatistic stat;
-};
-
-
-/** Measures delay between the nth call to start and the nth call to finish.
- * E.g. to measure latency between sending & receiving an ordered stream of messages.
- */
-class LatencyCounter {
- public:
- LatencyCounter(std::string name) : measuring(false), startCount(0), finishCount(0), stat(name) {}
-
- void start() {
- sys::Mutex::ScopedLock l(lock);
- if (!measuring) {
- measureAt = startCount;
- measuring = true;
- startTime = AbsTime::now();
- }
- ++startCount;
- }
-
- void finish() {
- sys::Mutex::ScopedLock l(lock);
- if (measuring && measureAt == finishCount) {
- stat.record(Duration(startTime, AbsTime::now()));
- measuring = false;
- }
- ++finishCount;
- }
-
- private:
- sys::Mutex lock;
- bool measuring;
- uint64_t startCount, finishCount, measureAt;
- AbsTime startTime;
- LatencyStatistic stat;
-};
-
-/** Measures time spent in a scope. */
-class LatencyScope {
- public:
- LatencyScope(LatencyStatistic& s) : stat(s), startTime(AbsTime::now()) {}
-
- ~LatencyScope() {
- sys::Mutex::ScopedLock l(lock);
- stat.record(Duration(startTime, AbsTime::now()));
- }
-
- private:
- sys::Mutex lock;
- LatencyStatistic& stat;
- AbsTime startTime;
-};
-
-
-/** Macros to wrap latency tracking so disabled unless QPID_LATENCY_TRACKER is defined */
-
-#if defined(QPID_LATENCY_TRACKER)
-#define LATENCY_TRACK(X) X
-#else
-#define LATENCY_TRACK(X)
-#endif
-}} // namespace qpid::sys
-
-#endif /*!QPID_SYS_LATENCYTRACKER_H*/
diff --git a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
index 6eafb6cf0b..28ff140237 100644
--- a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
+++ b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
@@ -29,6 +29,7 @@
#include "qpid/sys/OutputControl.h"
#include <boost/bind.hpp>
+#include <boost/lexical_cast.hpp>
#include <memory>
#include <netdb.h>
@@ -304,8 +305,9 @@ void RdmaIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::F
sin.sin_port = htons(listeningPort);
sin.sin_addr.s_addr = INADDR_ANY;
+ SocketAddress sa("",boost::lexical_cast<std::string>(listeningPort));
listener.reset(
- new Rdma::Listener((const sockaddr&)(sin),
+ new Rdma::Listener(sa,
Rdma::ConnectionParams(65536, Rdma::DEFAULT_WR_ENTRIES),
boost::bind(&RdmaIOProtocolFactory::established, this, poller, _1),
boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2),
@@ -331,24 +333,14 @@ void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connectio
void RdmaIOProtocolFactory::connect(
Poller::shared_ptr poller,
- const std::string& host, int16_t p,
+ const std::string& host, int16_t port,
ConnectionCodec::Factory* f,
ConnectFailedCallback failed)
{
- ::addrinfo *res;
- ::addrinfo hints = {};
- hints.ai_family = AF_INET;
- hints.ai_socktype = SOCK_STREAM;
- stringstream ss; ss << p;
- string port = ss.str();
- int n = ::getaddrinfo(host.c_str(), port.c_str(), &hints, &res);
- if (n<0) {
- throw Exception(QPID_MSG("Rdma: Cannot resolve " << host << ": " << ::gai_strerror(n)));
- }
-
+ SocketAddress sa(host, boost::lexical_cast<std::string>(port));
Rdma::Connector* c =
new Rdma::Connector(
- *res->ai_addr,
+ sa,
Rdma::ConnectionParams(8000, Rdma::DEFAULT_WR_ENTRIES),
boost::bind(&RdmaIOProtocolFactory::connected, this, poller, _1, _2, f),
boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2),
diff --git a/qpid/cpp/src/qpid/sys/Socket.h b/qpid/cpp/src/qpid/sys/Socket.h
index f389e99cb8..d108402682 100644
--- a/qpid/cpp/src/qpid/sys/Socket.h
+++ b/qpid/cpp/src/qpid/sys/Socket.h
@@ -31,6 +31,7 @@ namespace qpid {
namespace sys {
class Duration;
+class SocketAddress;
class Socket : public IOHandle
{
@@ -48,6 +49,7 @@ public:
void setNonblocking() const;
QPID_COMMON_EXTERN void connect(const std::string& host, uint16_t port) const;
+ QPID_COMMON_EXTERN void connect(const SocketAddress&) const;
QPID_COMMON_EXTERN void close() const;
diff --git a/qpid/cpp/src/qpid/sys/SocketAddress.h b/qpid/cpp/src/qpid/sys/SocketAddress.h
new file mode 100644
index 0000000000..fcb9c81d43
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/SocketAddress.h
@@ -0,0 +1,51 @@
+#ifndef _sys_SocketAddress_h
+#define _sys_SocketAddress_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/CommonImportExport.h"
+#include <string>
+
+struct addrinfo;
+
+namespace qpid {
+namespace sys {
+
+class SocketAddress {
+ friend const ::addrinfo& getAddrInfo(const SocketAddress&);
+
+public:
+ /** Create a SocketAddress from hostname and port*/
+ QPID_COMMON_EXTERN SocketAddress(const std::string& host, const std::string& port);
+ QPID_COMMON_EXTERN ~SocketAddress();
+
+ std::string asString() const;
+
+private:
+ std::string host;
+ std::string port;
+ ::addrinfo* addrInfo;
+};
+
+}}
+#endif /*!_sys_SocketAddress_h*/
diff --git a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
index b456beb098..3377be98f1 100644
--- a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -46,7 +46,7 @@ class AsynchIOProtocolFactory : public ProtocolFactory {
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
void connect(Poller::shared_ptr, const std::string& host, int16_t port,
ConnectionCodec::Factory*,
- boost::function2<void, int, std::string> failed);
+ ConnectFailedCallback);
uint16_t getPort() const;
std::string getHost() const;
@@ -54,6 +54,7 @@ class AsynchIOProtocolFactory : public ProtocolFactory {
private:
void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
bool isClient);
+ void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
};
// Static instance to initialise plugin
@@ -118,6 +119,15 @@ void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
acceptor->start(poller);
}
+void AsynchIOProtocolFactory::connectFailed(
+ const Socket& s, int ec, const std::string& emsg,
+ ConnectFailedCallback failedCb)
+{
+ failedCb(ec, emsg);
+ s.close();
+ delete &s;
+}
+
void AsynchIOProtocolFactory::connect(
Poller::shared_ptr poller,
const std::string& host, int16_t port,
@@ -131,13 +141,14 @@ void AsynchIOProtocolFactory::connect(
// is no longer needed.
Socket* socket = new Socket();
- AsynchConnector::create (*socket,
- poller,
- host,
- port,
- boost::bind(&AsynchIOProtocolFactory::established,
- this, poller, _1, fact, true),
- failed);
+ AsynchConnector::create(*socket,
+ poller,
+ host,
+ port,
+ boost::bind(&AsynchIOProtocolFactory::established,
+ this, poller, _1, fact, true),
+ boost::bind(&AsynchIOProtocolFactory::connectFailed,
+ this, _1, _2, _3, failed));
}
}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 8545ebd9cb..c042dcef01 100644
--- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -21,6 +21,7 @@
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/Poller.h"
#include "qpid/sys/DispatchHandle.h"
#include "qpid/sys/Time.h"
@@ -37,6 +38,7 @@
#include <string.h>
#include <boost/bind.hpp>
+#include <boost/lexical_cast.hpp>
using namespace qpid::sys;
@@ -161,11 +163,12 @@ class AsynchConnector : public qpid::sys::AsynchConnector,
private:
void connComplete(DispatchHandle& handle);
- void failure(int, std::string);
+ void failure(int, const std::string&);
private:
ConnectedCallback connCallback;
FailedCallback failCallback;
+ std::string errMsg;
const Socket& socket;
public:
@@ -174,7 +177,7 @@ public:
std::string hostname,
uint16_t port,
ConnectedCallback connCb,
- FailedCallback failCb = 0);
+ FailedCallback failCb);
};
AsynchConnector::AsynchConnector(const Socket& s,
@@ -192,12 +195,17 @@ AsynchConnector::AsynchConnector(const Socket& s,
socket(s)
{
socket.setNonblocking();
+ SocketAddress sa(hostname, boost::lexical_cast<std::string>(port));
try {
- socket.connect(hostname, port);
- startWatch(poller);
+ socket.connect(sa);
} catch(std::exception& e) {
- failure(-1, std::string(e.what()));
+ // Defer reporting failure
+ startWatch(poller);
+ errMsg = e.what();
+ DispatchHandle::call(boost::bind(&AsynchConnector::failure, this, -1, errMsg));
+ return;
}
+ startWatch(poller);
}
void AsynchConnector::connComplete(DispatchHandle& h)
@@ -209,17 +217,13 @@ void AsynchConnector::connComplete(DispatchHandle& h)
connCallback(socket);
DispatchHandle::doDelete();
} else {
- failure(errCode, std::string(strError(errCode)));
+ failure(errCode, strError(errCode));
}
}
-void AsynchConnector::failure(int errCode, std::string message)
+void AsynchConnector::failure(int errCode, const std::string& message)
{
- if (failCallback)
- failCallback(errCode, message);
-
- socket.close();
- delete &socket;
+ failCallback(socket, errCode, message);
DispatchHandle::doDelete();
}
@@ -467,7 +471,8 @@ void AsynchIO::readable(DispatchHandle& h) {
threadReadTotal += rc;
readTotal += rc;
- if (!readCallback(*this, buff)) {
+ readCallback(*this, buff);
+ if (readingStopped) {
// We have been flow controlled.
break;
}
diff --git a/qpid/cpp/src/qpid/sys/posix/Socket.cpp b/qpid/cpp/src/qpid/sys/posix/Socket.cpp
index 31044be9ca..02004b1999 100644
--- a/qpid/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/Socket.cpp
@@ -21,6 +21,7 @@
#include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/posix/check.h"
#include "qpid/sys/posix/PrivatePosix.h"
@@ -36,6 +37,7 @@
#include <iostream>
#include <boost/format.hpp>
+#include <boost/lexical_cast.hpp>
namespace qpid {
namespace sys {
@@ -126,42 +128,23 @@ void Socket::setNonblocking() const {
QPID_POSIX_CHECK(::fcntl(impl->fd, F_SETFL, O_NONBLOCK));
}
-namespace {
-const char* h_errstr(int e) {
- switch (e) {
- case HOST_NOT_FOUND: return "Host not found";
- case NO_ADDRESS: return "Name does not have an IP address";
- case TRY_AGAIN: return "A temporary error occurred on an authoritative name server.";
- case NO_RECOVERY: return "Non-recoverable name server error";
- default: return "Unknown error";
- }
-}
+void Socket::connect(const std::string& host, uint16_t port) const
+{
+ SocketAddress sa(host, boost::lexical_cast<std::string>(port));
+ connect(sa);
}
-void Socket::connect(const std::string& host, uint16_t p) const
+void Socket::connect(const SocketAddress& addr) const
{
- std::stringstream portstream;
- portstream << p;
- std::string port = portstream.str();
- connectname = host + ":" + port;
+ connectname = addr.asString();
const int& socket = impl->fd;
- ::addrinfo *res;
- ::addrinfo hints;
- ::memset(&hints, 0, sizeof(hints));
- hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well
- hints.ai_socktype = SOCK_STREAM;
- int n = ::getaddrinfo(host.c_str(), port.c_str(), &hints, &res);
- if (n != 0)
- throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
// TODO the correct thing to do here is loop on failure until you've used all the returned addresses
- if ((::connect(socket, res->ai_addr, res->ai_addrlen) < 0) &&
+ if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) &&
(errno != EINPROGRESS)) {
- ::freeaddrinfo(res);
- throw qpid::Exception(QPID_MSG(strError(errno) << ": " << host << ":" << port));
+ throw Exception(QPID_MSG(strError(errno) << ": " << connectname));
}
- ::freeaddrinfo(res);
}
void
@@ -178,15 +161,14 @@ int Socket::listen(uint16_t port, int backlog) const
const int& socket = impl->fd;
int yes=1;
QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
- struct sockaddr_in name;
- name.sin_family = AF_INET;
- name.sin_port = htons(port);
- name.sin_addr.s_addr = 0;
- if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) < 0)
+
+ SocketAddress sa("", boost::lexical_cast<std::string>(port));
+ if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0)
throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(errno)));
if (::listen(socket, backlog) < 0)
throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(errno)));
-
+
+ struct sockaddr_in name;
socklen_t namelen = sizeof(name);
if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0)
throw QPID_POSIX_ERROR(errno);
@@ -226,9 +208,10 @@ std::string Socket::getPeername() const
std::string Socket::getPeerAddress() const
{
- if (!connectname.empty())
- return std::string (connectname);
- return getName(impl->fd, false, true);
+ if (connectname.empty()) {
+ connectname = getName(impl->fd, false, true);
+ }
+ return connectname;
}
std::string Socket::getLocalAddress() const
diff --git a/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp b/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp
new file mode 100644
index 0000000000..fe8812299c
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/SocketAddress.h"
+
+#include "qpid/sys/posix/check.h"
+
+#include <sys/socket.h>
+#include <string.h>
+#include <netdb.h>
+
+namespace qpid {
+namespace sys {
+
+SocketAddress::SocketAddress(const std::string& host0, const std::string& port0) :
+ host(host0),
+ port(port0),
+ addrInfo(0)
+{
+ ::addrinfo hints;
+ ::memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well
+ hints.ai_socktype = SOCK_STREAM;
+
+ const char* node = 0;
+ if (host.empty()) {
+ hints.ai_flags |= AI_PASSIVE;
+ } else {
+ node = host.c_str();
+ }
+ const char* service = port.empty() ? "0" : port.c_str();
+
+ int n = ::getaddrinfo(node, service, &hints, &addrInfo);
+ if (n != 0)
+ throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
+}
+
+SocketAddress::~SocketAddress()
+{
+ ::freeaddrinfo(addrInfo);
+}
+
+std::string SocketAddress::asString() const
+{
+ return host + ":" + port;
+}
+
+const ::addrinfo& getAddrInfo(const SocketAddress& sa)
+{
+ return *sa.addrInfo;
+}
+
+}}
diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
index 9da6c835ce..d39f7885a5 100644
--- a/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
+++ b/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
@@ -40,6 +40,7 @@ using std::rand;
using qpid::sys::Poller;
using qpid::sys::Dispatcher;
+using qpid::sys::SocketAddress;
using qpid::sys::AbsTime;
using qpid::sys::Duration;
using qpid::sys::TIME_SEC;
@@ -154,18 +155,8 @@ using namespace qpid::tests;
int main(int argc, char* argv[]) {
vector<string> args(&argv[0], &argv[argc]);
- ::addrinfo *res;
- ::addrinfo hints = {};
- hints.ai_family = AF_INET;
- hints.ai_socktype = SOCK_STREAM;
+ string host = args[1];
string port = (args.size() < 3) ? "20079" : args[2];
- int n = ::getaddrinfo(args[1].c_str(), port.c_str(), &hints, &res);
- if (n<0) {
- cerr << "Can't find information for: " << args[1] << "\n";
- return 1;
- } else {
- cout << "Connecting to: " << args[1] << ":" << port <<"\n";
- }
if (args.size() > 3)
msgsize = atoi(args[3].c_str());
@@ -181,8 +172,10 @@ int main(int argc, char* argv[]) {
boost::shared_ptr<Poller> p(new Poller());
Dispatcher d(p);
+ SocketAddress sa(host, port);
+ cout << "Connecting to: " << sa.asString() <<"\n";
Rdma::Connector c(
- *res->ai_addr,
+ sa,
Rdma::ConnectionParams(msgsize, Rdma::DEFAULT_WR_ENTRIES),
boost::bind(&connected, p, _1, _2),
boost::bind(&connectionError, p, _1, _2),
diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
index 491c1612fd..8d06fccba1 100644
--- a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
+++ b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
@@ -26,6 +26,7 @@
#include <iostream>
#include <boost/bind.hpp>
+using qpid::sys::SocketAddress;
using qpid::sys::DispatchHandle;
using qpid::sys::Poller;
@@ -461,7 +462,7 @@ namespace Rdma {
}
Listener::Listener(
- const sockaddr& src,
+ const SocketAddress& src,
const ConnectionParams& cp,
EstablishedCallback ec,
ErrorCallback errc,
@@ -541,7 +542,7 @@ namespace Rdma {
}
Connector::Connector(
- const sockaddr& dst,
+ const SocketAddress& dst,
const ConnectionParams& cp,
ConnectedCallback cc,
ErrorCallback errc,
diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
index 697d9387ce..12a1b98d24 100644
--- a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
+++ b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
@@ -27,6 +27,7 @@
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/DispatchHandle.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/sys/SocketAddress.h"
#include <netinet/in.h>
@@ -173,14 +174,14 @@ namespace Rdma {
class Listener : public ConnectionManager
{
- sockaddr src_addr;
+ qpid::sys::SocketAddress src_addr;
ConnectionParams checkConnectionParams;
ConnectionRequestCallback connectionRequestCallback;
EstablishedCallback establishedCallback;
public:
Listener(
- const sockaddr& src,
+ const qpid::sys::SocketAddress& src,
const ConnectionParams& cp,
EstablishedCallback ec,
ErrorCallback errc,
@@ -198,14 +199,14 @@ namespace Rdma {
class Connector : public ConnectionManager
{
- sockaddr dst_addr;
+ qpid::sys::SocketAddress dst_addr;
ConnectionParams connectionParams;
RejectedCallback rejectedCallback;
ConnectedCallback connectedCallback;
public:
Connector(
- const sockaddr& dst,
+ const qpid::sys::SocketAddress& dst,
const ConnectionParams& cp,
ConnectedCallback cc,
ErrorCallback errc,
diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
index 07d6379362..4c11ba23eb 100644
--- a/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
+++ b/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
@@ -35,6 +35,7 @@ using std::string;
using std::cout;
using std::cerr;
+using qpid::sys::SocketAddress;
using qpid::sys::Poller;
using qpid::sys::Dispatcher;
@@ -144,20 +145,15 @@ using namespace qpid::tests;
int main(int argc, char* argv[]) {
vector<string> args(&argv[0], &argv[argc]);
- ::sockaddr_in sin;
-
- int port = (args.size() < 2) ? 20079 : atoi(args[1].c_str());
+ std::string port = (args.size() < 2) ? "20079" : args[1];
cout << "Listening on port: " << port << "\n";
- sin.sin_family = AF_INET;
- sin.sin_port = htons(port);
- sin.sin_addr.s_addr = INADDR_ANY;
-
try {
boost::shared_ptr<Poller> p(new Poller());
Dispatcher d(p);
- Rdma::Listener a((const sockaddr&)(sin),
+ SocketAddress sa("", port);
+ Rdma::Listener a(sa,
Rdma::ConnectionParams(16384, Rdma::DEFAULT_WR_ENTRIES),
boost::bind(connected, p, _1),
connectionError,
diff --git a/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h
index aa2e516e6b..e11497dc02 100644
--- a/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h
+++ b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h
@@ -350,9 +350,9 @@ namespace Rdma {
return ConnectionEvent(e);
}
- void bind(sockaddr& src_addr) const {
+ void bind(qpid::sys::SocketAddress& src_addr) const {
assert(id.get());
- CHECK(::rdma_bind_addr(id.get(), &src_addr));
+ CHECK(::rdma_bind_addr(id.get(), getAddrInfo(src_addr).ai_addr));
}
void listen(int backlog = DEFAULT_BACKLOG) const {
@@ -361,12 +361,11 @@ namespace Rdma {
}
void resolve_addr(
- sockaddr& dst_addr,
- sockaddr* src_addr = 0,
+ qpid::sys::SocketAddress& dst_addr,
int timeout_ms = DEFAULT_TIMEOUT) const
{
assert(id.get());
- CHECK(::rdma_resolve_addr(id.get(), src_addr, &dst_addr, timeout_ms));
+ CHECK(::rdma_resolve_addr(id.get(), 0, getAddrInfo(dst_addr).ai_addr, timeout_ms));
}
void resolve_route(int timeout_ms = DEFAULT_TIMEOUT) const {
diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
index 8905b87838..475b18600d 100644
--- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -634,7 +634,7 @@ void AsynchIO::readComplete(AsynchReadResult *result) {
if (status == 0 && bytes > 0) {
bool restartRead = true; // May not if receiver doesn't want more
if (readCallback)
- restartRead = readCallback(*this, result->getBuff());
+ readCallback(*this, result->getBuff());
if (restartRead)
startReading();
}