summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/include/qpid/sys/IOHandle.h15
-rw-r--r--cpp/include/qpid/sys/posix/PrivatePosix.h22
-rw-r--r--cpp/src/CMakeLists.txt4
-rw-r--r--cpp/src/Makefile.am6
-rw-r--r--cpp/src/qpid/broker/windows/SslProtocolFactory.cpp8
-rw-r--r--cpp/src/qpid/client/TCPConnector.cpp13
-rw-r--r--cpp/src/qpid/client/TCPConnector.h4
-rw-r--r--cpp/src/qpid/sys/Socket.h54
-rw-r--r--cpp/src/qpid/sys/TCPIOPlugin.cpp6
-rw-r--r--cpp/src/qpid/sys/epoll/EpollPoller.cpp9
-rw-r--r--cpp/src/qpid/sys/posix/BSDSocket.cpp (renamed from cpp/src/qpid/sys/posix/Socket.cpp)96
-rw-r--r--cpp/src/qpid/sys/posix/BSDSocket.h109
-rw-r--r--cpp/src/qpid/sys/posix/IOHandle.cpp15
-rw-r--r--cpp/src/qpid/sys/posix/PollableCondition.cpp7
-rw-r--r--cpp/src/qpid/sys/posix/PosixPoller.cpp8
-rw-r--r--cpp/src/qpid/sys/rdma/rdma_wrap.cpp22
-rw-r--r--cpp/src/qpid/sys/rdma/rdma_wrap.h11
-rw-r--r--cpp/src/qpid/sys/ssl/SslSocket.cpp32
-rw-r--r--cpp/src/qpid/sys/ssl/SslSocket.h10
-rw-r--r--cpp/src/qpid/sys/windows/AsynchIO.cpp39
-rwxr-xr-xcpp/src/qpid/sys/windows/IOHandle.cpp13
-rwxr-xr-xcpp/src/qpid/sys/windows/IoHandlePrivate.h17
-rwxr-xr-xcpp/src/qpid/sys/windows/IocpPoller.cpp4
-rw-r--r--cpp/src/qpid/sys/windows/PollableCondition.cpp3
-rw-r--r--cpp/src/qpid/sys/windows/WinSocket.cpp (renamed from cpp/src/qpid/sys/windows/Socket.cpp)105
-rw-r--r--cpp/src/qpid/sys/windows/WinSocket.h114
-rw-r--r--cpp/src/tests/DispatcherTest.cpp5
-rw-r--r--cpp/src/tests/PollerTest.cpp9
28 files changed, 463 insertions, 297 deletions
diff --git a/cpp/include/qpid/sys/IOHandle.h b/cpp/include/qpid/sys/IOHandle.h
index 45fc8c240a..06ae65f879 100644
--- a/cpp/include/qpid/sys/IOHandle.h
+++ b/cpp/include/qpid/sys/IOHandle.h
@@ -22,8 +22,6 @@
*
*/
-#include "qpid/CommonImportExport.h"
-
namespace qpid {
namespace sys {
@@ -31,18 +29,7 @@ namespace sys {
* This is a class intended to abstract the Unix concept of file descriptor
* or the Windows concept of HANDLE
*/
-class PollerHandle;
-class IOHandlePrivate;
-class IOHandle {
- friend class PollerHandle;
- friend class IOHandlePrivate;
-
-protected:
- IOHandlePrivate* const impl;
-
- IOHandle(IOHandlePrivate*);
- QPID_COMMON_EXTERN virtual ~IOHandle();
-};
+class IOHandle;
}}
diff --git a/cpp/include/qpid/sys/posix/PrivatePosix.h b/cpp/include/qpid/sys/posix/PrivatePosix.h
index 79cb950275..0f59fe3176 100644
--- a/cpp/include/qpid/sys/posix/PrivatePosix.h
+++ b/cpp/include/qpid/sys/posix/PrivatePosix.h
@@ -23,7 +23,6 @@
*/
#include "qpid/sys/Time.h"
-#include "qpid/sys/IOHandle.h"
struct timespec;
struct timeval;
@@ -41,32 +40,21 @@ Duration toTime(const struct timespec& ts);
class SocketAddress;
const struct addrinfo& getAddrInfo(const SocketAddress&);
-// Private fd related implementation details
-class IOHandlePrivate {
+// Posix fd as an IOHandle
+class IOHandle {
public:
- IOHandlePrivate(int f = -1) :
- fd(f)
+ IOHandle(int fd0 = -1) :
+ fd(fd0)
{}
int fd;
};
-int toFd(const IOHandlePrivate* h);
-
-// Posix fd as an IOHandle
-class PosixIOHandle : public IOHandle {
-public:
- PosixIOHandle(int fd) :
- IOHandle(new IOHandlePrivate(fd))
- {}
-};
-
// Dummy IOHandle for places it's required in the API
// but we promise not to actually try to do any operations on the IOHandle
class NullIOHandle : public IOHandle {
public:
- NullIOHandle() :
- IOHandle(new IOHandlePrivate)
+ NullIOHandle()
{}
};
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index ce94c0a318..20868da674 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -748,7 +748,7 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows)
qpid/sys/windows/PipeHandle.cpp
qpid/sys/windows/PollableCondition.cpp
qpid/sys/windows/Shlib.cpp
- qpid/sys/windows/Socket.cpp
+ qpid/sys/windows/WinSocket.cpp
qpid/sys/windows/SocketAddress.cpp
qpid/sys/windows/StrError.cpp
qpid/sys/windows/SystemInfo.cpp
@@ -853,7 +853,7 @@ else (CMAKE_SYSTEM_NAME STREQUAL Windows)
qpid/sys/posix/PollableCondition.cpp
qpid/sys/posix/Shlib.cpp
qpid/log/posix/SinkOptions.cpp
- qpid/sys/posix/Socket.cpp
+ qpid/sys/posix/BSDSocket.cpp
qpid/sys/posix/SocketAddress.cpp
qpid/sys/posix/StrError.cpp
qpid/sys/posix/Thread.cpp
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 8149ea218b..488eb37eb0 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -46,7 +46,6 @@ windows_dist = \
qpid/sys/windows/QpidDllMain.h \
qpid/sys/windows/Shlib.cpp \
qpid/sys/windows/SocketAddress.cpp \
- qpid/sys/windows/Socket.cpp \
qpid/sys/windows/SslAsynchIO.cpp \
qpid/sys/windows/SslAsynchIO.h \
qpid/sys/windows/StrError.cpp \
@@ -56,6 +55,8 @@ windows_dist = \
../include/qpid/sys/windows/Time.h \
qpid/sys/windows/uuid.cpp \
qpid/sys/windows/uuid.h \
+ qpid/sys/windows/WinSocket.cpp \
+ qpid/sys/windows/WinSocket.h \
windows/QpiddBroker.cpp \
windows/SCM.h \
windows/SCM.cpp \
@@ -158,7 +159,8 @@ qpidd_SOURCES = qpidd.cpp qpidd.h $(posix_qpidd_src)
libqpidcommon_la_SOURCES += \
qpid/log/posix/SinkOptions.cpp \
qpid/sys/posix/IOHandle.cpp \
- qpid/sys/posix/Socket.cpp \
+ qpid/sys/posix/BSDSocket.cpp \
+ qpid/sys/posix/BSDSocket.h \
qpid/sys/posix/SocketAddress.cpp \
qpid/sys/posix/AsynchIO.cpp \
qpid/sys/posix/FileSysDir.cpp \
diff --git a/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp b/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
index ff177ba499..5b801aa69f 100644
--- a/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
+++ b/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
@@ -158,7 +158,7 @@ SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options,
// Get the certificate for this server.
DWORD flags = 0;
std::string certStoreLocation = options.certStoreLocation;
- std::transform(certStoreLocation.begin(), certStoreLocation.end(), certStoreLocation.begin(), ::tolower);
+ std::transform(certStoreLocation.begin(), certStoreLocation.end(), certStoreLocation.begin(), ::tolower);
if (certStoreLocation == "currentuser") {
flags = CERT_SYSTEM_STORE_CURRENT_USER;
} else if (certStoreLocation == "localmachine") {
@@ -217,14 +217,14 @@ SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options,
// We must have at least one resolved address
QPID_LOG(info, "SSL Listening to: " << sa.asString())
- Socket* s = new Socket;
+ Socket* s = createSocket();
listeningPort = s->listen(sa, backlog);
listeners.push_back(s);
// Try any other resolved addresses
while (sa.nextAddress()) {
QPID_LOG(info, "SSL Listening to: " << sa.asString())
- Socket* s = new Socket;
+ Socket* s = createSocket();
s->listen(sa, backlog);
listeners.push_back(s);
}
@@ -325,7 +325,7 @@ void SslProtocolFactory::connect(sys::Poller::shared_ptr poller,
// upon connection failure or by the AsynchIO upon connection
// shutdown. The allocated AsynchConnector frees itself when it
// is no longer needed.
- qpid::sys::Socket* socket = new qpid::sys::Socket();
+ qpid::sys::Socket* socket = createSocket();
connectFailedCallback = failed;
AsynchConnector::create(*socket,
host,
diff --git a/cpp/src/qpid/client/TCPConnector.cpp b/cpp/src/qpid/client/TCPConnector.cpp
index a14acb214c..b92f342b74 100644
--- a/cpp/src/qpid/client/TCPConnector.cpp
+++ b/cpp/src/qpid/client/TCPConnector.cpp
@@ -72,12 +72,13 @@ TCPConnector::TCPConnector(Poller::shared_ptr p,
closed(true),
shutdownHandler(0),
input(0),
+ socket(createSocket()),
connector(0),
aio(0),
poller(p)
{
QPID_LOG(debug, "TCPConnector created for " << version);
- settings.configureSocket(socket);
+ settings.configureSocket(*socket);
}
TCPConnector::~TCPConnector() {
@@ -88,7 +89,7 @@ void TCPConnector::connect(const std::string& host, const std::string& port) {
Mutex::ScopedLock l(lock);
assert(closed);
connector = AsynchConnector::create(
- socket,
+ *socket,
host, port,
boost::bind(&TCPConnector::connected, this, _1),
boost::bind(&TCPConnector::connectFailed, this, _3));
@@ -99,7 +100,7 @@ void TCPConnector::connect(const std::string& host, const std::string& port) {
void TCPConnector::connected(const Socket&) {
connector = 0;
- aio = AsynchIO::create(socket,
+ aio = AsynchIO::create(*socket,
boost::bind(&TCPConnector::readbuff, this, _1, _2),
boost::bind(&TCPConnector::eof, this, _1),
boost::bind(&TCPConnector::disconnected, this, _1),
@@ -116,7 +117,7 @@ void TCPConnector::start(sys::AsynchIO* aio_) {
aio->createBuffers(maxFrameSize);
- identifier = str(format("[%1%]") % socket.getFullAddress());
+ identifier = str(format("[%1%]") % socket->getFullAddress());
}
void TCPConnector::initAmqp() {
@@ -127,7 +128,7 @@ void TCPConnector::initAmqp() {
void TCPConnector::connectFailed(const std::string& msg) {
connector = 0;
QPID_LOG(warning, "Connect failed: " << msg);
- socket.close();
+ socket->close();
if (!closed)
closed = true;
if (shutdownHandler)
@@ -318,7 +319,7 @@ void TCPConnector::eof(AsynchIO&) {
void TCPConnector::disconnected(AsynchIO&) {
close();
- socketClosed(*aio, socket);
+ socketClosed(*aio, *socket);
}
void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
diff --git a/cpp/src/qpid/client/TCPConnector.h b/cpp/src/qpid/client/TCPConnector.h
index 5e1a3856e6..a90dffd3ef 100644
--- a/cpp/src/qpid/client/TCPConnector.h
+++ b/cpp/src/qpid/client/TCPConnector.h
@@ -35,7 +35,7 @@
#include "qpid/sys/Thread.h"
#include <boost/shared_ptr.hpp>
-#include <boost/weak_ptr.hpp>
+#include <boost/scoped_ptr.hpp>
#include <deque>
#include <string>
@@ -66,7 +66,7 @@ class TCPConnector : public Connector, public sys::Codec
sys::ShutdownHandler* shutdownHandler;
framing::InputHandler* input;
- sys::Socket socket;
+ boost::scoped_ptr<sys::Socket> socket;
sys::AsynchConnector* connector;
sys::AsynchIO* aio;
diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h
index aa8a8a31d9..2119566d99 100644
--- a/cpp/src/qpid/sys/Socket.h
+++ b/cpp/src/qpid/sys/Socket.h
@@ -22,7 +22,6 @@
*
*/
-#include "qpid/sys/IOHandle.h"
#include "qpid/sys/IntegerTypes.h"
#include "qpid/CommonImportExport.h"
#include <string>
@@ -31,47 +30,42 @@ namespace qpid {
namespace sys {
class Duration;
+class IOHandle;
class SocketAddress;
-namespace ssl {
-class SslMuxSocket;
-}
-
-class QPID_COMMON_CLASS_EXTERN Socket : public IOHandle
+class Socket
{
public:
- /** Create a socket wrapper for descriptor. */
- QPID_COMMON_EXTERN Socket();
+ virtual ~Socket() {};
- /** Create a new Socket which is the same address family as this one */
- QPID_COMMON_EXTERN Socket* createSameTypeSocket() const;
+ virtual operator const IOHandle&() const = 0;
/** Set socket non blocking */
- void setNonblocking() const;
+ virtual void setNonblocking() const = 0;
- QPID_COMMON_EXTERN void setTcpNoDelay() const;
+ virtual void setTcpNoDelay() const = 0;
- QPID_COMMON_EXTERN void connect(const SocketAddress&) const;
+ virtual void connect(const SocketAddress&) const = 0;
- QPID_COMMON_EXTERN void close() const;
+ virtual void close() const = 0;
/** Bind to a port and start listening.
*@param port 0 means choose an available port.
*@param backlog maximum number of pending connections.
*@return The bound port.
*/
- QPID_COMMON_EXTERN int listen(const SocketAddress&, int backlog = 10) const;
+ virtual int listen(const SocketAddress&, int backlog = 10) const = 0;
/**
* Returns an address (host and port) for the remote end of the
* socket
*/
- QPID_COMMON_EXTERN std::string getPeerAddress() const;
+ virtual std::string getPeerAddress() const = 0;
/**
* Returns an address (host and port) for the local end of the
* socket
*/
- QPID_COMMON_EXTERN std::string getLocalAddress() const;
+ virtual std::string getLocalAddress() const = 0;
/**
* Returns the full address of the connection: local and remote host and port.
@@ -82,30 +76,20 @@ public:
* Returns the error code stored in the socket. This may be used
* to determine the result of a non-blocking connect.
*/
- QPID_COMMON_EXTERN int getError() const;
+ virtual int getError() const = 0;
/** Accept a connection from a socket that is already listening
* and has an incoming connection
*/
- QPID_COMMON_EXTERN Socket* accept() const;
-
- // TODO The following are raw operations, maybe they need better wrapping?
- QPID_COMMON_EXTERN int read(void *buf, size_t count) const;
- QPID_COMMON_EXTERN int write(const void *buf, size_t count) const;
-
-protected:
- /** Create socket */
- void createSocket(const SocketAddress&) const;
+ virtual Socket* accept() const = 0;
- mutable std::string localname;
- mutable std::string peername;
- mutable bool nonblocking;
- mutable bool nodelay;
-
- /** Construct socket with existing handle */
- Socket(IOHandlePrivate*);
- friend class qpid::sys::ssl::SslMuxSocket;
+ virtual int read(void *buf, size_t count) const = 0;
+ virtual int write(const void *buf, size_t count) const = 0;
};
+/** Make the default socket for whatever platform we are executing on
+ */
+QPID_COMMON_EXTERN Socket* createSocket();
+
}}
#endif /*!_sys_Socket_h*/
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp
index ed7cc3748d..2ff47e982c 100644
--- a/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -126,7 +126,7 @@ AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const
// We must have at least one resolved address
QPID_LOG(info, "Listening to: " << sa.asString())
- Socket* s = new Socket;
+ Socket* s = createSocket();
uint16_t lport = s->listen(sa, backlog);
QPID_LOG(debug, "Listened to: " << lport);
listeners.push_back(s);
@@ -138,7 +138,7 @@ AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const
// Hack to ensure that all listening connections are on the same port
sa.setAddrInfoPort(listeningPort);
QPID_LOG(info, "Listening to: " << sa.asString())
- Socket* s = new Socket;
+ Socket* s = createSocket();
uint16_t lport = s->listen(sa, backlog);
QPID_LOG(debug, "Listened to: " << lport);
listeners.push_back(s);
@@ -204,7 +204,7 @@ void AsynchIOProtocolFactory::connect(
// upon connection failure or by the AsynchIO upon connection
// shutdown. The allocated AsynchConnector frees itself when it
// is no longer needed.
- Socket* socket = new Socket();
+ Socket* socket = createSocket();
try {
AsynchConnector* c = AsynchConnector::create(
*socket,
diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
index c23403c66d..3769a11f7d 100644
--- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp
+++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
@@ -20,7 +20,6 @@
*/
#include "qpid/sys/Poller.h"
-#include "qpid/sys/IOHandle.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/AtomicCount.h"
#include "qpid/sys/DeletionManager.h"
@@ -64,12 +63,12 @@ class PollerHandlePrivate {
};
::__uint32_t events;
- const IOHandlePrivate* ioHandle;
+ const IOHandle* ioHandle;
PollerHandle* pollerHandle;
FDStat stat;
Mutex lock;
- PollerHandlePrivate(const IOHandlePrivate* h, PollerHandle* p) :
+ PollerHandlePrivate(const IOHandle* h, PollerHandle* p) :
events(0),
ioHandle(h),
pollerHandle(p),
@@ -77,7 +76,7 @@ class PollerHandlePrivate {
}
int fd() const {
- return toFd(ioHandle);
+ return ioHandle->fd;
}
bool isActive() const {
@@ -138,7 +137,7 @@ class PollerHandlePrivate {
};
PollerHandle::PollerHandle(const IOHandle& h) :
- impl(new PollerHandlePrivate(h.impl, this))
+ impl(new PollerHandlePrivate(&h, this))
{}
PollerHandle::~PollerHandle() {
diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/BSDSocket.cpp
index 0c01374369..265142f629 100644
--- a/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/cpp/src/qpid/sys/posix/BSDSocket.cpp
@@ -19,7 +19,7 @@
*
*/
-#include "qpid/sys/Socket.h"
+#include "qpid/sys/posix/BSDSocket.h"
#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/posix/check.h"
@@ -67,25 +67,41 @@ uint16_t getLocalPort(int fd)
}
}
-Socket::Socket() :
- IOHandle(new IOHandlePrivate),
+BSDSocket::BSDSocket() :
+ fd(-1),
+ handle(new IOHandle),
nonblocking(false),
nodelay(false)
{}
-Socket::Socket(IOHandlePrivate* h) :
- IOHandle(h),
+Socket* createSocket()
+{
+ return new BSDSocket;
+}
+
+BSDSocket::BSDSocket(int fd0) :
+ fd(fd0),
+ handle(new IOHandle(fd)),
nonblocking(false),
nodelay(false)
{}
-void Socket::createSocket(const SocketAddress& sa) const
+BSDSocket::~BSDSocket()
+{}
+
+BSDSocket::operator const IOHandle&() const
+{
+ return *handle;
+}
+
+void BSDSocket::createSocket(const SocketAddress& sa) const
{
- int& socket = impl->fd;
- if (socket != -1) Socket::close();
+ int& socket = fd;
+ if (socket != -1) BSDSocket::close();
int s = ::socket(getAddrInfo(sa).ai_family, getAddrInfo(sa).ai_socktype, 0);
if (s < 0) throw QPID_POSIX_ERROR(errno);
socket = s;
+ *handle = IOHandle(s);
try {
if (nonblocking) setNonblocking();
@@ -98,44 +114,31 @@ void Socket::createSocket(const SocketAddress& sa) const
} catch (std::exception&) {
::close(s);
socket = -1;
+ *handle = IOHandle();
throw;
}
}
-Socket* Socket::createSameTypeSocket() const {
- int& socket = impl->fd;
- // Socket currently has no actual socket attached
- if (socket == -1)
- return new Socket;
-
- ::sockaddr_storage sa;
- ::socklen_t salen = sizeof(sa);
- QPID_POSIX_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen));
- int s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM
- if (s < 0) throw QPID_POSIX_ERROR(errno);
- return new Socket(new IOHandlePrivate(s));
-}
-
-void Socket::setNonblocking() const {
- int& socket = impl->fd;
+void BSDSocket::setNonblocking() const {
+ int& socket = fd;
nonblocking = true;
if (socket != -1) {
QPID_POSIX_CHECK(::fcntl(socket, F_SETFL, O_NONBLOCK));
}
}
-void Socket::setTcpNoDelay() const
+void BSDSocket::setTcpNoDelay() const
{
- int& socket = impl->fd;
+ int& socket = fd;
nodelay = true;
if (socket != -1) {
int flag = 1;
- int result = ::setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
+ int result = ::setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
QPID_POSIX_CHECK(result);
}
}
-void Socket::connect(const SocketAddress& addr) const
+void BSDSocket::connect(const SocketAddress& addr) const
{
// The display name for an outbound connection needs to be the name that was specified
// for the address rather than a resolved IP address as we don't know which of
@@ -148,7 +151,7 @@ void Socket::connect(const SocketAddress& addr) const
createSocket(addr);
- const int& socket = impl->fd;
+ const int& socket = fd;
// TODO the correct thing to do here is loop on failure until you've used all the returned addresses
if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) &&
(errno != EINPROGRESS)) {
@@ -174,19 +177,20 @@ void Socket::connect(const SocketAddress& addr) const
}
void
-Socket::close() const
+BSDSocket::close() const
{
- int& socket = impl->fd;
+ int& socket = fd;
if (socket == -1) return;
if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno);
socket = -1;
+ *handle = IOHandle();
}
-int Socket::listen(const SocketAddress& sa, int backlog) const
+int BSDSocket::listen(const SocketAddress& sa, int backlog) const
{
createSocket(sa);
- const int& socket = impl->fd;
+ const int& socket = fd;
int yes=1;
QPID_POSIX_CHECK(::setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
@@ -198,11 +202,11 @@ int Socket::listen(const SocketAddress& sa, int backlog) const
return getLocalPort(socket);
}
-Socket* Socket::accept() const
+Socket* BSDSocket::accept() const
{
- int afd = ::accept(impl->fd, 0, 0);
+ int afd = ::accept(fd, 0, 0);
if ( afd >= 0) {
- Socket* s = new Socket(new IOHandlePrivate(afd));
+ BSDSocket* s = new BSDSocket(afd);
s->localname = localname;
return s;
}
@@ -211,38 +215,38 @@ Socket* Socket::accept() const
else throw QPID_POSIX_ERROR(errno);
}
-int Socket::read(void *buf, size_t count) const
+int BSDSocket::read(void *buf, size_t count) const
{
- return ::read(impl->fd, buf, count);
+ return ::read(fd, buf, count);
}
-int Socket::write(const void *buf, size_t count) const
+int BSDSocket::write(const void *buf, size_t count) const
{
- return ::write(impl->fd, buf, count);
+ return ::write(fd, buf, count);
}
-std::string Socket::getPeerAddress() const
+std::string BSDSocket::getPeerAddress() const
{
if (peername.empty()) {
- peername = getName(impl->fd, false);
+ peername = getName(fd, false);
}
return peername;
}
-std::string Socket::getLocalAddress() const
+std::string BSDSocket::getLocalAddress() const
{
if (localname.empty()) {
- localname = getName(impl->fd, true);
+ localname = getName(fd, true);
}
return localname;
}
-int Socket::getError() const
+int BSDSocket::getError() const
{
int result;
socklen_t rSize = sizeof (result);
- if (::getsockopt(impl->fd, SOL_SOCKET, SO_ERROR, &result, &rSize) < 0)
+ if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &rSize) < 0)
throw QPID_POSIX_ERROR(errno);
return result;
diff --git a/cpp/src/qpid/sys/posix/BSDSocket.h b/cpp/src/qpid/sys/posix/BSDSocket.h
new file mode 100644
index 0000000000..98d7eb6e4d
--- /dev/null
+++ b/cpp/src/qpid/sys/posix/BSDSocket.h
@@ -0,0 +1,109 @@
+#ifndef QPID_SYS_BSDSOCKET_H
+#define QPID_SYS_BSDSOCKET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/CommonImportExport.h"
+#include <string>
+
+#include <boost/scoped_ptr.hpp>
+
+namespace qpid {
+namespace sys {
+
+class Duration;
+class IOHandle;
+class SocketAddress;
+
+namespace ssl {
+class SslMuxSocket;
+}
+
+class QPID_COMMON_CLASS_EXTERN BSDSocket : public Socket
+{
+public:
+ /** Create a socket wrapper for descriptor. */
+ QPID_COMMON_EXTERN BSDSocket();
+ QPID_COMMON_EXTERN ~BSDSocket();
+
+ QPID_COMMON_EXTERN operator const IOHandle&() const;
+
+ /** Set socket non blocking */
+ QPID_COMMON_EXTERN virtual void setNonblocking() const;
+
+ QPID_COMMON_EXTERN virtual void setTcpNoDelay() const;
+
+ QPID_COMMON_EXTERN virtual void connect(const SocketAddress&) const;
+
+ QPID_COMMON_EXTERN virtual void close() const;
+
+ /** Bind to a port and start listening.
+ *@return The bound port number
+ */
+ QPID_COMMON_EXTERN virtual int listen(const SocketAddress&, int backlog = 10) const;
+
+ /**
+ * Returns an address (host and port) for the remote end of the
+ * socket
+ */
+ QPID_COMMON_EXTERN std::string getPeerAddress() const;
+ /**
+ * Returns an address (host and port) for the local end of the
+ * socket
+ */
+ QPID_COMMON_EXTERN std::string getLocalAddress() const;
+
+ /**
+ * Returns the error code stored in the socket. This may be used
+ * to determine the result of a non-blocking connect.
+ */
+ QPID_COMMON_EXTERN int getError() const;
+
+ /** Accept a connection from a socket that is already listening
+ * and has an incoming connection
+ */
+ QPID_COMMON_EXTERN virtual Socket* accept() const;
+
+ // TODO The following are raw operations, maybe they need better wrapping?
+ QPID_COMMON_EXTERN virtual int read(void *buf, size_t count) const;
+ QPID_COMMON_EXTERN virtual int write(const void *buf, size_t count) const;
+
+protected:
+ /** Create socket */
+ void createSocket(const SocketAddress&) const;
+
+ mutable int fd;
+ mutable boost::scoped_ptr<IOHandle> handle;
+ mutable std::string localname;
+ mutable std::string peername;
+ mutable bool nonblocking;
+ mutable bool nodelay;
+
+ /** Construct socket with existing handle */
+ BSDSocket(int fd);
+ friend class qpid::sys::ssl::SslMuxSocket; // Needed for this constructor
+};
+
+}}
+#endif /*!QPID_SYS_BSDSOCKET_H*/
diff --git a/cpp/src/qpid/sys/posix/IOHandle.cpp b/cpp/src/qpid/sys/posix/IOHandle.cpp
index 9c049ee1de..d3f502a63c 100644
--- a/cpp/src/qpid/sys/posix/IOHandle.cpp
+++ b/cpp/src/qpid/sys/posix/IOHandle.cpp
@@ -19,26 +19,11 @@
*
*/
-#include "qpid/sys/IOHandle.h"
-
#include "qpid/sys/posix/PrivatePosix.h"
namespace qpid {
namespace sys {
-int toFd(const IOHandlePrivate* h)
-{
- return h->fd;
-}
-
NullIOHandle DummyIOHandle;
-IOHandle::IOHandle(IOHandlePrivate* h) :
- impl(h)
-{}
-
-IOHandle::~IOHandle() {
- delete impl;
-}
-
}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/posix/PollableCondition.cpp b/cpp/src/qpid/sys/posix/PollableCondition.cpp
index abff8a5be8..aa129faf20 100644
--- a/cpp/src/qpid/sys/posix/PollableCondition.cpp
+++ b/cpp/src/qpid/sys/posix/PollableCondition.cpp
@@ -21,7 +21,6 @@
#include "qpid/sys/PollableCondition.h"
#include "qpid/sys/DispatchHandle.h"
-#include "qpid/sys/IOHandle.h"
#include "qpid/sys/posix/PrivatePosix.h"
#include "qpid/Exception.h"
@@ -58,14 +57,14 @@ PollableConditionPrivate::PollableConditionPrivate(
const sys::PollableCondition::Callback& cb,
sys::PollableCondition& parent,
const boost::shared_ptr<sys::Poller>& poller
-) : IOHandle(new sys::IOHandlePrivate), cb(cb), parent(parent)
+) : cb(cb), parent(parent)
{
int fds[2];
if (::pipe(fds) == -1)
throw ErrnoException(QPID_MSG("Can't create PollableCondition"));
- impl->fd = fds[0];
+ fd = fds[0];
writeFd = fds[1];
- if (::fcntl(impl->fd, F_SETFL, O_NONBLOCK) == -1)
+ if (::fcntl(fd, F_SETFL, O_NONBLOCK) == -1)
throw ErrnoException(QPID_MSG("Can't create PollableCondition"));
if (::fcntl(writeFd, F_SETFL, O_NONBLOCK) == -1)
throw ErrnoException(QPID_MSG("Can't create PollableCondition"));
diff --git a/cpp/src/qpid/sys/posix/PosixPoller.cpp b/cpp/src/qpid/sys/posix/PosixPoller.cpp
index eb0c3384d1..ae839b2e20 100644
--- a/cpp/src/qpid/sys/posix/PosixPoller.cpp
+++ b/cpp/src/qpid/sys/posix/PosixPoller.cpp
@@ -88,12 +88,12 @@ class PollerHandlePrivate {
};
short events;
- const IOHandlePrivate* ioHandle;
+ const IOHandle* ioHandle;
PollerHandle* pollerHandle;
FDStat stat;
Mutex lock;
- PollerHandlePrivate(const IOHandlePrivate* h, PollerHandle* p) :
+ PollerHandlePrivate(const IOHandle* h, PollerHandle* p) :
events(0),
ioHandle(h),
pollerHandle(p),
@@ -101,7 +101,7 @@ class PollerHandlePrivate {
}
int fd() const {
- return toFd(ioHandle);
+ return ioHandle->fd;
}
bool isActive() const {
@@ -162,7 +162,7 @@ class PollerHandlePrivate {
};
PollerHandle::PollerHandle(const IOHandle& h) :
- impl(new PollerHandlePrivate(h.impl, this))
+ impl(new PollerHandlePrivate(&h, this))
{}
PollerHandle::~PollerHandle() {
diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
index efe454c5be..889ee9ff75 100644
--- a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
+++ b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
@@ -105,7 +105,7 @@ namespace Rdma {
}
QueuePair::QueuePair(boost::shared_ptr< ::rdma_cm_id > i) :
- qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
+ handle(new qpid::sys::IOHandle),
pd(allocPd(i->verbs)),
cchannel(mkCChannel(i->verbs)),
scq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())),
@@ -113,7 +113,7 @@ namespace Rdma {
outstandingSendEvents(0),
outstandingRecvEvents(0)
{
- impl->fd = cchannel->fd;
+ handle->fd = cchannel->fd;
// Set cq context to this QueuePair object so we can find
// ourselves again
@@ -163,6 +163,11 @@ namespace Rdma {
// The buffers vectors automatically deletes all the buffers we've allocated
}
+ QueuePair::operator qpid::sys::IOHandle&() const
+ {
+ return *handle;
+ }
+
// Create buffers to use for writing
void QueuePair::createSendBuffers(int sendBufferCount, int bufferSize, int reserved)
{
@@ -359,11 +364,11 @@ namespace Rdma {
// Wrap the passed in rdma_cm_id with a Connection
// this basically happens only on connection request
Connection::Connection(::rdma_cm_id* i) :
- qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
+ handle(new qpid::sys::IOHandle),
id(mkId(i)),
context(0)
{
- impl->fd = id->channel->fd;
+ handle->fd = id->channel->fd;
// Just overwrite the previous context as it will
// have come from the listening connection
@@ -372,12 +377,12 @@ namespace Rdma {
}
Connection::Connection() :
- qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
+ handle(new qpid::sys::IOHandle),
channel(mkEChannel()),
id(mkId(channel.get(), this, RDMA_PS_TCP)),
context(0)
{
- impl->fd = channel->fd;
+ handle->fd = channel->fd;
}
Connection::~Connection() {
@@ -385,6 +390,11 @@ namespace Rdma {
id->context = 0;
}
+ Connection::operator qpid::sys::IOHandle&() const
+ {
+ return *handle;
+ }
+
void Connection::ensureQueuePair() {
assert(id.get());
diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.h b/cpp/src/qpid/sys/rdma/rdma_wrap.h
index 8e3429027b..5f84793a5b 100644
--- a/cpp/src/qpid/sys/rdma/rdma_wrap.h
+++ b/cpp/src/qpid/sys/rdma/rdma_wrap.h
@@ -28,6 +28,7 @@
#include "qpid/sys/Mutex.h"
#include <boost/shared_ptr.hpp>
+#include <boost/scoped_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/ptr_container/ptr_deque.hpp>
@@ -116,9 +117,10 @@ namespace Rdma {
// Wrapper for a queue pair - this has the functionality for
// putting buffers on the receive queue and for sending buffers
// to the other end of the connection.
- class QueuePair : public qpid::sys::IOHandle, public qpid::RefCounted {
+ class QueuePair : public qpid::RefCounted {
friend class Connection;
+ boost::scoped_ptr< qpid::sys::IOHandle > handle;
boost::shared_ptr< ::ibv_pd > pd;
boost::shared_ptr< ::ibv_mr > smr;
boost::shared_ptr< ::ibv_mr > rmr;
@@ -139,6 +141,8 @@ namespace Rdma {
public:
typedef boost::intrusive_ptr<QueuePair> intrusive_ptr;
+ operator qpid::sys::IOHandle&() const;
+
// Create a buffers to use for writing
void createSendBuffers(int sendBufferCount, int dataSize, int headerSize);
@@ -195,7 +199,8 @@ namespace Rdma {
// registered buffers can't be shared between different connections
// (this can only happen between connections on the same controller in any case,
// so needs careful management if used)
- class Connection : public qpid::sys::IOHandle, public qpid::RefCounted {
+ class Connection : public qpid::RefCounted {
+ boost::scoped_ptr< qpid::sys::IOHandle > handle;
boost::shared_ptr< ::rdma_event_channel > channel;
boost::shared_ptr< ::rdma_cm_id > id;
QueuePair::intrusive_ptr qp;
@@ -216,6 +221,8 @@ namespace Rdma {
public:
typedef boost::intrusive_ptr<Connection> intrusive_ptr;
+ operator qpid::sys::IOHandle&() const;
+
static intrusive_ptr make();
static intrusive_ptr find(::rdma_cm_id* i);
diff --git a/cpp/src/qpid/sys/ssl/SslSocket.cpp b/cpp/src/qpid/sys/ssl/SslSocket.cpp
index 6b6f326492..22f9f63fff 100644
--- a/cpp/src/qpid/sys/ssl/SslSocket.cpp
+++ b/cpp/src/qpid/sys/ssl/SslSocket.cpp
@@ -98,16 +98,16 @@ SslSocket::SslSocket(const std::string& certName, bool clientAuth) :
* returned from accept. Because we use posix accept rather than
* PR_Accept, we have to reset the handshake.
*/
-SslSocket::SslSocket(IOHandlePrivate* ioph, PRFileDesc* model) : Socket(ioph), nssSocket(0), prototype(0)
+SslSocket::SslSocket(int fd, PRFileDesc* model) : BSDSocket(fd), nssSocket(0), prototype(0)
{
- nssSocket = SSL_ImportFD(model, PR_ImportTCPSocket(impl->fd));
+ nssSocket = SSL_ImportFD(model, PR_ImportTCPSocket(fd));
NSS_CHECK(SSL_ResetHandshake(nssSocket, PR_TRUE));
}
void SslSocket::setNonblocking() const
{
if (!nssSocket) {
- Socket::setNonblocking();
+ BSDSocket::setNonblocking();
return;
}
PRSocketOptionData option;
@@ -119,7 +119,7 @@ void SslSocket::setNonblocking() const
void SslSocket::setTcpNoDelay() const
{
if (!nssSocket) {
- Socket::setTcpNoDelay();
+ BSDSocket::setTcpNoDelay();
return;
}
PRSocketOptionData option;
@@ -130,9 +130,9 @@ void SslSocket::setTcpNoDelay() const
void SslSocket::connect(const SocketAddress& addr) const
{
- Socket::connect(addr);
+ BSDSocket::connect(addr);
- nssSocket = SSL_ImportFD(0, PR_ImportTCPSocket(impl->fd));
+ nssSocket = SSL_ImportFD(0, PR_ImportTCPSocket(fd));
void* arg;
// Use the connection's cert-name if it has one; else use global cert-name
@@ -155,12 +155,12 @@ void SslSocket::connect(const SocketAddress& addr) const
void SslSocket::close() const
{
if (!nssSocket) {
- Socket::close();
+ BSDSocket::close();
return;
}
- if (impl->fd > 0) {
+ if (fd > 0) {
PR_Close(nssSocket);
- impl->fd = -1;
+ fd = -1;
}
}
@@ -176,15 +176,15 @@ int SslSocket::listen(const SocketAddress& sa, int backlog) const
SECKEY_DestroyPrivateKey(key);
CERT_DestroyCertificate(cert);
- return Socket::listen(sa, backlog);
+ return BSDSocket::listen(sa, backlog);
}
-SslSocket* SslSocket::accept() const
+Socket* SslSocket::accept() const
{
QPID_LOG(trace, "Accepting SSL connection.");
- int afd = ::accept(impl->fd, 0, 0);
+ int afd = ::accept(fd, 0, 0);
if ( afd >= 0) {
- return new SslSocket(new IOHandlePrivate(afd), prototype);
+ return new SslSocket(afd, prototype);
} else if (errno == EAGAIN) {
return 0;
} else {
@@ -275,15 +275,15 @@ SslMuxSocket::SslMuxSocket(const std::string& certName, bool clientAuth) :
Socket* SslMuxSocket::accept() const
{
- int afd = ::accept(impl->fd, 0, 0);
+ int afd = ::accept(fd, 0, 0);
if (afd >= 0) {
QPID_LOG(trace, "Accepting connection with optional SSL wrapper.");
if (isSslStream(afd)) {
QPID_LOG(trace, "Accepted SSL connection.");
- return new SslSocket(new IOHandlePrivate(afd), prototype);
+ return new SslSocket(afd, prototype);
} else {
QPID_LOG(trace, "Accepted Plaintext connection.");
- return new Socket(new IOHandlePrivate(afd));
+ return new BSDSocket(afd);
}
} else if (errno == EAGAIN) {
return 0;
diff --git a/cpp/src/qpid/sys/ssl/SslSocket.h b/cpp/src/qpid/sys/ssl/SslSocket.h
index 1b5424cfeb..1efbbe4a88 100644
--- a/cpp/src/qpid/sys/ssl/SslSocket.h
+++ b/cpp/src/qpid/sys/ssl/SslSocket.h
@@ -23,7 +23,7 @@
*/
#include "qpid/sys/IOHandle.h"
-#include "qpid/sys/Socket.h"
+#include "qpid/sys/posix/BSDSocket.h"
#include <nspr.h>
#include <string>
@@ -37,7 +37,7 @@ class Duration;
namespace ssl {
-class SslSocket : public qpid::sys::Socket
+class SslSocket : public qpid::sys::BSDSocket
{
public:
/** Create a socket wrapper for descriptor.
@@ -71,7 +71,7 @@ public:
* Accept a connection from a socket that is already listening
* and has an incoming connection
*/
- SslSocket* accept() const;
+ virtual Socket* accept() const;
// TODO The following are raw operations, maybe they need better wrapping?
int read(void *buf, size_t count) const;
@@ -92,8 +92,8 @@ protected:
*/
mutable PRFileDesc* prototype;
- SslSocket(IOHandlePrivate* ioph, PRFileDesc* model);
- friend class SslMuxSocket;
+ SslSocket(int fd, PRFileDesc* model);
+ friend class SslMuxSocket; // Needed for this constructor
};
class SslMuxSocket : public SslSocket
diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp
index 9fdf89c83b..e7e966519d 100644
--- a/cpp/src/qpid/sys/windows/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -24,6 +24,7 @@
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Socket.h"
+#include "qpid/sys/windows/WinSocket.h"
#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/Poller.h"
#include "qpid/sys/Thread.h"
@@ -51,8 +52,8 @@ namespace {
* The function pointers for AcceptEx and ConnectEx need to be looked up
* at run time.
*/
-const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::Socket& s) {
- SOCKET h = toSocketHandle(s);
+const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::IOHandle& io) {
+ SOCKET h = io.fd;
GUID guidAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes = 0;
LPFN_ACCEPTEX fnAcceptEx;
@@ -94,12 +95,14 @@ private:
AsynchAcceptor::Callback acceptedCallback;
const Socket& socket;
+ const SOCKET wSocket;
const LPFN_ACCEPTEX fnAcceptEx;
};
AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback)
: acceptedCallback(callback),
socket(s),
+ wSocket(IOHandle(s).fd),
fnAcceptEx(lookUpAcceptEx(s)) {
s.setNonblocking();
@@ -122,8 +125,8 @@ void AsynchAcceptor::restart(void) {
this,
socket);
BOOL status;
- status = fnAcceptEx(toSocketHandle(socket),
- toSocketHandle(*result->newSocket),
+ status = fnAcceptEx(wSocket,
+ IOHandle(*result->newSocket).fd,
result->addressBuffer,
0,
AsynchAcceptResult::SOCKADDRMAXLEN,
@@ -134,16 +137,30 @@ void AsynchAcceptor::restart(void) {
}
+Socket* createSameTypeSocket(const Socket& sock) {
+ SOCKET socket = IOHandle(sock).fd;
+ // Socket currently has no actual socket attached
+ if (socket == INVALID_SOCKET)
+ return new WinSocket;
+
+ ::sockaddr_storage sa;
+ ::socklen_t salen = sizeof(sa);
+ QPID_WINSOCK_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen));
+ SOCKET s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM
+ if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
+ return new WinSocket(s);
+}
+
AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb,
AsynchAcceptor *acceptor,
- const Socket& listener)
+ const Socket& lsocket)
: callback(cb), acceptor(acceptor),
- listener(toSocketHandle(listener)),
- newSocket(listener.createSameTypeSocket()) {
+ listener(IOHandle(lsocket).fd),
+ newSocket(createSameTypeSocket(lsocket)) {
}
void AsynchAcceptResult::success(size_t /*bytesTransferred*/) {
- ::setsockopt (toSocketHandle(*newSocket),
+ ::setsockopt (IOHandle(*newSocket).fd,
SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT,
(char*)&listener,
@@ -363,7 +380,7 @@ class CallbackHandle : public IOHandle {
public:
CallbackHandle(AsynchIoResult::Completer completeCb,
AsynchIO::RequestCallback reqCb = 0) :
- IOHandle(new IOHandlePrivate (INVALID_SOCKET, completeCb, reqCb))
+ IOHandle(INVALID_SOCKET, completeCb, reqCb)
{}
};
@@ -516,7 +533,7 @@ void AsynchIO::startReading() {
DWORD bytesReceived = 0, flags = 0;
InterlockedIncrement(&opsInProgress);
readInProgress = true;
- int status = WSARecv(toSocketHandle(socket),
+ int status = WSARecv(IOHandle(socket).fd,
const_cast<LPWSABUF>(result->getWSABUF()), 1,
&bytesReceived,
&flags,
@@ -614,7 +631,7 @@ void AsynchIO::startWrite(AsynchIO::BufferBase* buff) {
buff,
buff->dataCount);
DWORD bytesSent = 0;
- int status = WSASend(toSocketHandle(socket),
+ int status = WSASend(IOHandle(socket).fd,
const_cast<LPWSABUF>(result->getWSABUF()), 1,
&bytesSent,
0,
diff --git a/cpp/src/qpid/sys/windows/IOHandle.cpp b/cpp/src/qpid/sys/windows/IOHandle.cpp
index 250737cb99..19a1c44875 100755
--- a/cpp/src/qpid/sys/windows/IOHandle.cpp
+++ b/cpp/src/qpid/sys/windows/IOHandle.cpp
@@ -19,24 +19,11 @@
*
*/
-#include "qpid/sys/IOHandle.h"
#include "qpid/sys/windows/IoHandlePrivate.h"
#include <windows.h>
namespace qpid {
namespace sys {
-SOCKET toFd(const IOHandlePrivate* h)
-{
- return h->fd;
-}
-
-IOHandle::IOHandle(IOHandlePrivate* h) :
- impl(h)
-{}
-
-IOHandle::~IOHandle() {
- delete impl;
-}
}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/windows/IoHandlePrivate.h b/cpp/src/qpid/sys/windows/IoHandlePrivate.h
index 5943db5cc7..4529ad93ec 100755
--- a/cpp/src/qpid/sys/windows/IoHandlePrivate.h
+++ b/cpp/src/qpid/sys/windows/IoHandlePrivate.h
@@ -38,15 +38,14 @@ namespace sys {
// completer from an I/O thread. If the callback mechanism is used, there
// can be a RequestCallback set - this carries the callback object through
// from AsynchIO::requestCallback() through to the I/O completion processing.
-class IOHandlePrivate {
- friend QPID_COMMON_EXTERN SOCKET toSocketHandle(const Socket& s);
- static IOHandlePrivate* getImpl(const IOHandle& h);
-
+class IOHandle {
public:
- IOHandlePrivate(SOCKET f = INVALID_SOCKET,
- windows::AsynchIoResult::Completer cb = 0,
- AsynchIO::RequestCallback reqCallback = 0) :
- fd(f), event(cb), cbRequest(reqCallback)
+ IOHandle(SOCKET f = INVALID_SOCKET,
+ windows::AsynchIoResult::Completer cb = 0,
+ AsynchIO::RequestCallback reqCallback = 0) :
+ fd(f),
+ event(cb),
+ cbRequest(reqCallback)
{}
SOCKET fd;
@@ -54,8 +53,6 @@ public:
AsynchIO::RequestCallback cbRequest;
};
-QPID_COMMON_EXTERN SOCKET toSocketHandle(const Socket& s);
-
}}
#endif /* _sys_windows_IoHandlePrivate_h */
diff --git a/cpp/src/qpid/sys/windows/IocpPoller.cpp b/cpp/src/qpid/sys/windows/IocpPoller.cpp
index c81cef87b0..ecb33c5517 100755
--- a/cpp/src/qpid/sys/windows/IocpPoller.cpp
+++ b/cpp/src/qpid/sys/windows/IocpPoller.cpp
@@ -22,7 +22,7 @@
#include "qpid/sys/Poller.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Dispatcher.h"
-
+#include "qpid/sys/IOHandle.h"
#include "qpid/sys/windows/AsynchIoResult.h"
#include "qpid/sys/windows/IoHandlePrivate.h"
#include "qpid/sys/windows/check.h"
@@ -55,7 +55,7 @@ class PollerHandlePrivate {
};
PollerHandle::PollerHandle(const IOHandle& h) :
- impl(new PollerHandlePrivate(toSocketHandle(static_cast<const Socket&>(h)), h.impl->event, h.impl->cbRequest))
+ impl(new PollerHandlePrivate(h.fd, h.event, h.cbRequest))
{}
PollerHandle::~PollerHandle() {
diff --git a/cpp/src/qpid/sys/windows/PollableCondition.cpp b/cpp/src/qpid/sys/windows/PollableCondition.cpp
index bb637be0a6..7bbcd4de1b 100644
--- a/cpp/src/qpid/sys/windows/PollableCondition.cpp
+++ b/cpp/src/qpid/sys/windows/PollableCondition.cpp
@@ -57,8 +57,7 @@ private:
PollableConditionPrivate::PollableConditionPrivate(const sys::PollableCondition::Callback& cb,
sys::PollableCondition& parent,
const boost::shared_ptr<sys::Poller>& poller)
- : IOHandle(new sys::IOHandlePrivate(INVALID_SOCKET,
- boost::bind(&PollableConditionPrivate::dispatch, this, _1))),
+ : IOHandle(INVALID_SOCKET, boost::bind(&PollableConditionPrivate::dispatch, this, _1)),
cb(cb), parent(parent), poller(poller), isSet(0)
{
}
diff --git a/cpp/src/qpid/sys/windows/Socket.cpp b/cpp/src/qpid/sys/windows/WinSocket.cpp
index 0c74b3a725..c1ac31de76 100644
--- a/cpp/src/qpid/sys/windows/Socket.cpp
+++ b/cpp/src/qpid/sys/windows/WinSocket.cpp
@@ -19,20 +19,13 @@
*
*/
-#include "qpid/sys/Socket.h"
+#include "qpid/sys/windows/WinSocket.h"
#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/windows/check.h"
#include "qpid/sys/windows/IoHandlePrivate.h"
#include "qpid/sys/SystemInfo.h"
-// Ensure we get all of winsock2.h
-#ifndef _WIN32_WINNT
-#define _WIN32_WINNT 0x0501
-#endif
-
-#include <winsock2.h>
-
namespace qpid {
namespace sys {
@@ -108,22 +101,32 @@ uint16_t getLocalPort(int fd)
}
} // namespace
-Socket::Socket() :
- IOHandle(new IOHandlePrivate),
+WinSocket::WinSocket() :
+ handle(new IOHandle),
nonblocking(false),
nodelay(false)
{}
-Socket::Socket(IOHandlePrivate* h) :
- IOHandle(h),
+Socket* createSocket()
+{
+ return new WinSocket;
+}
+
+WinSocket::WinSocket(SOCKET fd) :
+ handle(new IOHandle(fd)),
nonblocking(false),
nodelay(false)
{}
-void Socket::createSocket(const SocketAddress& sa) const
+WinSocket::operator const IOHandle&() const
{
- SOCKET& socket = impl->fd;
- if (socket != INVALID_SOCKET) Socket::close();
+ return *handle;
+}
+
+void WinSocket::createSocket(const SocketAddress& sa) const
+{
+ SOCKET& socket = handle->fd;
+ if (socket != INVALID_SOCKET) WinSocket::close();
SOCKET s = ::socket (getAddrInfo(sa).ai_family,
getAddrInfo(sa).ai_socktype,
@@ -141,33 +144,19 @@ void Socket::createSocket(const SocketAddress& sa) const
}
}
-Socket* Socket::createSameTypeSocket() const {
- SOCKET& socket = impl->fd;
- // Socket currently has no actual socket attached
- if (socket == INVALID_SOCKET)
- return new Socket;
-
- ::sockaddr_storage sa;
- ::socklen_t salen = sizeof(sa);
- QPID_WINSOCK_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen));
- SOCKET s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM
- if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
- return new Socket(new IOHandlePrivate(s));
-}
-
-void Socket::setNonblocking() const {
+void WinSocket::setNonblocking() const {
u_long nonblock = 1;
- QPID_WINSOCK_CHECK(ioctlsocket(impl->fd, FIONBIO, &nonblock));
+ QPID_WINSOCK_CHECK(ioctlsocket(handle->fd, FIONBIO, &nonblock));
}
void
-Socket::connect(const SocketAddress& addr) const
+WinSocket::connect(const SocketAddress& addr) const
{
peername = addr.asString(false);
createSocket(addr);
- const SOCKET& socket = impl->fd;
+ const SOCKET& socket = handle->fd;
int err;
WSASetLastError(0);
if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) != 0) &&
@@ -176,38 +165,38 @@ Socket::connect(const SocketAddress& addr) const
}
void
-Socket::close() const
+WinSocket::close() const
{
- SOCKET& socket = impl->fd;
+ SOCKET& socket = handle->fd;
if (socket == INVALID_SOCKET) return;
QPID_WINSOCK_CHECK(closesocket(socket));
socket = INVALID_SOCKET;
}
-int Socket::write(const void *buf, size_t count) const
+int WinSocket::write(const void *buf, size_t count) const
{
- const SOCKET& socket = impl->fd;
+ const SOCKET& socket = handle->fd;
int sent = ::send(socket, (const char *)buf, count, 0);
if (sent == SOCKET_ERROR)
return -1;
return sent;
}
-int Socket::read(void *buf, size_t count) const
+int WinSocket::read(void *buf, size_t count) const
{
- const SOCKET& socket = impl->fd;
+ const SOCKET& socket = handle->fd;
int received = ::recv(socket, (char *)buf, count, 0);
if (received == SOCKET_ERROR)
return -1;
return received;
}
-int Socket::listen(const SocketAddress& addr, int backlog) const
+int WinSocket::listen(const SocketAddress& addr, int backlog) const
{
createSocket(addr);
- const SOCKET& socket = impl->fd;
+ const SOCKET& socket = handle->fd;
BOOL yes=1;
QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, sizeof(yes)));
@@ -219,48 +208,48 @@ int Socket::listen(const SocketAddress& addr, int backlog) const
return getLocalPort(socket);
}
-Socket* Socket::accept() const
+Socket* WinSocket::accept() const
{
- SOCKET afd = ::accept(impl->fd, 0, 0);
+ SOCKET afd = ::accept(handle->fd, 0, 0);
if (afd != INVALID_SOCKET)
- return new Socket(new IOHandlePrivate(afd));
+ return new WinSocket(afd);
else if (WSAGetLastError() == EAGAIN)
return 0;
else throw QPID_WINDOWS_ERROR(WSAGetLastError());
}
-std::string Socket::getPeerAddress() const
+std::string WinSocket::getPeerAddress() const
{
if (peername.empty()) {
- peername = getName(impl->fd, false);
+ peername = getName(handle->fd, false);
}
return peername;
}
-std::string Socket::getLocalAddress() const
+std::string WinSocket::getLocalAddress() const
{
if (localname.empty()) {
- localname = getName(impl->fd, true);
+ localname = getName(handle->fd, true);
}
return localname;
}
-int Socket::getError() const
+int WinSocket::getError() const
{
int result;
socklen_t rSize = sizeof (result);
- QPID_WINSOCK_CHECK(::getsockopt(impl->fd, SOL_SOCKET, SO_ERROR, (char *)&result, &rSize));
+ QPID_WINSOCK_CHECK(::getsockopt(handle->fd, SOL_SOCKET, SO_ERROR, (char *)&result, &rSize));
return result;
}
-void Socket::setTcpNoDelay() const
+void WinSocket::setTcpNoDelay() const
{
- SOCKET& socket = impl->fd;
+ SOCKET& socket = handle->fd;
nodelay = true;
if (socket != INVALID_SOCKET) {
int flag = 1;
- int result = setsockopt(impl->fd,
+ int result = setsockopt(handle->fd,
IPPROTO_TCP,
TCP_NODELAY,
(char *)&flag,
@@ -269,14 +258,4 @@ void Socket::setTcpNoDelay() const
}
}
-inline IOHandlePrivate* IOHandlePrivate::getImpl(const qpid::sys::IOHandle &h)
-{
- return h.impl;
-}
-
-SOCKET toSocketHandle(const Socket& s)
-{
- return IOHandlePrivate::getImpl(s)->fd;
-}
-
}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/windows/WinSocket.h b/cpp/src/qpid/sys/windows/WinSocket.h
new file mode 100644
index 0000000000..17905a6133
--- /dev/null
+++ b/cpp/src/qpid/sys/windows/WinSocket.h
@@ -0,0 +1,114 @@
+#ifndef QPID_SYS_WINDOWS_BSDSOCKET_H
+#define QPID_SYS_WINDOWS_BSDSOCKET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/CommonImportExport.h"
+#include <string>
+
+#include <boost/scoped_ptr.hpp>
+
+// Ensure we get all of winsock2.h
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+
+#include <winsock2.h>
+
+namespace qpid {
+namespace sys {
+
+namespace windows {
+Socket* createSameTypeSocket(const Socket&);
+}
+
+class Duration;
+class IOHandle;
+class SocketAddress;
+
+class QPID_COMMON_CLASS_EXTERN WinSocket : public Socket
+{
+public:
+ /** Create a socket wrapper for descriptor. */
+ QPID_COMMON_EXTERN WinSocket();
+
+ QPID_COMMON_EXTERN operator const IOHandle&() const;
+
+ /** Set socket non blocking */
+ QPID_COMMON_EXTERN virtual void setNonblocking() const;
+
+ QPID_COMMON_EXTERN virtual void setTcpNoDelay() const;
+
+ QPID_COMMON_EXTERN virtual void connect(const SocketAddress&) const;
+
+ QPID_COMMON_EXTERN virtual void close() const;
+
+ /** Bind to a port and start listening.
+ *@return The bound port number
+ */
+ QPID_COMMON_EXTERN virtual int listen(const SocketAddress&, int backlog = 10) const;
+
+ /**
+ * Returns an address (host and port) for the remote end of the
+ * socket
+ */
+ QPID_COMMON_EXTERN std::string getPeerAddress() const;
+ /**
+ * Returns an address (host and port) for the local end of the
+ * socket
+ */
+ QPID_COMMON_EXTERN std::string getLocalAddress() const;
+
+ /**
+ * Returns the error code stored in the socket. This may be used
+ * to determine the result of a non-blocking connect.
+ */
+ QPID_COMMON_EXTERN int getError() const;
+
+ /** Accept a connection from a socket that is already listening
+ * and has an incoming connection
+ */
+ QPID_COMMON_EXTERN virtual Socket* accept() const;
+
+ // TODO The following are raw operations, maybe they need better wrapping?
+ QPID_COMMON_EXTERN virtual int read(void *buf, size_t count) const;
+ QPID_COMMON_EXTERN virtual int write(const void *buf, size_t count) const;
+
+protected:
+ /** Create socket */
+ void createSocket(const SocketAddress&) const;
+
+ mutable boost::scoped_ptr<IOHandle> handle;
+ mutable std::string localname;
+ mutable std::string peername;
+ mutable bool nonblocking;
+ mutable bool nodelay;
+
+ /** Construct socket with existing handle */
+ friend Socket* qpid::sys::windows::createSameTypeSocket(const Socket&);
+ WinSocket(SOCKET fd);
+};
+
+}}
+#endif /*!QPID_SYS_WINDOWS_BSDSOCKET_H*/
diff --git a/cpp/src/tests/DispatcherTest.cpp b/cpp/src/tests/DispatcherTest.cpp
index e1691db584..7312fe8d2e 100644
--- a/cpp/src/tests/DispatcherTest.cpp
+++ b/cpp/src/tests/DispatcherTest.cpp
@@ -20,7 +20,6 @@
*/
#include "qpid/sys/Poller.h"
-#include "qpid/sys/IOHandle.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/DispatchHandle.h"
#include "qpid/sys/posix/PrivatePosix.h"
@@ -147,8 +146,8 @@ int main(int /*argc*/, char** /*argv*/)
for (int i = 0; i < 8; i++)
testString += testString;
- PosixIOHandle f0(sv[0]);
- PosixIOHandle f1(sv[1]);
+ IOHandle f0(sv[0]);
+ IOHandle f1(sv[1]);
rh = new DispatchHandleRef(f0, boost::bind(reader, _1, sv[0]), 0, 0);
wh = new DispatchHandleRef(f1, 0, boost::bind(writer, _1, sv[1], testString), 0);
diff --git a/cpp/src/tests/PollerTest.cpp b/cpp/src/tests/PollerTest.cpp
index 9fa5689c5f..5a1d02964c 100644
--- a/cpp/src/tests/PollerTest.cpp
+++ b/cpp/src/tests/PollerTest.cpp
@@ -23,7 +23,6 @@
* Use socketpair to test the poller
*/
-#include "qpid/sys/IOHandle.h"
#include "qpid/sys/Poller.h"
#include "qpid/sys/posix/PrivatePosix.h"
@@ -106,8 +105,8 @@ int main(int /*argc*/, char** /*argv*/)
auto_ptr<Poller> poller(new Poller);
- PosixIOHandle f0(sv[0]);
- PosixIOHandle f1(sv[1]);
+ IOHandle f0(sv[0]);
+ IOHandle f1(sv[1]);
PollerHandle h0(f0);
PollerHandle h1(f1);
@@ -225,8 +224,8 @@ int main(int /*argc*/, char** /*argv*/)
auto_ptr<Poller> poller1(new Poller);
- PosixIOHandle f2(sv[0]);
- PosixIOHandle f3(sv[1]);
+ IOHandle f2(sv[0]);
+ IOHandle f3(sv[1]);
PollerHandle h2(f2);
PollerHandle h3(f3);