diff options
Diffstat (limited to 'cpp')
28 files changed, 463 insertions, 297 deletions
diff --git a/cpp/include/qpid/sys/IOHandle.h b/cpp/include/qpid/sys/IOHandle.h index 45fc8c240a..06ae65f879 100644 --- a/cpp/include/qpid/sys/IOHandle.h +++ b/cpp/include/qpid/sys/IOHandle.h @@ -22,8 +22,6 @@ * */ -#include "qpid/CommonImportExport.h" - namespace qpid { namespace sys { @@ -31,18 +29,7 @@ namespace sys { * This is a class intended to abstract the Unix concept of file descriptor * or the Windows concept of HANDLE */ -class PollerHandle; -class IOHandlePrivate; -class IOHandle { - friend class PollerHandle; - friend class IOHandlePrivate; - -protected: - IOHandlePrivate* const impl; - - IOHandle(IOHandlePrivate*); - QPID_COMMON_EXTERN virtual ~IOHandle(); -}; +class IOHandle; }} diff --git a/cpp/include/qpid/sys/posix/PrivatePosix.h b/cpp/include/qpid/sys/posix/PrivatePosix.h index 79cb950275..0f59fe3176 100644 --- a/cpp/include/qpid/sys/posix/PrivatePosix.h +++ b/cpp/include/qpid/sys/posix/PrivatePosix.h @@ -23,7 +23,6 @@ */ #include "qpid/sys/Time.h" -#include "qpid/sys/IOHandle.h" struct timespec; struct timeval; @@ -41,32 +40,21 @@ Duration toTime(const struct timespec& ts); class SocketAddress; const struct addrinfo& getAddrInfo(const SocketAddress&); -// Private fd related implementation details -class IOHandlePrivate { +// Posix fd as an IOHandle +class IOHandle { public: - IOHandlePrivate(int f = -1) : - fd(f) + IOHandle(int fd0 = -1) : + fd(fd0) {} int fd; }; -int toFd(const IOHandlePrivate* h); - -// Posix fd as an IOHandle -class PosixIOHandle : public IOHandle { -public: - PosixIOHandle(int fd) : - IOHandle(new IOHandlePrivate(fd)) - {} -}; - // Dummy IOHandle for places it's required in the API // but we promise not to actually try to do any operations on the IOHandle class NullIOHandle : public IOHandle { public: - NullIOHandle() : - IOHandle(new IOHandlePrivate) + NullIOHandle() {} }; diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index ce94c0a318..20868da674 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -748,7 +748,7 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows) qpid/sys/windows/PipeHandle.cpp qpid/sys/windows/PollableCondition.cpp qpid/sys/windows/Shlib.cpp - qpid/sys/windows/Socket.cpp + qpid/sys/windows/WinSocket.cpp qpid/sys/windows/SocketAddress.cpp qpid/sys/windows/StrError.cpp qpid/sys/windows/SystemInfo.cpp @@ -853,7 +853,7 @@ else (CMAKE_SYSTEM_NAME STREQUAL Windows) qpid/sys/posix/PollableCondition.cpp qpid/sys/posix/Shlib.cpp qpid/log/posix/SinkOptions.cpp - qpid/sys/posix/Socket.cpp + qpid/sys/posix/BSDSocket.cpp qpid/sys/posix/SocketAddress.cpp qpid/sys/posix/StrError.cpp qpid/sys/posix/Thread.cpp diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 8149ea218b..488eb37eb0 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -46,7 +46,6 @@ windows_dist = \ qpid/sys/windows/QpidDllMain.h \ qpid/sys/windows/Shlib.cpp \ qpid/sys/windows/SocketAddress.cpp \ - qpid/sys/windows/Socket.cpp \ qpid/sys/windows/SslAsynchIO.cpp \ qpid/sys/windows/SslAsynchIO.h \ qpid/sys/windows/StrError.cpp \ @@ -56,6 +55,8 @@ windows_dist = \ ../include/qpid/sys/windows/Time.h \ qpid/sys/windows/uuid.cpp \ qpid/sys/windows/uuid.h \ + qpid/sys/windows/WinSocket.cpp \ + qpid/sys/windows/WinSocket.h \ windows/QpiddBroker.cpp \ windows/SCM.h \ windows/SCM.cpp \ @@ -158,7 +159,8 @@ qpidd_SOURCES = qpidd.cpp qpidd.h $(posix_qpidd_src) libqpidcommon_la_SOURCES += \ qpid/log/posix/SinkOptions.cpp \ qpid/sys/posix/IOHandle.cpp \ - qpid/sys/posix/Socket.cpp \ + qpid/sys/posix/BSDSocket.cpp \ + qpid/sys/posix/BSDSocket.h \ qpid/sys/posix/SocketAddress.cpp \ qpid/sys/posix/AsynchIO.cpp \ qpid/sys/posix/FileSysDir.cpp \ diff --git a/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp b/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp index ff177ba499..5b801aa69f 100644 --- a/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp +++ b/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp @@ -158,7 +158,7 @@ SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options, // Get the certificate for this server. DWORD flags = 0; std::string certStoreLocation = options.certStoreLocation; - std::transform(certStoreLocation.begin(), certStoreLocation.end(), certStoreLocation.begin(), ::tolower);
+ std::transform(certStoreLocation.begin(), certStoreLocation.end(), certStoreLocation.begin(), ::tolower); if (certStoreLocation == "currentuser") { flags = CERT_SYSTEM_STORE_CURRENT_USER; } else if (certStoreLocation == "localmachine") { @@ -217,14 +217,14 @@ SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options, // We must have at least one resolved address QPID_LOG(info, "SSL Listening to: " << sa.asString()) - Socket* s = new Socket; + Socket* s = createSocket(); listeningPort = s->listen(sa, backlog); listeners.push_back(s); // Try any other resolved addresses while (sa.nextAddress()) { QPID_LOG(info, "SSL Listening to: " << sa.asString()) - Socket* s = new Socket; + Socket* s = createSocket(); s->listen(sa, backlog); listeners.push_back(s); } @@ -325,7 +325,7 @@ void SslProtocolFactory::connect(sys::Poller::shared_ptr poller, // upon connection failure or by the AsynchIO upon connection // shutdown. The allocated AsynchConnector frees itself when it // is no longer needed. - qpid::sys::Socket* socket = new qpid::sys::Socket(); + qpid::sys::Socket* socket = createSocket(); connectFailedCallback = failed; AsynchConnector::create(*socket, host, diff --git a/cpp/src/qpid/client/TCPConnector.cpp b/cpp/src/qpid/client/TCPConnector.cpp index a14acb214c..b92f342b74 100644 --- a/cpp/src/qpid/client/TCPConnector.cpp +++ b/cpp/src/qpid/client/TCPConnector.cpp @@ -72,12 +72,13 @@ TCPConnector::TCPConnector(Poller::shared_ptr p, closed(true), shutdownHandler(0), input(0), + socket(createSocket()), connector(0), aio(0), poller(p) { QPID_LOG(debug, "TCPConnector created for " << version); - settings.configureSocket(socket); + settings.configureSocket(*socket); } TCPConnector::~TCPConnector() { @@ -88,7 +89,7 @@ void TCPConnector::connect(const std::string& host, const std::string& port) { Mutex::ScopedLock l(lock); assert(closed); connector = AsynchConnector::create( - socket, + *socket, host, port, boost::bind(&TCPConnector::connected, this, _1), boost::bind(&TCPConnector::connectFailed, this, _3)); @@ -99,7 +100,7 @@ void TCPConnector::connect(const std::string& host, const std::string& port) { void TCPConnector::connected(const Socket&) { connector = 0; - aio = AsynchIO::create(socket, + aio = AsynchIO::create(*socket, boost::bind(&TCPConnector::readbuff, this, _1, _2), boost::bind(&TCPConnector::eof, this, _1), boost::bind(&TCPConnector::disconnected, this, _1), @@ -116,7 +117,7 @@ void TCPConnector::start(sys::AsynchIO* aio_) { aio->createBuffers(maxFrameSize); - identifier = str(format("[%1%]") % socket.getFullAddress()); + identifier = str(format("[%1%]") % socket->getFullAddress()); } void TCPConnector::initAmqp() { @@ -127,7 +128,7 @@ void TCPConnector::initAmqp() { void TCPConnector::connectFailed(const std::string& msg) { connector = 0; QPID_LOG(warning, "Connect failed: " << msg); - socket.close(); + socket->close(); if (!closed) closed = true; if (shutdownHandler) @@ -318,7 +319,7 @@ void TCPConnector::eof(AsynchIO&) { void TCPConnector::disconnected(AsynchIO&) { close(); - socketClosed(*aio, socket); + socketClosed(*aio, *socket); } void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) diff --git a/cpp/src/qpid/client/TCPConnector.h b/cpp/src/qpid/client/TCPConnector.h index 5e1a3856e6..a90dffd3ef 100644 --- a/cpp/src/qpid/client/TCPConnector.h +++ b/cpp/src/qpid/client/TCPConnector.h @@ -35,7 +35,7 @@ #include "qpid/sys/Thread.h" #include <boost/shared_ptr.hpp> -#include <boost/weak_ptr.hpp> +#include <boost/scoped_ptr.hpp> #include <deque> #include <string> @@ -66,7 +66,7 @@ class TCPConnector : public Connector, public sys::Codec sys::ShutdownHandler* shutdownHandler; framing::InputHandler* input; - sys::Socket socket; + boost::scoped_ptr<sys::Socket> socket; sys::AsynchConnector* connector; sys::AsynchIO* aio; diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h index aa8a8a31d9..2119566d99 100644 --- a/cpp/src/qpid/sys/Socket.h +++ b/cpp/src/qpid/sys/Socket.h @@ -22,7 +22,6 @@ * */ -#include "qpid/sys/IOHandle.h" #include "qpid/sys/IntegerTypes.h" #include "qpid/CommonImportExport.h" #include <string> @@ -31,47 +30,42 @@ namespace qpid { namespace sys { class Duration; +class IOHandle; class SocketAddress; -namespace ssl { -class SslMuxSocket; -} - -class QPID_COMMON_CLASS_EXTERN Socket : public IOHandle +class Socket { public: - /** Create a socket wrapper for descriptor. */ - QPID_COMMON_EXTERN Socket(); + virtual ~Socket() {}; - /** Create a new Socket which is the same address family as this one */ - QPID_COMMON_EXTERN Socket* createSameTypeSocket() const; + virtual operator const IOHandle&() const = 0; /** Set socket non blocking */ - void setNonblocking() const; + virtual void setNonblocking() const = 0; - QPID_COMMON_EXTERN void setTcpNoDelay() const; + virtual void setTcpNoDelay() const = 0; - QPID_COMMON_EXTERN void connect(const SocketAddress&) const; + virtual void connect(const SocketAddress&) const = 0; - QPID_COMMON_EXTERN void close() const; + virtual void close() const = 0; /** Bind to a port and start listening. *@param port 0 means choose an available port. *@param backlog maximum number of pending connections. *@return The bound port. */ - QPID_COMMON_EXTERN int listen(const SocketAddress&, int backlog = 10) const; + virtual int listen(const SocketAddress&, int backlog = 10) const = 0; /** * Returns an address (host and port) for the remote end of the * socket */ - QPID_COMMON_EXTERN std::string getPeerAddress() const; + virtual std::string getPeerAddress() const = 0; /** * Returns an address (host and port) for the local end of the * socket */ - QPID_COMMON_EXTERN std::string getLocalAddress() const; + virtual std::string getLocalAddress() const = 0; /** * Returns the full address of the connection: local and remote host and port. @@ -82,30 +76,20 @@ public: * Returns the error code stored in the socket. This may be used * to determine the result of a non-blocking connect. */ - QPID_COMMON_EXTERN int getError() const; + virtual int getError() const = 0; /** Accept a connection from a socket that is already listening * and has an incoming connection */ - QPID_COMMON_EXTERN Socket* accept() const; - - // TODO The following are raw operations, maybe they need better wrapping? - QPID_COMMON_EXTERN int read(void *buf, size_t count) const; - QPID_COMMON_EXTERN int write(const void *buf, size_t count) const; - -protected: - /** Create socket */ - void createSocket(const SocketAddress&) const; + virtual Socket* accept() const = 0; - mutable std::string localname; - mutable std::string peername; - mutable bool nonblocking; - mutable bool nodelay; - - /** Construct socket with existing handle */ - Socket(IOHandlePrivate*); - friend class qpid::sys::ssl::SslMuxSocket; + virtual int read(void *buf, size_t count) const = 0; + virtual int write(const void *buf, size_t count) const = 0; }; +/** Make the default socket for whatever platform we are executing on + */ +QPID_COMMON_EXTERN Socket* createSocket(); + }} #endif /*!_sys_Socket_h*/ diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp index ed7cc3748d..2ff47e982c 100644 --- a/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -126,7 +126,7 @@ AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const // We must have at least one resolved address QPID_LOG(info, "Listening to: " << sa.asString()) - Socket* s = new Socket; + Socket* s = createSocket(); uint16_t lport = s->listen(sa, backlog); QPID_LOG(debug, "Listened to: " << lport); listeners.push_back(s); @@ -138,7 +138,7 @@ AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const // Hack to ensure that all listening connections are on the same port sa.setAddrInfoPort(listeningPort); QPID_LOG(info, "Listening to: " << sa.asString()) - Socket* s = new Socket; + Socket* s = createSocket(); uint16_t lport = s->listen(sa, backlog); QPID_LOG(debug, "Listened to: " << lport); listeners.push_back(s); @@ -204,7 +204,7 @@ void AsynchIOProtocolFactory::connect( // upon connection failure or by the AsynchIO upon connection // shutdown. The allocated AsynchConnector frees itself when it // is no longer needed. - Socket* socket = new Socket(); + Socket* socket = createSocket(); try { AsynchConnector* c = AsynchConnector::create( *socket, diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp index c23403c66d..3769a11f7d 100644 --- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp +++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -20,7 +20,6 @@ */ #include "qpid/sys/Poller.h" -#include "qpid/sys/IOHandle.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/AtomicCount.h" #include "qpid/sys/DeletionManager.h" @@ -64,12 +63,12 @@ class PollerHandlePrivate { }; ::__uint32_t events; - const IOHandlePrivate* ioHandle; + const IOHandle* ioHandle; PollerHandle* pollerHandle; FDStat stat; Mutex lock; - PollerHandlePrivate(const IOHandlePrivate* h, PollerHandle* p) : + PollerHandlePrivate(const IOHandle* h, PollerHandle* p) : events(0), ioHandle(h), pollerHandle(p), @@ -77,7 +76,7 @@ class PollerHandlePrivate { } int fd() const { - return toFd(ioHandle); + return ioHandle->fd; } bool isActive() const { @@ -138,7 +137,7 @@ class PollerHandlePrivate { }; PollerHandle::PollerHandle(const IOHandle& h) : - impl(new PollerHandlePrivate(h.impl, this)) + impl(new PollerHandlePrivate(&h, this)) {} PollerHandle::~PollerHandle() { diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/BSDSocket.cpp index 0c01374369..265142f629 100644 --- a/cpp/src/qpid/sys/posix/Socket.cpp +++ b/cpp/src/qpid/sys/posix/BSDSocket.cpp @@ -19,7 +19,7 @@ * */ -#include "qpid/sys/Socket.h" +#include "qpid/sys/posix/BSDSocket.h" #include "qpid/sys/SocketAddress.h" #include "qpid/sys/posix/check.h" @@ -67,25 +67,41 @@ uint16_t getLocalPort(int fd) } } -Socket::Socket() : - IOHandle(new IOHandlePrivate), +BSDSocket::BSDSocket() : + fd(-1), + handle(new IOHandle), nonblocking(false), nodelay(false) {} -Socket::Socket(IOHandlePrivate* h) : - IOHandle(h), +Socket* createSocket() +{ + return new BSDSocket; +} + +BSDSocket::BSDSocket(int fd0) : + fd(fd0), + handle(new IOHandle(fd)), nonblocking(false), nodelay(false) {} -void Socket::createSocket(const SocketAddress& sa) const +BSDSocket::~BSDSocket() +{} + +BSDSocket::operator const IOHandle&() const +{ + return *handle; +} + +void BSDSocket::createSocket(const SocketAddress& sa) const { - int& socket = impl->fd; - if (socket != -1) Socket::close(); + int& socket = fd; + if (socket != -1) BSDSocket::close(); int s = ::socket(getAddrInfo(sa).ai_family, getAddrInfo(sa).ai_socktype, 0); if (s < 0) throw QPID_POSIX_ERROR(errno); socket = s; + *handle = IOHandle(s); try { if (nonblocking) setNonblocking(); @@ -98,44 +114,31 @@ void Socket::createSocket(const SocketAddress& sa) const } catch (std::exception&) { ::close(s); socket = -1; + *handle = IOHandle(); throw; } } -Socket* Socket::createSameTypeSocket() const { - int& socket = impl->fd; - // Socket currently has no actual socket attached - if (socket == -1) - return new Socket; - - ::sockaddr_storage sa; - ::socklen_t salen = sizeof(sa); - QPID_POSIX_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen)); - int s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM - if (s < 0) throw QPID_POSIX_ERROR(errno); - return new Socket(new IOHandlePrivate(s)); -} - -void Socket::setNonblocking() const { - int& socket = impl->fd; +void BSDSocket::setNonblocking() const { + int& socket = fd; nonblocking = true; if (socket != -1) { QPID_POSIX_CHECK(::fcntl(socket, F_SETFL, O_NONBLOCK)); } } -void Socket::setTcpNoDelay() const +void BSDSocket::setTcpNoDelay() const { - int& socket = impl->fd; + int& socket = fd; nodelay = true; if (socket != -1) { int flag = 1; - int result = ::setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)); + int result = ::setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)); QPID_POSIX_CHECK(result); } } -void Socket::connect(const SocketAddress& addr) const +void BSDSocket::connect(const SocketAddress& addr) const { // The display name for an outbound connection needs to be the name that was specified // for the address rather than a resolved IP address as we don't know which of @@ -148,7 +151,7 @@ void Socket::connect(const SocketAddress& addr) const createSocket(addr); - const int& socket = impl->fd; + const int& socket = fd; // TODO the correct thing to do here is loop on failure until you've used all the returned addresses if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) && (errno != EINPROGRESS)) { @@ -174,19 +177,20 @@ void Socket::connect(const SocketAddress& addr) const } void -Socket::close() const +BSDSocket::close() const { - int& socket = impl->fd; + int& socket = fd; if (socket == -1) return; if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno); socket = -1; + *handle = IOHandle(); } -int Socket::listen(const SocketAddress& sa, int backlog) const +int BSDSocket::listen(const SocketAddress& sa, int backlog) const { createSocket(sa); - const int& socket = impl->fd; + const int& socket = fd; int yes=1; QPID_POSIX_CHECK(::setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))); @@ -198,11 +202,11 @@ int Socket::listen(const SocketAddress& sa, int backlog) const return getLocalPort(socket); } -Socket* Socket::accept() const +Socket* BSDSocket::accept() const { - int afd = ::accept(impl->fd, 0, 0); + int afd = ::accept(fd, 0, 0); if ( afd >= 0) { - Socket* s = new Socket(new IOHandlePrivate(afd)); + BSDSocket* s = new BSDSocket(afd); s->localname = localname; return s; } @@ -211,38 +215,38 @@ Socket* Socket::accept() const else throw QPID_POSIX_ERROR(errno); } -int Socket::read(void *buf, size_t count) const +int BSDSocket::read(void *buf, size_t count) const { - return ::read(impl->fd, buf, count); + return ::read(fd, buf, count); } -int Socket::write(const void *buf, size_t count) const +int BSDSocket::write(const void *buf, size_t count) const { - return ::write(impl->fd, buf, count); + return ::write(fd, buf, count); } -std::string Socket::getPeerAddress() const +std::string BSDSocket::getPeerAddress() const { if (peername.empty()) { - peername = getName(impl->fd, false); + peername = getName(fd, false); } return peername; } -std::string Socket::getLocalAddress() const +std::string BSDSocket::getLocalAddress() const { if (localname.empty()) { - localname = getName(impl->fd, true); + localname = getName(fd, true); } return localname; } -int Socket::getError() const +int BSDSocket::getError() const { int result; socklen_t rSize = sizeof (result); - if (::getsockopt(impl->fd, SOL_SOCKET, SO_ERROR, &result, &rSize) < 0) + if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &rSize) < 0) throw QPID_POSIX_ERROR(errno); return result; diff --git a/cpp/src/qpid/sys/posix/BSDSocket.h b/cpp/src/qpid/sys/posix/BSDSocket.h new file mode 100644 index 0000000000..98d7eb6e4d --- /dev/null +++ b/cpp/src/qpid/sys/posix/BSDSocket.h @@ -0,0 +1,109 @@ +#ifndef QPID_SYS_BSDSOCKET_H +#define QPID_SYS_BSDSOCKET_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Socket.h" +#include "qpid/sys/IntegerTypes.h" +#include "qpid/CommonImportExport.h" +#include <string> + +#include <boost/scoped_ptr.hpp> + +namespace qpid { +namespace sys { + +class Duration; +class IOHandle; +class SocketAddress; + +namespace ssl { +class SslMuxSocket; +} + +class QPID_COMMON_CLASS_EXTERN BSDSocket : public Socket +{ +public: + /** Create a socket wrapper for descriptor. */ + QPID_COMMON_EXTERN BSDSocket(); + QPID_COMMON_EXTERN ~BSDSocket(); + + QPID_COMMON_EXTERN operator const IOHandle&() const; + + /** Set socket non blocking */ + QPID_COMMON_EXTERN virtual void setNonblocking() const; + + QPID_COMMON_EXTERN virtual void setTcpNoDelay() const; + + QPID_COMMON_EXTERN virtual void connect(const SocketAddress&) const; + + QPID_COMMON_EXTERN virtual void close() const; + + /** Bind to a port and start listening. + *@return The bound port number + */ + QPID_COMMON_EXTERN virtual int listen(const SocketAddress&, int backlog = 10) const; + + /** + * Returns an address (host and port) for the remote end of the + * socket + */ + QPID_COMMON_EXTERN std::string getPeerAddress() const; + /** + * Returns an address (host and port) for the local end of the + * socket + */ + QPID_COMMON_EXTERN std::string getLocalAddress() const; + + /** + * Returns the error code stored in the socket. This may be used + * to determine the result of a non-blocking connect. + */ + QPID_COMMON_EXTERN int getError() const; + + /** Accept a connection from a socket that is already listening + * and has an incoming connection + */ + QPID_COMMON_EXTERN virtual Socket* accept() const; + + // TODO The following are raw operations, maybe they need better wrapping? + QPID_COMMON_EXTERN virtual int read(void *buf, size_t count) const; + QPID_COMMON_EXTERN virtual int write(const void *buf, size_t count) const; + +protected: + /** Create socket */ + void createSocket(const SocketAddress&) const; + + mutable int fd; + mutable boost::scoped_ptr<IOHandle> handle; + mutable std::string localname; + mutable std::string peername; + mutable bool nonblocking; + mutable bool nodelay; + + /** Construct socket with existing handle */ + BSDSocket(int fd); + friend class qpid::sys::ssl::SslMuxSocket; // Needed for this constructor +}; + +}} +#endif /*!QPID_SYS_BSDSOCKET_H*/ diff --git a/cpp/src/qpid/sys/posix/IOHandle.cpp b/cpp/src/qpid/sys/posix/IOHandle.cpp index 9c049ee1de..d3f502a63c 100644 --- a/cpp/src/qpid/sys/posix/IOHandle.cpp +++ b/cpp/src/qpid/sys/posix/IOHandle.cpp @@ -19,26 +19,11 @@ * */ -#include "qpid/sys/IOHandle.h" - #include "qpid/sys/posix/PrivatePosix.h" namespace qpid { namespace sys { -int toFd(const IOHandlePrivate* h) -{ - return h->fd; -} - NullIOHandle DummyIOHandle; -IOHandle::IOHandle(IOHandlePrivate* h) : - impl(h) -{} - -IOHandle::~IOHandle() { - delete impl; -} - }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/posix/PollableCondition.cpp b/cpp/src/qpid/sys/posix/PollableCondition.cpp index abff8a5be8..aa129faf20 100644 --- a/cpp/src/qpid/sys/posix/PollableCondition.cpp +++ b/cpp/src/qpid/sys/posix/PollableCondition.cpp @@ -21,7 +21,6 @@ #include "qpid/sys/PollableCondition.h" #include "qpid/sys/DispatchHandle.h" -#include "qpid/sys/IOHandle.h" #include "qpid/sys/posix/PrivatePosix.h" #include "qpid/Exception.h" @@ -58,14 +57,14 @@ PollableConditionPrivate::PollableConditionPrivate( const sys::PollableCondition::Callback& cb, sys::PollableCondition& parent, const boost::shared_ptr<sys::Poller>& poller -) : IOHandle(new sys::IOHandlePrivate), cb(cb), parent(parent) +) : cb(cb), parent(parent) { int fds[2]; if (::pipe(fds) == -1) throw ErrnoException(QPID_MSG("Can't create PollableCondition")); - impl->fd = fds[0]; + fd = fds[0]; writeFd = fds[1]; - if (::fcntl(impl->fd, F_SETFL, O_NONBLOCK) == -1) + if (::fcntl(fd, F_SETFL, O_NONBLOCK) == -1) throw ErrnoException(QPID_MSG("Can't create PollableCondition")); if (::fcntl(writeFd, F_SETFL, O_NONBLOCK) == -1) throw ErrnoException(QPID_MSG("Can't create PollableCondition")); diff --git a/cpp/src/qpid/sys/posix/PosixPoller.cpp b/cpp/src/qpid/sys/posix/PosixPoller.cpp index eb0c3384d1..ae839b2e20 100644 --- a/cpp/src/qpid/sys/posix/PosixPoller.cpp +++ b/cpp/src/qpid/sys/posix/PosixPoller.cpp @@ -88,12 +88,12 @@ class PollerHandlePrivate { }; short events; - const IOHandlePrivate* ioHandle; + const IOHandle* ioHandle; PollerHandle* pollerHandle; FDStat stat; Mutex lock; - PollerHandlePrivate(const IOHandlePrivate* h, PollerHandle* p) : + PollerHandlePrivate(const IOHandle* h, PollerHandle* p) : events(0), ioHandle(h), pollerHandle(p), @@ -101,7 +101,7 @@ class PollerHandlePrivate { } int fd() const { - return toFd(ioHandle); + return ioHandle->fd; } bool isActive() const { @@ -162,7 +162,7 @@ class PollerHandlePrivate { }; PollerHandle::PollerHandle(const IOHandle& h) : - impl(new PollerHandlePrivate(h.impl, this)) + impl(new PollerHandlePrivate(&h, this)) {} PollerHandle::~PollerHandle() { diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp index efe454c5be..889ee9ff75 100644 --- a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp +++ b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp @@ -105,7 +105,7 @@ namespace Rdma { } QueuePair::QueuePair(boost::shared_ptr< ::rdma_cm_id > i) : - qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), + handle(new qpid::sys::IOHandle), pd(allocPd(i->verbs)), cchannel(mkCChannel(i->verbs)), scq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())), @@ -113,7 +113,7 @@ namespace Rdma { outstandingSendEvents(0), outstandingRecvEvents(0) { - impl->fd = cchannel->fd; + handle->fd = cchannel->fd; // Set cq context to this QueuePair object so we can find // ourselves again @@ -163,6 +163,11 @@ namespace Rdma { // The buffers vectors automatically deletes all the buffers we've allocated } + QueuePair::operator qpid::sys::IOHandle&() const + { + return *handle; + } + // Create buffers to use for writing void QueuePair::createSendBuffers(int sendBufferCount, int bufferSize, int reserved) { @@ -359,11 +364,11 @@ namespace Rdma { // Wrap the passed in rdma_cm_id with a Connection // this basically happens only on connection request Connection::Connection(::rdma_cm_id* i) : - qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), + handle(new qpid::sys::IOHandle), id(mkId(i)), context(0) { - impl->fd = id->channel->fd; + handle->fd = id->channel->fd; // Just overwrite the previous context as it will // have come from the listening connection @@ -372,12 +377,12 @@ namespace Rdma { } Connection::Connection() : - qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), + handle(new qpid::sys::IOHandle), channel(mkEChannel()), id(mkId(channel.get(), this, RDMA_PS_TCP)), context(0) { - impl->fd = channel->fd; + handle->fd = channel->fd; } Connection::~Connection() { @@ -385,6 +390,11 @@ namespace Rdma { id->context = 0; } + Connection::operator qpid::sys::IOHandle&() const + { + return *handle; + } + void Connection::ensureQueuePair() { assert(id.get()); diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.h b/cpp/src/qpid/sys/rdma/rdma_wrap.h index 8e3429027b..5f84793a5b 100644 --- a/cpp/src/qpid/sys/rdma/rdma_wrap.h +++ b/cpp/src/qpid/sys/rdma/rdma_wrap.h @@ -28,6 +28,7 @@ #include "qpid/sys/Mutex.h" #include <boost/shared_ptr.hpp> +#include <boost/scoped_ptr.hpp> #include <boost/intrusive_ptr.hpp> #include <boost/ptr_container/ptr_deque.hpp> @@ -116,9 +117,10 @@ namespace Rdma { // Wrapper for a queue pair - this has the functionality for // putting buffers on the receive queue and for sending buffers // to the other end of the connection. - class QueuePair : public qpid::sys::IOHandle, public qpid::RefCounted { + class QueuePair : public qpid::RefCounted { friend class Connection; + boost::scoped_ptr< qpid::sys::IOHandle > handle; boost::shared_ptr< ::ibv_pd > pd; boost::shared_ptr< ::ibv_mr > smr; boost::shared_ptr< ::ibv_mr > rmr; @@ -139,6 +141,8 @@ namespace Rdma { public: typedef boost::intrusive_ptr<QueuePair> intrusive_ptr; + operator qpid::sys::IOHandle&() const; + // Create a buffers to use for writing void createSendBuffers(int sendBufferCount, int dataSize, int headerSize); @@ -195,7 +199,8 @@ namespace Rdma { // registered buffers can't be shared between different connections // (this can only happen between connections on the same controller in any case, // so needs careful management if used) - class Connection : public qpid::sys::IOHandle, public qpid::RefCounted { + class Connection : public qpid::RefCounted { + boost::scoped_ptr< qpid::sys::IOHandle > handle; boost::shared_ptr< ::rdma_event_channel > channel; boost::shared_ptr< ::rdma_cm_id > id; QueuePair::intrusive_ptr qp; @@ -216,6 +221,8 @@ namespace Rdma { public: typedef boost::intrusive_ptr<Connection> intrusive_ptr; + operator qpid::sys::IOHandle&() const; + static intrusive_ptr make(); static intrusive_ptr find(::rdma_cm_id* i); diff --git a/cpp/src/qpid/sys/ssl/SslSocket.cpp b/cpp/src/qpid/sys/ssl/SslSocket.cpp index 6b6f326492..22f9f63fff 100644 --- a/cpp/src/qpid/sys/ssl/SslSocket.cpp +++ b/cpp/src/qpid/sys/ssl/SslSocket.cpp @@ -98,16 +98,16 @@ SslSocket::SslSocket(const std::string& certName, bool clientAuth) : * returned from accept. Because we use posix accept rather than * PR_Accept, we have to reset the handshake. */ -SslSocket::SslSocket(IOHandlePrivate* ioph, PRFileDesc* model) : Socket(ioph), nssSocket(0), prototype(0) +SslSocket::SslSocket(int fd, PRFileDesc* model) : BSDSocket(fd), nssSocket(0), prototype(0) { - nssSocket = SSL_ImportFD(model, PR_ImportTCPSocket(impl->fd)); + nssSocket = SSL_ImportFD(model, PR_ImportTCPSocket(fd)); NSS_CHECK(SSL_ResetHandshake(nssSocket, PR_TRUE)); } void SslSocket::setNonblocking() const { if (!nssSocket) { - Socket::setNonblocking(); + BSDSocket::setNonblocking(); return; } PRSocketOptionData option; @@ -119,7 +119,7 @@ void SslSocket::setNonblocking() const void SslSocket::setTcpNoDelay() const { if (!nssSocket) { - Socket::setTcpNoDelay(); + BSDSocket::setTcpNoDelay(); return; } PRSocketOptionData option; @@ -130,9 +130,9 @@ void SslSocket::setTcpNoDelay() const void SslSocket::connect(const SocketAddress& addr) const { - Socket::connect(addr); + BSDSocket::connect(addr); - nssSocket = SSL_ImportFD(0, PR_ImportTCPSocket(impl->fd)); + nssSocket = SSL_ImportFD(0, PR_ImportTCPSocket(fd)); void* arg; // Use the connection's cert-name if it has one; else use global cert-name @@ -155,12 +155,12 @@ void SslSocket::connect(const SocketAddress& addr) const void SslSocket::close() const { if (!nssSocket) { - Socket::close(); + BSDSocket::close(); return; } - if (impl->fd > 0) { + if (fd > 0) { PR_Close(nssSocket); - impl->fd = -1; + fd = -1; } } @@ -176,15 +176,15 @@ int SslSocket::listen(const SocketAddress& sa, int backlog) const SECKEY_DestroyPrivateKey(key); CERT_DestroyCertificate(cert); - return Socket::listen(sa, backlog); + return BSDSocket::listen(sa, backlog); } -SslSocket* SslSocket::accept() const +Socket* SslSocket::accept() const { QPID_LOG(trace, "Accepting SSL connection."); - int afd = ::accept(impl->fd, 0, 0); + int afd = ::accept(fd, 0, 0); if ( afd >= 0) { - return new SslSocket(new IOHandlePrivate(afd), prototype); + return new SslSocket(afd, prototype); } else if (errno == EAGAIN) { return 0; } else { @@ -275,15 +275,15 @@ SslMuxSocket::SslMuxSocket(const std::string& certName, bool clientAuth) : Socket* SslMuxSocket::accept() const { - int afd = ::accept(impl->fd, 0, 0); + int afd = ::accept(fd, 0, 0); if (afd >= 0) { QPID_LOG(trace, "Accepting connection with optional SSL wrapper."); if (isSslStream(afd)) { QPID_LOG(trace, "Accepted SSL connection."); - return new SslSocket(new IOHandlePrivate(afd), prototype); + return new SslSocket(afd, prototype); } else { QPID_LOG(trace, "Accepted Plaintext connection."); - return new Socket(new IOHandlePrivate(afd)); + return new BSDSocket(afd); } } else if (errno == EAGAIN) { return 0; diff --git a/cpp/src/qpid/sys/ssl/SslSocket.h b/cpp/src/qpid/sys/ssl/SslSocket.h index 1b5424cfeb..1efbbe4a88 100644 --- a/cpp/src/qpid/sys/ssl/SslSocket.h +++ b/cpp/src/qpid/sys/ssl/SslSocket.h @@ -23,7 +23,7 @@ */ #include "qpid/sys/IOHandle.h" -#include "qpid/sys/Socket.h" +#include "qpid/sys/posix/BSDSocket.h" #include <nspr.h> #include <string> @@ -37,7 +37,7 @@ class Duration; namespace ssl { -class SslSocket : public qpid::sys::Socket +class SslSocket : public qpid::sys::BSDSocket { public: /** Create a socket wrapper for descriptor. @@ -71,7 +71,7 @@ public: * Accept a connection from a socket that is already listening * and has an incoming connection */ - SslSocket* accept() const; + virtual Socket* accept() const; // TODO The following are raw operations, maybe they need better wrapping? int read(void *buf, size_t count) const; @@ -92,8 +92,8 @@ protected: */ mutable PRFileDesc* prototype; - SslSocket(IOHandlePrivate* ioph, PRFileDesc* model); - friend class SslMuxSocket; + SslSocket(int fd, PRFileDesc* model); + friend class SslMuxSocket; // Needed for this constructor }; class SslMuxSocket : public SslSocket diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp index 9fdf89c83b..e7e966519d 100644 --- a/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -24,6 +24,7 @@ #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Socket.h" +#include "qpid/sys/windows/WinSocket.h" #include "qpid/sys/SocketAddress.h" #include "qpid/sys/Poller.h" #include "qpid/sys/Thread.h" @@ -51,8 +52,8 @@ namespace { * The function pointers for AcceptEx and ConnectEx need to be looked up * at run time. */ -const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::Socket& s) { - SOCKET h = toSocketHandle(s); +const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::IOHandle& io) { + SOCKET h = io.fd; GUID guidAcceptEx = WSAID_ACCEPTEX; DWORD dwBytes = 0; LPFN_ACCEPTEX fnAcceptEx; @@ -94,12 +95,14 @@ private: AsynchAcceptor::Callback acceptedCallback; const Socket& socket; + const SOCKET wSocket; const LPFN_ACCEPTEX fnAcceptEx; }; AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) : acceptedCallback(callback), socket(s), + wSocket(IOHandle(s).fd), fnAcceptEx(lookUpAcceptEx(s)) { s.setNonblocking(); @@ -122,8 +125,8 @@ void AsynchAcceptor::restart(void) { this, socket); BOOL status; - status = fnAcceptEx(toSocketHandle(socket), - toSocketHandle(*result->newSocket), + status = fnAcceptEx(wSocket, + IOHandle(*result->newSocket).fd, result->addressBuffer, 0, AsynchAcceptResult::SOCKADDRMAXLEN, @@ -134,16 +137,30 @@ void AsynchAcceptor::restart(void) { } +Socket* createSameTypeSocket(const Socket& sock) { + SOCKET socket = IOHandle(sock).fd; + // Socket currently has no actual socket attached + if (socket == INVALID_SOCKET) + return new WinSocket; + + ::sockaddr_storage sa; + ::socklen_t salen = sizeof(sa); + QPID_WINSOCK_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen)); + SOCKET s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM + if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); + return new WinSocket(s); +} + AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb, AsynchAcceptor *acceptor, - const Socket& listener) + const Socket& lsocket) : callback(cb), acceptor(acceptor), - listener(toSocketHandle(listener)), - newSocket(listener.createSameTypeSocket()) { + listener(IOHandle(lsocket).fd), + newSocket(createSameTypeSocket(lsocket)) { } void AsynchAcceptResult::success(size_t /*bytesTransferred*/) { - ::setsockopt (toSocketHandle(*newSocket), + ::setsockopt (IOHandle(*newSocket).fd, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&listener, @@ -363,7 +380,7 @@ class CallbackHandle : public IOHandle { public: CallbackHandle(AsynchIoResult::Completer completeCb, AsynchIO::RequestCallback reqCb = 0) : - IOHandle(new IOHandlePrivate (INVALID_SOCKET, completeCb, reqCb)) + IOHandle(INVALID_SOCKET, completeCb, reqCb) {} }; @@ -516,7 +533,7 @@ void AsynchIO::startReading() { DWORD bytesReceived = 0, flags = 0; InterlockedIncrement(&opsInProgress); readInProgress = true; - int status = WSARecv(toSocketHandle(socket), + int status = WSARecv(IOHandle(socket).fd, const_cast<LPWSABUF>(result->getWSABUF()), 1, &bytesReceived, &flags, @@ -614,7 +631,7 @@ void AsynchIO::startWrite(AsynchIO::BufferBase* buff) { buff, buff->dataCount); DWORD bytesSent = 0; - int status = WSASend(toSocketHandle(socket), + int status = WSASend(IOHandle(socket).fd, const_cast<LPWSABUF>(result->getWSABUF()), 1, &bytesSent, 0, diff --git a/cpp/src/qpid/sys/windows/IOHandle.cpp b/cpp/src/qpid/sys/windows/IOHandle.cpp index 250737cb99..19a1c44875 100755 --- a/cpp/src/qpid/sys/windows/IOHandle.cpp +++ b/cpp/src/qpid/sys/windows/IOHandle.cpp @@ -19,24 +19,11 @@ * */ -#include "qpid/sys/IOHandle.h" #include "qpid/sys/windows/IoHandlePrivate.h" #include <windows.h> namespace qpid { namespace sys { -SOCKET toFd(const IOHandlePrivate* h) -{ - return h->fd; -} - -IOHandle::IOHandle(IOHandlePrivate* h) : - impl(h) -{} - -IOHandle::~IOHandle() { - delete impl; -} }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/windows/IoHandlePrivate.h b/cpp/src/qpid/sys/windows/IoHandlePrivate.h index 5943db5cc7..4529ad93ec 100755 --- a/cpp/src/qpid/sys/windows/IoHandlePrivate.h +++ b/cpp/src/qpid/sys/windows/IoHandlePrivate.h @@ -38,15 +38,14 @@ namespace sys { // completer from an I/O thread. If the callback mechanism is used, there // can be a RequestCallback set - this carries the callback object through // from AsynchIO::requestCallback() through to the I/O completion processing. -class IOHandlePrivate { - friend QPID_COMMON_EXTERN SOCKET toSocketHandle(const Socket& s); - static IOHandlePrivate* getImpl(const IOHandle& h); - +class IOHandle { public: - IOHandlePrivate(SOCKET f = INVALID_SOCKET, - windows::AsynchIoResult::Completer cb = 0, - AsynchIO::RequestCallback reqCallback = 0) : - fd(f), event(cb), cbRequest(reqCallback) + IOHandle(SOCKET f = INVALID_SOCKET, + windows::AsynchIoResult::Completer cb = 0, + AsynchIO::RequestCallback reqCallback = 0) : + fd(f), + event(cb), + cbRequest(reqCallback) {} SOCKET fd; @@ -54,8 +53,6 @@ public: AsynchIO::RequestCallback cbRequest; }; -QPID_COMMON_EXTERN SOCKET toSocketHandle(const Socket& s); - }} #endif /* _sys_windows_IoHandlePrivate_h */ diff --git a/cpp/src/qpid/sys/windows/IocpPoller.cpp b/cpp/src/qpid/sys/windows/IocpPoller.cpp index c81cef87b0..ecb33c5517 100755 --- a/cpp/src/qpid/sys/windows/IocpPoller.cpp +++ b/cpp/src/qpid/sys/windows/IocpPoller.cpp @@ -22,7 +22,7 @@ #include "qpid/sys/Poller.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Dispatcher.h" - +#include "qpid/sys/IOHandle.h" #include "qpid/sys/windows/AsynchIoResult.h" #include "qpid/sys/windows/IoHandlePrivate.h" #include "qpid/sys/windows/check.h" @@ -55,7 +55,7 @@ class PollerHandlePrivate { }; PollerHandle::PollerHandle(const IOHandle& h) : - impl(new PollerHandlePrivate(toSocketHandle(static_cast<const Socket&>(h)), h.impl->event, h.impl->cbRequest)) + impl(new PollerHandlePrivate(h.fd, h.event, h.cbRequest)) {} PollerHandle::~PollerHandle() { diff --git a/cpp/src/qpid/sys/windows/PollableCondition.cpp b/cpp/src/qpid/sys/windows/PollableCondition.cpp index bb637be0a6..7bbcd4de1b 100644 --- a/cpp/src/qpid/sys/windows/PollableCondition.cpp +++ b/cpp/src/qpid/sys/windows/PollableCondition.cpp @@ -57,8 +57,7 @@ private: PollableConditionPrivate::PollableConditionPrivate(const sys::PollableCondition::Callback& cb, sys::PollableCondition& parent, const boost::shared_ptr<sys::Poller>& poller) - : IOHandle(new sys::IOHandlePrivate(INVALID_SOCKET, - boost::bind(&PollableConditionPrivate::dispatch, this, _1))), + : IOHandle(INVALID_SOCKET, boost::bind(&PollableConditionPrivate::dispatch, this, _1)), cb(cb), parent(parent), poller(poller), isSet(0) { } diff --git a/cpp/src/qpid/sys/windows/Socket.cpp b/cpp/src/qpid/sys/windows/WinSocket.cpp index 0c74b3a725..c1ac31de76 100644 --- a/cpp/src/qpid/sys/windows/Socket.cpp +++ b/cpp/src/qpid/sys/windows/WinSocket.cpp @@ -19,20 +19,13 @@ * */ -#include "qpid/sys/Socket.h" +#include "qpid/sys/windows/WinSocket.h" #include "qpid/sys/SocketAddress.h" #include "qpid/sys/windows/check.h" #include "qpid/sys/windows/IoHandlePrivate.h" #include "qpid/sys/SystemInfo.h" -// Ensure we get all of winsock2.h -#ifndef _WIN32_WINNT -#define _WIN32_WINNT 0x0501 -#endif - -#include <winsock2.h> - namespace qpid { namespace sys { @@ -108,22 +101,32 @@ uint16_t getLocalPort(int fd) } } // namespace -Socket::Socket() : - IOHandle(new IOHandlePrivate), +WinSocket::WinSocket() : + handle(new IOHandle), nonblocking(false), nodelay(false) {} -Socket::Socket(IOHandlePrivate* h) : - IOHandle(h), +Socket* createSocket() +{ + return new WinSocket; +} + +WinSocket::WinSocket(SOCKET fd) : + handle(new IOHandle(fd)), nonblocking(false), nodelay(false) {} -void Socket::createSocket(const SocketAddress& sa) const +WinSocket::operator const IOHandle&() const { - SOCKET& socket = impl->fd; - if (socket != INVALID_SOCKET) Socket::close(); + return *handle; +} + +void WinSocket::createSocket(const SocketAddress& sa) const +{ + SOCKET& socket = handle->fd; + if (socket != INVALID_SOCKET) WinSocket::close(); SOCKET s = ::socket (getAddrInfo(sa).ai_family, getAddrInfo(sa).ai_socktype, @@ -141,33 +144,19 @@ void Socket::createSocket(const SocketAddress& sa) const } } -Socket* Socket::createSameTypeSocket() const { - SOCKET& socket = impl->fd; - // Socket currently has no actual socket attached - if (socket == INVALID_SOCKET) - return new Socket; - - ::sockaddr_storage sa; - ::socklen_t salen = sizeof(sa); - QPID_WINSOCK_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen)); - SOCKET s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM - if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); - return new Socket(new IOHandlePrivate(s)); -} - -void Socket::setNonblocking() const { +void WinSocket::setNonblocking() const { u_long nonblock = 1; - QPID_WINSOCK_CHECK(ioctlsocket(impl->fd, FIONBIO, &nonblock)); + QPID_WINSOCK_CHECK(ioctlsocket(handle->fd, FIONBIO, &nonblock)); } void -Socket::connect(const SocketAddress& addr) const +WinSocket::connect(const SocketAddress& addr) const { peername = addr.asString(false); createSocket(addr); - const SOCKET& socket = impl->fd; + const SOCKET& socket = handle->fd; int err; WSASetLastError(0); if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) != 0) && @@ -176,38 +165,38 @@ Socket::connect(const SocketAddress& addr) const } void -Socket::close() const +WinSocket::close() const { - SOCKET& socket = impl->fd; + SOCKET& socket = handle->fd; if (socket == INVALID_SOCKET) return; QPID_WINSOCK_CHECK(closesocket(socket)); socket = INVALID_SOCKET; } -int Socket::write(const void *buf, size_t count) const +int WinSocket::write(const void *buf, size_t count) const { - const SOCKET& socket = impl->fd; + const SOCKET& socket = handle->fd; int sent = ::send(socket, (const char *)buf, count, 0); if (sent == SOCKET_ERROR) return -1; return sent; } -int Socket::read(void *buf, size_t count) const +int WinSocket::read(void *buf, size_t count) const { - const SOCKET& socket = impl->fd; + const SOCKET& socket = handle->fd; int received = ::recv(socket, (char *)buf, count, 0); if (received == SOCKET_ERROR) return -1; return received; } -int Socket::listen(const SocketAddress& addr, int backlog) const +int WinSocket::listen(const SocketAddress& addr, int backlog) const { createSocket(addr); - const SOCKET& socket = impl->fd; + const SOCKET& socket = handle->fd; BOOL yes=1; QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, sizeof(yes))); @@ -219,48 +208,48 @@ int Socket::listen(const SocketAddress& addr, int backlog) const return getLocalPort(socket); } -Socket* Socket::accept() const +Socket* WinSocket::accept() const { - SOCKET afd = ::accept(impl->fd, 0, 0); + SOCKET afd = ::accept(handle->fd, 0, 0); if (afd != INVALID_SOCKET) - return new Socket(new IOHandlePrivate(afd)); + return new WinSocket(afd); else if (WSAGetLastError() == EAGAIN) return 0; else throw QPID_WINDOWS_ERROR(WSAGetLastError()); } -std::string Socket::getPeerAddress() const +std::string WinSocket::getPeerAddress() const { if (peername.empty()) { - peername = getName(impl->fd, false); + peername = getName(handle->fd, false); } return peername; } -std::string Socket::getLocalAddress() const +std::string WinSocket::getLocalAddress() const { if (localname.empty()) { - localname = getName(impl->fd, true); + localname = getName(handle->fd, true); } return localname; } -int Socket::getError() const +int WinSocket::getError() const { int result; socklen_t rSize = sizeof (result); - QPID_WINSOCK_CHECK(::getsockopt(impl->fd, SOL_SOCKET, SO_ERROR, (char *)&result, &rSize)); + QPID_WINSOCK_CHECK(::getsockopt(handle->fd, SOL_SOCKET, SO_ERROR, (char *)&result, &rSize)); return result; } -void Socket::setTcpNoDelay() const +void WinSocket::setTcpNoDelay() const { - SOCKET& socket = impl->fd; + SOCKET& socket = handle->fd; nodelay = true; if (socket != INVALID_SOCKET) { int flag = 1; - int result = setsockopt(impl->fd, + int result = setsockopt(handle->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, @@ -269,14 +258,4 @@ void Socket::setTcpNoDelay() const } } -inline IOHandlePrivate* IOHandlePrivate::getImpl(const qpid::sys::IOHandle &h) -{ - return h.impl; -} - -SOCKET toSocketHandle(const Socket& s) -{ - return IOHandlePrivate::getImpl(s)->fd; -} - }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/windows/WinSocket.h b/cpp/src/qpid/sys/windows/WinSocket.h new file mode 100644 index 0000000000..17905a6133 --- /dev/null +++ b/cpp/src/qpid/sys/windows/WinSocket.h @@ -0,0 +1,114 @@ +#ifndef QPID_SYS_WINDOWS_BSDSOCKET_H +#define QPID_SYS_WINDOWS_BSDSOCKET_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Socket.h" +#include "qpid/sys/IntegerTypes.h" +#include "qpid/CommonImportExport.h" +#include <string> + +#include <boost/scoped_ptr.hpp> + +// Ensure we get all of winsock2.h +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0501 +#endif + +#include <winsock2.h> + +namespace qpid { +namespace sys { + +namespace windows { +Socket* createSameTypeSocket(const Socket&); +} + +class Duration; +class IOHandle; +class SocketAddress; + +class QPID_COMMON_CLASS_EXTERN WinSocket : public Socket +{ +public: + /** Create a socket wrapper for descriptor. */ + QPID_COMMON_EXTERN WinSocket(); + + QPID_COMMON_EXTERN operator const IOHandle&() const; + + /** Set socket non blocking */ + QPID_COMMON_EXTERN virtual void setNonblocking() const; + + QPID_COMMON_EXTERN virtual void setTcpNoDelay() const; + + QPID_COMMON_EXTERN virtual void connect(const SocketAddress&) const; + + QPID_COMMON_EXTERN virtual void close() const; + + /** Bind to a port and start listening. + *@return The bound port number + */ + QPID_COMMON_EXTERN virtual int listen(const SocketAddress&, int backlog = 10) const; + + /** + * Returns an address (host and port) for the remote end of the + * socket + */ + QPID_COMMON_EXTERN std::string getPeerAddress() const; + /** + * Returns an address (host and port) for the local end of the + * socket + */ + QPID_COMMON_EXTERN std::string getLocalAddress() const; + + /** + * Returns the error code stored in the socket. This may be used + * to determine the result of a non-blocking connect. + */ + QPID_COMMON_EXTERN int getError() const; + + /** Accept a connection from a socket that is already listening + * and has an incoming connection + */ + QPID_COMMON_EXTERN virtual Socket* accept() const; + + // TODO The following are raw operations, maybe they need better wrapping? + QPID_COMMON_EXTERN virtual int read(void *buf, size_t count) const; + QPID_COMMON_EXTERN virtual int write(const void *buf, size_t count) const; + +protected: + /** Create socket */ + void createSocket(const SocketAddress&) const; + + mutable boost::scoped_ptr<IOHandle> handle; + mutable std::string localname; + mutable std::string peername; + mutable bool nonblocking; + mutable bool nodelay; + + /** Construct socket with existing handle */ + friend Socket* qpid::sys::windows::createSameTypeSocket(const Socket&); + WinSocket(SOCKET fd); +}; + +}} +#endif /*!QPID_SYS_WINDOWS_BSDSOCKET_H*/ diff --git a/cpp/src/tests/DispatcherTest.cpp b/cpp/src/tests/DispatcherTest.cpp index e1691db584..7312fe8d2e 100644 --- a/cpp/src/tests/DispatcherTest.cpp +++ b/cpp/src/tests/DispatcherTest.cpp @@ -20,7 +20,6 @@ */ #include "qpid/sys/Poller.h" -#include "qpid/sys/IOHandle.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/DispatchHandle.h" #include "qpid/sys/posix/PrivatePosix.h" @@ -147,8 +146,8 @@ int main(int /*argc*/, char** /*argv*/) for (int i = 0; i < 8; i++) testString += testString; - PosixIOHandle f0(sv[0]); - PosixIOHandle f1(sv[1]); + IOHandle f0(sv[0]); + IOHandle f1(sv[1]); rh = new DispatchHandleRef(f0, boost::bind(reader, _1, sv[0]), 0, 0); wh = new DispatchHandleRef(f1, 0, boost::bind(writer, _1, sv[1], testString), 0); diff --git a/cpp/src/tests/PollerTest.cpp b/cpp/src/tests/PollerTest.cpp index 9fa5689c5f..5a1d02964c 100644 --- a/cpp/src/tests/PollerTest.cpp +++ b/cpp/src/tests/PollerTest.cpp @@ -23,7 +23,6 @@ * Use socketpair to test the poller */ -#include "qpid/sys/IOHandle.h" #include "qpid/sys/Poller.h" #include "qpid/sys/posix/PrivatePosix.h" @@ -106,8 +105,8 @@ int main(int /*argc*/, char** /*argv*/) auto_ptr<Poller> poller(new Poller); - PosixIOHandle f0(sv[0]); - PosixIOHandle f1(sv[1]); + IOHandle f0(sv[0]); + IOHandle f1(sv[1]); PollerHandle h0(f0); PollerHandle h1(f1); @@ -225,8 +224,8 @@ int main(int /*argc*/, char** /*argv*/) auto_ptr<Poller> poller1(new Poller); - PosixIOHandle f2(sv[0]); - PosixIOHandle f3(sv[1]); + IOHandle f2(sv[0]); + IOHandle f3(sv[1]); PollerHandle h2(f2); PollerHandle h3(f3); |