diff options
Diffstat (limited to 'cpp/src/qpid/sys/posix/Socket.cpp')
-rw-r--r-- | cpp/src/qpid/sys/posix/Socket.cpp | 171 |
1 files changed, 93 insertions, 78 deletions
diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp index 4a6dc66f80..7b906f33e8 100644 --- a/cpp/src/qpid/sys/posix/Socket.cpp +++ b/cpp/src/qpid/sys/posix/Socket.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,35 +34,65 @@ #include <netdb.h> #include <cstdlib> #include <string.h> +#include <iostream> + +#include <boost/format.hpp> +#include <boost/lexical_cast.hpp> namespace qpid { namespace sys { namespace { -std::string getName(int fd, bool local) +std::string getName(int fd, bool local, bool includeService = false) { - ::sockaddr_storage name_s; // big enough for any socket address - ::sockaddr* name = (::sockaddr*)&name_s; - ::socklen_t namelen = sizeof(name_s); - + ::sockaddr_storage name; // big enough for any socket address + ::socklen_t namelen = sizeof(name); + + int result = -1; if (local) { - QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) ); + result = ::getsockname(fd, (::sockaddr*)&name, &namelen); } else { - QPID_POSIX_CHECK( ::getpeername(fd, name, &namelen) ); + result = ::getpeername(fd, (::sockaddr*)&name, &namelen); } - return SocketAddress::asString(name, namelen); + QPID_POSIX_CHECK(result); + + char servName[NI_MAXSERV]; + char dispName[NI_MAXHOST]; + if (includeService) { + if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), + servName, sizeof(servName), + NI_NUMERICHOST | NI_NUMERICSERV) != 0) + throw QPID_POSIX_ERROR(rc); + return std::string(dispName) + ":" + std::string(servName); + + } else { + if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), 0, 0, NI_NUMERICHOST) != 0) + throw QPID_POSIX_ERROR(rc); + return dispName; + } } -uint16_t getLocalPort(int fd) +std::string getService(int fd, bool local) { - ::sockaddr_storage name_s; // big enough for any socket address - ::sockaddr* name = (::sockaddr*)&name_s; - ::socklen_t namelen = sizeof(name_s); + ::sockaddr_storage name; // big enough for any socket address + ::socklen_t namelen = sizeof(name); + + int result = -1; + if (local) { + result = ::getsockname(fd, (::sockaddr*)&name, &namelen); + } else { + result = ::getpeername(fd, (::sockaddr*)&name, &namelen); + } - QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) ); + QPID_POSIX_CHECK(result); - return SocketAddress::getPort(name); + char servName[NI_MAXSERV]; + if (int rc=::getnameinfo((::sockaddr*)&name, namelen, 0, 0, + servName, sizeof(servName), + NI_NUMERICHOST | NI_NUMERICSERV) != 0) + throw QPID_POSIX_ERROR(rc); + return servName; } } @@ -89,11 +119,6 @@ void Socket::createSocket(const SocketAddress& sa) const try { if (nonblocking) setNonblocking(); if (nodelay) setTcpNoDelay(); - if (getAddrInfo(sa).ai_family == AF_INET6) { - int flag = 1; - int result = ::setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&flag, sizeof(flag)); - QPID_POSIX_CHECK(result); - } } catch (std::exception&) { ::close(s); socket = -1; @@ -101,18 +126,13 @@ void Socket::createSocket(const SocketAddress& sa) const } } -Socket* Socket::createSameTypeSocket() const { - int& socket = impl->fd; - // Socket currently has no actual socket attached - if (socket == -1) - return new Socket; - - ::sockaddr_storage sa; - ::socklen_t salen = sizeof(sa); - QPID_POSIX_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen)); - int s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM - if (s < 0) throw QPID_POSIX_ERROR(errno); - return new Socket(new IOHandlePrivate(s)); +void Socket::setTimeout(const Duration& interval) const +{ + const int& socket = impl->fd; + struct timeval tv; + toTimeval(tv, interval); + setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); + setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); } void Socket::setNonblocking() const { @@ -129,27 +149,20 @@ void Socket::setTcpNoDelay() const nodelay = true; if (socket != -1) { int flag = 1; - int result = ::setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)); + int result = setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)); QPID_POSIX_CHECK(result); } } -void Socket::connect(const std::string& host, const std::string& port) const +void Socket::connect(const std::string& host, uint16_t port) const { - SocketAddress sa(host, port); + SocketAddress sa(host, boost::lexical_cast<std::string>(port)); connect(sa); } void Socket::connect(const SocketAddress& addr) const { - // The display name for an outbound connection needs to be the name that was specified - // for the address rather than a resolved IP address as we don't know which of - // the IP addresses is actually the one that will be connected to. - peername = addr.asString(false); - - // However the string we compare with the local port must be numeric or it might not - // match when it should as getLocalAddress() will always be numeric - std::string connectname = addr.asString(); + connectname = addr.asString(); createSocket(addr); @@ -157,24 +170,7 @@ void Socket::connect(const SocketAddress& addr) const // TODO the correct thing to do here is loop on failure until you've used all the returned addresses if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) && (errno != EINPROGRESS)) { - throw Exception(QPID_MSG(strError(errno) << ": " << peername)); - } - // When connecting to a port on the same host which no longer has - // a process associated with it, the OS occasionally chooses the - // remote port (which is unoccupied) as the port to bind the local - // end of the socket, resulting in a "circular" connection. - // - // This seems like something the OS should prevent but I have - // confirmed that sporadic hangs in - // cluster_tests.LongTests.test_failover on RHEL5 are caused by - // such a circular connection. - // - // Raise an error if we see such a connection, since we know there is - // no listener on the peer address. - // - if (getLocalAddress() == connectname) { - close(); - throw Exception(QPID_MSG("Connection refused: " << peername)); + throw Exception(QPID_MSG(strError(errno) << ": " << connectname)); } } @@ -187,9 +183,9 @@ Socket::close() const socket = -1; } -int Socket::listen(const std::string& host, const std::string& port, int backlog) const +int Socket::listen(uint16_t port, int backlog) const { - SocketAddress sa(host, port); + SocketAddress sa("", boost::lexical_cast<std::string>(port)); return listen(sa, backlog); } @@ -199,24 +195,26 @@ int Socket::listen(const SocketAddress& sa, int backlog) const const int& socket = impl->fd; int yes=1; - QPID_POSIX_CHECK(::setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))); + QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))); if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0) throw Exception(QPID_MSG("Can't bind to port " << sa.asString() << ": " << strError(errno))); if (::listen(socket, backlog) < 0) throw Exception(QPID_MSG("Can't listen on port " << sa.asString() << ": " << strError(errno))); - return getLocalPort(socket); + struct sockaddr_in name; + socklen_t namelen = sizeof(name); + if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0) + throw QPID_POSIX_ERROR(errno); + + return ntohs(name.sin_port); } Socket* Socket::accept() const { int afd = ::accept(impl->fd, 0, 0); - if ( afd >= 0) { - Socket* s = new Socket(new IOHandlePrivate(afd)); - s->localname = localname; - return s; - } + if ( afd >= 0) + return new Socket(new IOHandlePrivate(afd)); else if (errno == EAGAIN) return 0; else throw QPID_POSIX_ERROR(errno); @@ -232,20 +230,37 @@ int Socket::write(const void *buf, size_t count) const return ::write(impl->fd, buf, count); } +std::string Socket::getSockname() const +{ + return getName(impl->fd, true); +} + +std::string Socket::getPeername() const +{ + return getName(impl->fd, false); +} + std::string Socket::getPeerAddress() const { - if (peername.empty()) { - peername = getName(impl->fd, false); + if (connectname.empty()) { + connectname = getName(impl->fd, false, true); } - return peername; + return connectname; } std::string Socket::getLocalAddress() const { - if (localname.empty()) { - localname = getName(impl->fd, true); - } - return localname; + return getName(impl->fd, true, true); +} + +uint16_t Socket::getLocalPort() const +{ + return std::atoi(getService(impl->fd, true).c_str()); +} + +uint16_t Socket::getRemotePort() const +{ + return std::atoi(getService(impl->fd, true).c_str()); } int Socket::getError() const |