diff options
Diffstat (limited to 'qpid/cpp/src/qpid/sys')
-rw-r--r-- | qpid/cpp/src/qpid/sys/AsynchIO.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp | 39 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/AsynchIOHandler.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/LatencyTracker.h | 157 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp | 20 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/Socket.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/SocketAddress.h | 51 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp | 27 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp | 31 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/posix/Socket.cpp | 55 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp | 71 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/rdma/RdmaIO.h | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp | 2 |
17 files changed, 235 insertions, 280 deletions
diff --git a/qpid/cpp/src/qpid/sys/AsynchIO.h b/qpid/cpp/src/qpid/sys/AsynchIO.h index fb02183359..419770568a 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIO.h +++ b/qpid/cpp/src/qpid/sys/AsynchIO.h @@ -57,7 +57,7 @@ public: class AsynchConnector { public: typedef boost::function1<void, const Socket&> ConnectedCallback; - typedef boost::function2<void, int, std::string> FailedCallback; + typedef boost::function3<void, const Socket&, int, const std::string&> FailedCallback; // Call create() to allocate a new AsynchConnector object with the // specified poller, addressing, and callbacks. @@ -70,7 +70,7 @@ public: std::string hostname, uint16_t port, ConnectedCallback connCb, - FailedCallback failCb = 0); + FailedCallback failCb); protected: AsynchConnector() {} @@ -108,7 +108,7 @@ class AsynchIO { public: typedef AsynchIOBufferBase BufferBase; - typedef boost::function2<bool, AsynchIO&, BufferBase*> ReadCallback; + typedef boost::function2<void, AsynchIO&, BufferBase*> ReadCallback; typedef boost::function1<void, AsynchIO&> EofCallback; typedef boost::function1<void, AsynchIO&> DisconnectCallback; typedef boost::function2<void, AsynchIO&, const Socket&> ClosedCallback; diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp index 8094abd43d..eb0f213547 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -103,10 +103,31 @@ void AsynchIOHandler::giveReadCredit(int32_t credit) { aio->startReading(); } -bool AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { +void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { if (readError) { - return false; + return; + } + + // Check here for read credit + if (readCredit.get() != InfiniteCredit) { + if (readCredit.get() == 0) { + // FIXME aconway 2009-10-01: Workaround to avoid "false wakeups". + // readbuff is sometimes called with no credit. + // This should be fixed somewhere else to avoid such calls. + aio->unread(buff); + return; + } + // TODO In theory should be able to use an atomic operation before taking the lock + // but in practice there seems to be an unexplained race in that case + ScopedLock<Mutex> l(creditLock); + if (--readCredit == 0) { + assert(readCredit.get() >= 0); + if (readCredit.get() == 0) { + aio->stopReading(); + } + } } + size_t decoded = 0; if (codec) { // Already initiated try { @@ -149,20 +170,6 @@ bool AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { // Give whole buffer back to aio subsystem aio->queueReadBuffer(buff); } - // Check here for read credit - if (readCredit.get() != InfiniteCredit) { - // TODO In theory should be able to use an atomic operation before taking the lock - // but in practice there seems to be an unexplained race in that case - ScopedLock<Mutex> l(creditLock); - if (--readCredit == 0) { - assert(readCredit.get() >= 0); - if (readCredit.get() == 0) { - aio->stopReading(); - return false; - } - } - } - return true; } void AsynchIOHandler::eof(AsynchIO&) { diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.h b/qpid/cpp/src/qpid/sys/AsynchIOHandler.h index 9785f445a4..e1885bac79 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.h +++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.h @@ -65,7 +65,7 @@ class AsynchIOHandler : public OutputControl { QPID_COMMON_EXTERN void giveReadCredit(int32_t credit); // Input side - QPID_COMMON_EXTERN bool readbuff(AsynchIO& aio, AsynchIOBufferBase* buff); + QPID_COMMON_EXTERN void readbuff(AsynchIO& aio, AsynchIOBufferBase* buff); QPID_COMMON_EXTERN void eof(AsynchIO& aio); QPID_COMMON_EXTERN void disconnect(AsynchIO& aio); diff --git a/qpid/cpp/src/qpid/sys/LatencyTracker.h b/qpid/cpp/src/qpid/sys/LatencyTracker.h deleted file mode 100644 index 3294528ff6..0000000000 --- a/qpid/cpp/src/qpid/sys/LatencyTracker.h +++ /dev/null @@ -1,157 +0,0 @@ -#ifndef QPID_SYS_LATENCYTRACKER_H -#define QPID_SYS_LATENCYTRACKER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/Time.h" -#include <string> -#include <limits> -#include <map> - -namespace qpid { -namespace sys { - -/**@file Tools for measuring latency. NOT SUITABLE FOR PROUDCTION BUILDS. - * Uses should be compiled only if QPID_LATENCY_TRACKER is defined. - * See the convenience macros at the end of this file. - */ - -/** Used by LatencyCounter and LatencyTracker below */ -class LatencyStatistic { - public: - LatencyStatistic(std::string name_) : name(name_), count(0), total(0), min(std::numeric_limits<int64_t>::max()), max(0) {} - ~LatencyStatistic() { print(); } - - void record(Duration d) { - total += d; - ++count; - if (d > max) max=d; - if (d < min) min=d; - } - - void print() { - if (count) { - double meanMsec = (double(total)/count)/TIME_MSEC; - printf("\n==== Latency metric %s: samples=%lu mean=%fms (%f-%f)\n", name.c_str(), count, meanMsec, double(min)/TIME_MSEC, double(max)/TIME_MSEC); - } - else - printf("\n==== Latency metric %s: no samples.\n", name.c_str()); - } - - private: - std::string name; - unsigned long count; - int64_t total, min, max; -}; - -/** Measure delay between seeing the same value at start and finish. */ -template <class T> class LatencyTracker { - public: - LatencyTracker(std::string name) : measuring(false), stat(name) {} - - void start(T value) { - sys::Mutex::ScopedLock l(lock); - if (!measuring) { - measureAt = value; - measuring = true; - startTime = AbsTime::now(); - } - } - - void finish(T value) { - sys::Mutex::ScopedLock l(lock); - if(measuring && measureAt == value) { - stat.record(Duration(startTime, AbsTime::now())); - measuring = false; - } - } - - private: - sys::Mutex lock; - bool measuring; - T measureAt; - AbsTime startTime; - LatencyStatistic stat; -}; - - -/** Measures delay between the nth call to start and the nth call to finish. - * E.g. to measure latency between sending & receiving an ordered stream of messages. - */ -class LatencyCounter { - public: - LatencyCounter(std::string name) : measuring(false), startCount(0), finishCount(0), stat(name) {} - - void start() { - sys::Mutex::ScopedLock l(lock); - if (!measuring) { - measureAt = startCount; - measuring = true; - startTime = AbsTime::now(); - } - ++startCount; - } - - void finish() { - sys::Mutex::ScopedLock l(lock); - if (measuring && measureAt == finishCount) { - stat.record(Duration(startTime, AbsTime::now())); - measuring = false; - } - ++finishCount; - } - - private: - sys::Mutex lock; - bool measuring; - uint64_t startCount, finishCount, measureAt; - AbsTime startTime; - LatencyStatistic stat; -}; - -/** Measures time spent in a scope. */ -class LatencyScope { - public: - LatencyScope(LatencyStatistic& s) : stat(s), startTime(AbsTime::now()) {} - - ~LatencyScope() { - sys::Mutex::ScopedLock l(lock); - stat.record(Duration(startTime, AbsTime::now())); - } - - private: - sys::Mutex lock; - LatencyStatistic& stat; - AbsTime startTime; -}; - - -/** Macros to wrap latency tracking so disabled unless QPID_LATENCY_TRACKER is defined */ - -#if defined(QPID_LATENCY_TRACKER) -#define LATENCY_TRACK(X) X -#else -#define LATENCY_TRACK(X) -#endif -}} // namespace qpid::sys - -#endif /*!QPID_SYS_LATENCYTRACKER_H*/ diff --git a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp index 6eafb6cf0b..28ff140237 100644 --- a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp @@ -29,6 +29,7 @@ #include "qpid/sys/OutputControl.h" #include <boost/bind.hpp> +#include <boost/lexical_cast.hpp> #include <memory> #include <netdb.h> @@ -304,8 +305,9 @@ void RdmaIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::F sin.sin_port = htons(listeningPort); sin.sin_addr.s_addr = INADDR_ANY; + SocketAddress sa("",boost::lexical_cast<std::string>(listeningPort)); listener.reset( - new Rdma::Listener((const sockaddr&)(sin), + new Rdma::Listener(sa, Rdma::ConnectionParams(65536, Rdma::DEFAULT_WR_ENTRIES), boost::bind(&RdmaIOProtocolFactory::established, this, poller, _1), boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2), @@ -331,24 +333,14 @@ void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connectio void RdmaIOProtocolFactory::connect( Poller::shared_ptr poller, - const std::string& host, int16_t p, + const std::string& host, int16_t port, ConnectionCodec::Factory* f, ConnectFailedCallback failed) { - ::addrinfo *res; - ::addrinfo hints = {}; - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_STREAM; - stringstream ss; ss << p; - string port = ss.str(); - int n = ::getaddrinfo(host.c_str(), port.c_str(), &hints, &res); - if (n<0) { - throw Exception(QPID_MSG("Rdma: Cannot resolve " << host << ": " << ::gai_strerror(n))); - } - + SocketAddress sa(host, boost::lexical_cast<std::string>(port)); Rdma::Connector* c = new Rdma::Connector( - *res->ai_addr, + sa, Rdma::ConnectionParams(8000, Rdma::DEFAULT_WR_ENTRIES), boost::bind(&RdmaIOProtocolFactory::connected, this, poller, _1, _2, f), boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2), diff --git a/qpid/cpp/src/qpid/sys/Socket.h b/qpid/cpp/src/qpid/sys/Socket.h index f389e99cb8..d108402682 100644 --- a/qpid/cpp/src/qpid/sys/Socket.h +++ b/qpid/cpp/src/qpid/sys/Socket.h @@ -31,6 +31,7 @@ namespace qpid { namespace sys { class Duration; +class SocketAddress; class Socket : public IOHandle { @@ -48,6 +49,7 @@ public: void setNonblocking() const; QPID_COMMON_EXTERN void connect(const std::string& host, uint16_t port) const; + QPID_COMMON_EXTERN void connect(const SocketAddress&) const; QPID_COMMON_EXTERN void close() const; diff --git a/qpid/cpp/src/qpid/sys/SocketAddress.h b/qpid/cpp/src/qpid/sys/SocketAddress.h new file mode 100644 index 0000000000..fcb9c81d43 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/SocketAddress.h @@ -0,0 +1,51 @@ +#ifndef _sys_SocketAddress_h +#define _sys_SocketAddress_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/IntegerTypes.h" +#include "qpid/CommonImportExport.h" +#include <string> + +struct addrinfo; + +namespace qpid { +namespace sys { + +class SocketAddress { + friend const ::addrinfo& getAddrInfo(const SocketAddress&); + +public: + /** Create a SocketAddress from hostname and port*/ + QPID_COMMON_EXTERN SocketAddress(const std::string& host, const std::string& port); + QPID_COMMON_EXTERN ~SocketAddress(); + + std::string asString() const; + +private: + std::string host; + std::string port; + ::addrinfo* addrInfo; +}; + +}} +#endif /*!_sys_SocketAddress_h*/ diff --git a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp index b456beb098..3377be98f1 100644 --- a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -46,7 +46,7 @@ class AsynchIOProtocolFactory : public ProtocolFactory { void accept(Poller::shared_ptr, ConnectionCodec::Factory*); void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*, - boost::function2<void, int, std::string> failed); + ConnectFailedCallback); uint16_t getPort() const; std::string getHost() const; @@ -54,6 +54,7 @@ class AsynchIOProtocolFactory : public ProtocolFactory { private: void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, bool isClient); + void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback); }; // Static instance to initialise plugin @@ -118,6 +119,15 @@ void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, acceptor->start(poller); } +void AsynchIOProtocolFactory::connectFailed( + const Socket& s, int ec, const std::string& emsg, + ConnectFailedCallback failedCb) +{ + failedCb(ec, emsg); + s.close(); + delete &s; +} + void AsynchIOProtocolFactory::connect( Poller::shared_ptr poller, const std::string& host, int16_t port, @@ -131,13 +141,14 @@ void AsynchIOProtocolFactory::connect( // is no longer needed. Socket* socket = new Socket(); - AsynchConnector::create (*socket, - poller, - host, - port, - boost::bind(&AsynchIOProtocolFactory::established, - this, poller, _1, fact, true), - failed); + AsynchConnector::create(*socket, + poller, + host, + port, + boost::bind(&AsynchIOProtocolFactory::established, + this, poller, _1, fact, true), + boost::bind(&AsynchIOProtocolFactory::connectFailed, + this, _1, _2, _3, failed)); } }} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp index 8545ebd9cb..c042dcef01 100644 --- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -21,6 +21,7 @@ #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Socket.h" +#include "qpid/sys/SocketAddress.h" #include "qpid/sys/Poller.h" #include "qpid/sys/DispatchHandle.h" #include "qpid/sys/Time.h" @@ -37,6 +38,7 @@ #include <string.h> #include <boost/bind.hpp> +#include <boost/lexical_cast.hpp> using namespace qpid::sys; @@ -161,11 +163,12 @@ class AsynchConnector : public qpid::sys::AsynchConnector, private: void connComplete(DispatchHandle& handle); - void failure(int, std::string); + void failure(int, const std::string&); private: ConnectedCallback connCallback; FailedCallback failCallback; + std::string errMsg; const Socket& socket; public: @@ -174,7 +177,7 @@ public: std::string hostname, uint16_t port, ConnectedCallback connCb, - FailedCallback failCb = 0); + FailedCallback failCb); }; AsynchConnector::AsynchConnector(const Socket& s, @@ -192,12 +195,17 @@ AsynchConnector::AsynchConnector(const Socket& s, socket(s) { socket.setNonblocking(); + SocketAddress sa(hostname, boost::lexical_cast<std::string>(port)); try { - socket.connect(hostname, port); - startWatch(poller); + socket.connect(sa); } catch(std::exception& e) { - failure(-1, std::string(e.what())); + // Defer reporting failure + startWatch(poller); + errMsg = e.what(); + DispatchHandle::call(boost::bind(&AsynchConnector::failure, this, -1, errMsg)); + return; } + startWatch(poller); } void AsynchConnector::connComplete(DispatchHandle& h) @@ -209,17 +217,13 @@ void AsynchConnector::connComplete(DispatchHandle& h) connCallback(socket); DispatchHandle::doDelete(); } else { - failure(errCode, std::string(strError(errCode))); + failure(errCode, strError(errCode)); } } -void AsynchConnector::failure(int errCode, std::string message) +void AsynchConnector::failure(int errCode, const std::string& message) { - if (failCallback) - failCallback(errCode, message); - - socket.close(); - delete &socket; + failCallback(socket, errCode, message); DispatchHandle::doDelete(); } @@ -467,7 +471,8 @@ void AsynchIO::readable(DispatchHandle& h) { threadReadTotal += rc; readTotal += rc; - if (!readCallback(*this, buff)) { + readCallback(*this, buff); + if (readingStopped) { // We have been flow controlled. break; } diff --git a/qpid/cpp/src/qpid/sys/posix/Socket.cpp b/qpid/cpp/src/qpid/sys/posix/Socket.cpp index 31044be9ca..02004b1999 100644 --- a/qpid/cpp/src/qpid/sys/posix/Socket.cpp +++ b/qpid/cpp/src/qpid/sys/posix/Socket.cpp @@ -21,6 +21,7 @@ #include "qpid/sys/Socket.h" +#include "qpid/sys/SocketAddress.h" #include "qpid/sys/posix/check.h" #include "qpid/sys/posix/PrivatePosix.h" @@ -36,6 +37,7 @@ #include <iostream> #include <boost/format.hpp> +#include <boost/lexical_cast.hpp> namespace qpid { namespace sys { @@ -126,42 +128,23 @@ void Socket::setNonblocking() const { QPID_POSIX_CHECK(::fcntl(impl->fd, F_SETFL, O_NONBLOCK)); } -namespace { -const char* h_errstr(int e) { - switch (e) { - case HOST_NOT_FOUND: return "Host not found"; - case NO_ADDRESS: return "Name does not have an IP address"; - case TRY_AGAIN: return "A temporary error occurred on an authoritative name server."; - case NO_RECOVERY: return "Non-recoverable name server error"; - default: return "Unknown error"; - } -} +void Socket::connect(const std::string& host, uint16_t port) const +{ + SocketAddress sa(host, boost::lexical_cast<std::string>(port)); + connect(sa); } -void Socket::connect(const std::string& host, uint16_t p) const +void Socket::connect(const SocketAddress& addr) const { - std::stringstream portstream; - portstream << p; - std::string port = portstream.str(); - connectname = host + ":" + port; + connectname = addr.asString(); const int& socket = impl->fd; - ::addrinfo *res; - ::addrinfo hints; - ::memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well - hints.ai_socktype = SOCK_STREAM; - int n = ::getaddrinfo(host.c_str(), port.c_str(), &hints, &res); - if (n != 0) - throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n))); // TODO the correct thing to do here is loop on failure until you've used all the returned addresses - if ((::connect(socket, res->ai_addr, res->ai_addrlen) < 0) && + if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) && (errno != EINPROGRESS)) { - ::freeaddrinfo(res); - throw qpid::Exception(QPID_MSG(strError(errno) << ": " << host << ":" << port)); + throw Exception(QPID_MSG(strError(errno) << ": " << connectname)); } - ::freeaddrinfo(res); } void @@ -178,15 +161,14 @@ int Socket::listen(uint16_t port, int backlog) const const int& socket = impl->fd; int yes=1; QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))); - struct sockaddr_in name; - name.sin_family = AF_INET; - name.sin_port = htons(port); - name.sin_addr.s_addr = 0; - if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) < 0) + + SocketAddress sa("", boost::lexical_cast<std::string>(port)); + if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0) throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(errno))); if (::listen(socket, backlog) < 0) throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(errno))); - + + struct sockaddr_in name; socklen_t namelen = sizeof(name); if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0) throw QPID_POSIX_ERROR(errno); @@ -226,9 +208,10 @@ std::string Socket::getPeername() const std::string Socket::getPeerAddress() const { - if (!connectname.empty()) - return std::string (connectname); - return getName(impl->fd, false, true); + if (connectname.empty()) { + connectname = getName(impl->fd, false, true); + } + return connectname; } std::string Socket::getLocalAddress() const diff --git a/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp b/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp new file mode 100644 index 0000000000..fe8812299c --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/SocketAddress.h" + +#include "qpid/sys/posix/check.h" + +#include <sys/socket.h> +#include <string.h> +#include <netdb.h> + +namespace qpid { +namespace sys { + +SocketAddress::SocketAddress(const std::string& host0, const std::string& port0) : + host(host0), + port(port0), + addrInfo(0) +{ + ::addrinfo hints; + ::memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well + hints.ai_socktype = SOCK_STREAM; + + const char* node = 0; + if (host.empty()) { + hints.ai_flags |= AI_PASSIVE; + } else { + node = host.c_str(); + } + const char* service = port.empty() ? "0" : port.c_str(); + + int n = ::getaddrinfo(node, service, &hints, &addrInfo); + if (n != 0) + throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n))); +} + +SocketAddress::~SocketAddress() +{ + ::freeaddrinfo(addrInfo); +} + +std::string SocketAddress::asString() const +{ + return host + ":" + port; +} + +const ::addrinfo& getAddrInfo(const SocketAddress& sa) +{ + return *sa.addrInfo; +} + +}} diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp index 9da6c835ce..d39f7885a5 100644 --- a/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp +++ b/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp @@ -40,6 +40,7 @@ using std::rand; using qpid::sys::Poller; using qpid::sys::Dispatcher; +using qpid::sys::SocketAddress; using qpid::sys::AbsTime; using qpid::sys::Duration; using qpid::sys::TIME_SEC; @@ -154,18 +155,8 @@ using namespace qpid::tests; int main(int argc, char* argv[]) { vector<string> args(&argv[0], &argv[argc]); - ::addrinfo *res; - ::addrinfo hints = {}; - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_STREAM; + string host = args[1]; string port = (args.size() < 3) ? "20079" : args[2]; - int n = ::getaddrinfo(args[1].c_str(), port.c_str(), &hints, &res); - if (n<0) { - cerr << "Can't find information for: " << args[1] << "\n"; - return 1; - } else { - cout << "Connecting to: " << args[1] << ":" << port <<"\n"; - } if (args.size() > 3) msgsize = atoi(args[3].c_str()); @@ -181,8 +172,10 @@ int main(int argc, char* argv[]) { boost::shared_ptr<Poller> p(new Poller()); Dispatcher d(p); + SocketAddress sa(host, port); + cout << "Connecting to: " << sa.asString() <<"\n"; Rdma::Connector c( - *res->ai_addr, + sa, Rdma::ConnectionParams(msgsize, Rdma::DEFAULT_WR_ENTRIES), boost::bind(&connected, p, _1, _2), boost::bind(&connectionError, p, _1, _2), diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp index 491c1612fd..8d06fccba1 100644 --- a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -26,6 +26,7 @@ #include <iostream> #include <boost/bind.hpp> +using qpid::sys::SocketAddress; using qpid::sys::DispatchHandle; using qpid::sys::Poller; @@ -461,7 +462,7 @@ namespace Rdma { } Listener::Listener( - const sockaddr& src, + const SocketAddress& src, const ConnectionParams& cp, EstablishedCallback ec, ErrorCallback errc, @@ -541,7 +542,7 @@ namespace Rdma { } Connector::Connector( - const sockaddr& dst, + const SocketAddress& dst, const ConnectionParams& cp, ConnectedCallback cc, ErrorCallback errc, diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h index 697d9387ce..12a1b98d24 100644 --- a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h +++ b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h @@ -27,6 +27,7 @@ #include "qpid/sys/Dispatcher.h" #include "qpid/sys/DispatchHandle.h" #include "qpid/sys/Mutex.h" +#include "qpid/sys/SocketAddress.h" #include <netinet/in.h> @@ -173,14 +174,14 @@ namespace Rdma { class Listener : public ConnectionManager { - sockaddr src_addr; + qpid::sys::SocketAddress src_addr; ConnectionParams checkConnectionParams; ConnectionRequestCallback connectionRequestCallback; EstablishedCallback establishedCallback; public: Listener( - const sockaddr& src, + const qpid::sys::SocketAddress& src, const ConnectionParams& cp, EstablishedCallback ec, ErrorCallback errc, @@ -198,14 +199,14 @@ namespace Rdma { class Connector : public ConnectionManager { - sockaddr dst_addr; + qpid::sys::SocketAddress dst_addr; ConnectionParams connectionParams; RejectedCallback rejectedCallback; ConnectedCallback connectedCallback; public: Connector( - const sockaddr& dst, + const qpid::sys::SocketAddress& dst, const ConnectionParams& cp, ConnectedCallback cc, ErrorCallback errc, diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp index 07d6379362..4c11ba23eb 100644 --- a/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp +++ b/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp @@ -35,6 +35,7 @@ using std::string; using std::cout; using std::cerr; +using qpid::sys::SocketAddress; using qpid::sys::Poller; using qpid::sys::Dispatcher; @@ -144,20 +145,15 @@ using namespace qpid::tests; int main(int argc, char* argv[]) { vector<string> args(&argv[0], &argv[argc]); - ::sockaddr_in sin; - - int port = (args.size() < 2) ? 20079 : atoi(args[1].c_str()); + std::string port = (args.size() < 2) ? "20079" : args[1]; cout << "Listening on port: " << port << "\n"; - sin.sin_family = AF_INET; - sin.sin_port = htons(port); - sin.sin_addr.s_addr = INADDR_ANY; - try { boost::shared_ptr<Poller> p(new Poller()); Dispatcher d(p); - Rdma::Listener a((const sockaddr&)(sin), + SocketAddress sa("", port); + Rdma::Listener a(sa, Rdma::ConnectionParams(16384, Rdma::DEFAULT_WR_ENTRIES), boost::bind(connected, p, _1), connectionError, diff --git a/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h index aa2e516e6b..e11497dc02 100644 --- a/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h +++ b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h @@ -350,9 +350,9 @@ namespace Rdma { return ConnectionEvent(e); } - void bind(sockaddr& src_addr) const { + void bind(qpid::sys::SocketAddress& src_addr) const { assert(id.get()); - CHECK(::rdma_bind_addr(id.get(), &src_addr)); + CHECK(::rdma_bind_addr(id.get(), getAddrInfo(src_addr).ai_addr)); } void listen(int backlog = DEFAULT_BACKLOG) const { @@ -361,12 +361,11 @@ namespace Rdma { } void resolve_addr( - sockaddr& dst_addr, - sockaddr* src_addr = 0, + qpid::sys::SocketAddress& dst_addr, int timeout_ms = DEFAULT_TIMEOUT) const { assert(id.get()); - CHECK(::rdma_resolve_addr(id.get(), src_addr, &dst_addr, timeout_ms)); + CHECK(::rdma_resolve_addr(id.get(), 0, getAddrInfo(dst_addr).ai_addr, timeout_ms)); } void resolve_route(int timeout_ms = DEFAULT_TIMEOUT) const { diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp index 8905b87838..475b18600d 100644 --- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -634,7 +634,7 @@ void AsynchIO::readComplete(AsynchReadResult *result) { if (status == 0 && bytes > 0) { bool restartRead = true; // May not if receiver doesn't want more if (readCallback) - restartRead = readCallback(*this, result->getBuff()); + readCallback(*this, result->getBuff()); if (restartRead) startReading(); } |