summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r--cpp/src/qpid/sys/AsynchIO.h3
-rw-r--r--cpp/src/qpid/sys/AsynchIOAcceptor.cpp22
-rw-r--r--cpp/src/qpid/sys/Dispatcher.h4
-rw-r--r--cpp/src/qpid/sys/IOHandle.h45
-rw-r--r--cpp/src/qpid/sys/Poller.h6
-rw-r--r--cpp/src/qpid/sys/Socket.h17
-rw-r--r--cpp/src/qpid/sys/epoll/EpollPoller.cpp18
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp14
-rw-r--r--cpp/src/qpid/sys/posix/IOHandle.cpp42
-rw-r--r--cpp/src/qpid/sys/posix/PrivatePosix.h14
-rw-r--r--cpp/src/qpid/sys/posix/Socket.cpp51
11 files changed, 153 insertions, 83 deletions
diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h
index ca34d82741..3bcee8ba22 100644
--- a/cpp/src/qpid/sys/AsynchIO.h
+++ b/cpp/src/qpid/sys/AsynchIO.h
@@ -40,6 +40,7 @@ public:
private:
Callback acceptedCallback;
DispatchHandle handle;
+ const Socket& socket;
public:
AsynchAcceptor(const Socket& s, Callback callback);
@@ -94,6 +95,7 @@ private:
ClosedCallback closedCallback;
BuffersEmptyCallback emptyCallback;
IdleCallback idleCallback;
+ const Socket& socket;
std::deque<BufferBase*> bufferQueue;
std::deque<BufferBase*> writeQueue;
bool queuedClose;
@@ -119,7 +121,6 @@ public:
void queueWriteClose();
bool writeQueueEmpty() { return writeQueue.empty(); }
BufferBase* getQueuedBuffer();
- const Socket& getSocket() const { return DispatchHandle::getSocket(); }
private:
~AsynchIO();
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index 5c784912b3..43fbfdf7be 100644
--- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -84,19 +84,20 @@ struct Buff : public AsynchIO::BufferBase {
};
class AsynchIOHandler : public OutputControl {
+ std::string identifier;
AsynchIO* aio;
ConnectionCodec::Factory* factory;
ConnectionCodec* codec;
bool readError;
- std::string identifier;
bool isClient;
void write(const framing::ProtocolInitiation&);
public:
- AsynchIOHandler() :
+ AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) :
+ identifier(id),
aio(0),
- factory(0),
+ factory(f),
codec(0),
readError(false),
isClient(false)
@@ -110,11 +111,8 @@ class AsynchIOHandler : public OutputControl {
void setClient() { isClient = true; }
- void init(AsynchIO* a, ConnectionCodec::Factory* f) {
+ void init(AsynchIO* a) {
aio = a;
- factory = f;
- identifier = aio->getSocket().getPeerAddress();
-
}
// Output side
@@ -133,7 +131,7 @@ class AsynchIOHandler : public OutputControl {
};
void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f) {
- AsynchIOHandler* async = new AsynchIOHandler;
+ AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f);
AsynchIO* aio = new AsynchIO(s,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
boost::bind(&AsynchIOHandler::eof, async, _1),
@@ -141,7 +139,8 @@ void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, Conn
boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, f);
+ async->init(aio);
+
// Give connection some buffers to use
for (int i = 0; i < 4; i++) {
aio->queueReadBuffer(new Buff);
@@ -185,7 +184,7 @@ void AsynchIOAcceptor::connect(
{
Socket* socket = new Socket();//Should be deleted by handle when socket closes
socket->connect(host, port);
- AsynchIOHandler* async = new AsynchIOHandler;
+ AsynchIOHandler* async = new AsynchIOHandler(socket->getPeerAddress(), f);
async->setClient();
AsynchIO* aio = new AsynchIO(*socket,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
@@ -194,7 +193,8 @@ void AsynchIOAcceptor::connect(
boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, f);
+ async->init(aio);
+
// Give connection some buffers to use
for (int i = 0; i < 4; i++) {
aio->queueReadBuffer(new Buff);
diff --git a/cpp/src/qpid/sys/Dispatcher.h b/cpp/src/qpid/sys/Dispatcher.h
index 7cc4873068..cd7a0bdb66 100644
--- a/cpp/src/qpid/sys/Dispatcher.h
+++ b/cpp/src/qpid/sys/Dispatcher.h
@@ -55,8 +55,8 @@ private:
} state;
public:
- DispatchHandle(const Socket& s, Callback rCb, Callback wCb, Callback dCb) :
- PollerHandle(s),
+ DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) :
+ PollerHandle(h),
readableCallback(rCb),
writableCallback(wCb),
disconnectedCallback(dCb),
diff --git a/cpp/src/qpid/sys/IOHandle.h b/cpp/src/qpid/sys/IOHandle.h
new file mode 100644
index 0000000000..d06512da58
--- /dev/null
+++ b/cpp/src/qpid/sys/IOHandle.h
@@ -0,0 +1,45 @@
+#ifndef _sys_IOHandle_h
+#define _sys_IOHandle_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+namespace qpid {
+namespace sys {
+
+/**
+ * This is a class intended to abstract the Unix concept of file descriptor or the Windows concept of HANDLE
+ */
+class PollerHandle;
+class IOHandlePrivate;
+class IOHandle {
+ friend class PollerHandle;
+
+protected:
+ IOHandlePrivate* const impl;
+
+ IOHandle(IOHandlePrivate*);
+ virtual ~IOHandle();
+};
+
+}}
+
+#endif // _sys_IOHandle_h
diff --git a/cpp/src/qpid/sys/Poller.h b/cpp/src/qpid/sys/Poller.h
index 0d6b4f9308..dccc12479a 100644
--- a/cpp/src/qpid/sys/Poller.h
+++ b/cpp/src/qpid/sys/Poller.h
@@ -35,16 +35,16 @@ namespace sys {
/**
* Handle class to use for polling
*/
+class IOHandle;
class Poller;
class PollerHandlePrivate;
class PollerHandle {
friend class Poller;
PollerHandlePrivate* const impl;
- const Socket& socket;
public:
- PollerHandle(const Socket& s);
+ PollerHandle(const IOHandle& h);
// Usual way to delete (will defer deletion until we
// can't be returned from a Poller::wait any more)
@@ -52,8 +52,6 @@ public:
// Class clients shouldn't ever use this
virtual ~PollerHandle();
-
- const Socket& getSocket() const {return socket;}
};
/**
diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h
index 0ebfc0c330..cab95654ad 100644
--- a/cpp/src/qpid/sys/Socket.h
+++ b/cpp/src/qpid/sys/Socket.h
@@ -21,25 +21,22 @@
* under the License.
*
*/
+#include "IOHandle.h"
+
#include <string>
-#include "qpid/sys/Time.h"
struct sockaddr;
namespace qpid {
namespace sys {
-class SocketPrivate;
-class Socket
-{
- friend class Poller;
-
- SocketPrivate* const impl;
+class Duration;
+class Socket : public IOHandle
+{
public:
/** Create a socket wrapper for descriptor. */
Socket();
- ~Socket();
/** Create an initialized TCP socket */
void createTcp() const;
@@ -106,10 +103,8 @@ public:
int read(void *buf, size_t count) const;
int write(const void *buf, size_t count) const;
- int toFd() const;
-
private:
- Socket(SocketPrivate*);
+ Socket(IOHandlePrivate*);
};
}}
diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
index 8936251f94..44b84c4239 100644
--- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp
+++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/sys/Poller.h"
+#include "qpid/sys/IOHandle.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/DeletionManager.h"
#include "qpid/sys/posix/check.h"
@@ -54,11 +55,13 @@ class PollerHandlePrivate {
MONITORED_HUNGUP
};
+ int fd;
::__uint32_t events;
FDStat stat;
Mutex lock;
- PollerHandlePrivate() :
+ PollerHandlePrivate(int f) :
+ fd(f),
events(0),
stat(ABSENT) {
}
@@ -97,9 +100,8 @@ class PollerHandlePrivate {
}
};
-PollerHandle::PollerHandle(const Socket& s) :
- impl(new PollerHandlePrivate),
- socket(s)
+PollerHandle::PollerHandle(const IOHandle& h) :
+ impl(new PollerHandlePrivate(toFd(h.impl)))
{}
PollerHandle::~PollerHandle() {
@@ -201,7 +203,7 @@ void Poller::addFd(PollerHandle& handle, Direction dir) {
}
epe.data.ptr = &handle;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, toFd(handle.socket.impl), &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, eh.fd, &epe));
// Record monitoring state of this fd
eh.events = epe.events;
@@ -212,7 +214,7 @@ void Poller::delFd(PollerHandle& handle) {
PollerHandlePrivate& eh = *handle.impl;
ScopedLock<Mutex> l(eh.lock);
assert(!eh.isIdle());
- int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, toFd(handle.socket.impl), 0);
+ int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0);
// Ignore EBADF since deleting a nonexistent fd has the overall required result!
// And allows the case where a sloppy program closes the fd and then does the delFd()
if (rc == -1 && errno != EBADF) {
@@ -231,7 +233,7 @@ void Poller::modFd(PollerHandle& handle, Direction dir) {
epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
epe.data.ptr = &handle;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, toFd(handle.socket.impl), &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
// Record monitoring state of this fd
eh.events = epe.events;
@@ -247,7 +249,7 @@ void Poller::rearmFd(PollerHandle& handle) {
epe.events = eh.events;
epe.data.ptr = &handle;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, toFd(handle.socket.impl), &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
eh.setActive();
}
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 94c68bd5d0..cedad5c011 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -65,7 +65,8 @@ __thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms
AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) :
acceptedCallback(callback),
- handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) {
+ handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0),
+ socket(s) {
s.setNonblocking();
ignoreSigpipe();
@@ -84,7 +85,7 @@ void AsynchAcceptor::readable(DispatchHandle& h) {
errno = 0;
// TODO: Currently we ignore the peers address, perhaps we should
// log it or use it for connection acceptance.
- s = h.getSocket().accept(0, 0);
+ s = socket.accept(0, 0);
if (s) {
acceptedCallback(*s);
} else {
@@ -112,6 +113,7 @@ AsynchIO::AsynchIO(const Socket& s,
closedCallback(cCb),
emptyCallback(eCb),
idleCallback(iCb),
+ socket(s),
queuedClose(false),
writePending(false) {
@@ -209,7 +211,7 @@ void AsynchIO::readable(DispatchHandle& h) {
bufferQueue.pop_front();
errno = 0;
int readCount = buff->byteCount-buff->dataCount;
- int rc = h.getSocket().read(buff->bytes + buff->dataCount, readCount);
+ int rc = socket.read(buff->bytes + buff->dataCount, readCount);
if (rc > 0) {
buff->dataCount += rc;
threadReadTotal += rc;
@@ -276,7 +278,7 @@ void AsynchIO::writeable(DispatchHandle& h) {
writeQueue.pop_back();
errno = 0;
assert(buff->dataStart+buff->dataCount <= buff->byteCount);
- int rc = h.getSocket().write(buff->bytes+buff->dataStart, buff->dataCount);
+ int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount);
if (rc >= 0) {
threadWriteTotal += rc;
writeTotal += rc;
@@ -356,9 +358,9 @@ void AsynchIO::disconnected(DispatchHandle& h) {
*/
void AsynchIO::close(DispatchHandle& h) {
h.stopWatch();
- h.getSocket().close();
+ socket.close();
if (closedCallback) {
- closedCallback(*this, getSocket());
+ closedCallback(*this, socket);
}
}
diff --git a/cpp/src/qpid/sys/posix/IOHandle.cpp b/cpp/src/qpid/sys/posix/IOHandle.cpp
new file mode 100644
index 0000000000..80b487eadc
--- /dev/null
+++ b/cpp/src/qpid/sys/posix/IOHandle.cpp
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/IOHandle.h"
+
+#include "PrivatePosix.h"
+
+namespace qpid {
+namespace sys {
+
+int toFd(const IOHandlePrivate* h)
+{
+ return h->fd;
+}
+
+IOHandle::IOHandle(IOHandlePrivate* h) :
+ impl(h)
+{}
+
+IOHandle::~IOHandle() {
+ delete impl;
+}
+
+}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/posix/PrivatePosix.h b/cpp/src/qpid/sys/posix/PrivatePosix.h
index 9ec9770cab..33c0cd81bc 100644
--- a/cpp/src/qpid/sys/posix/PrivatePosix.h
+++ b/cpp/src/qpid/sys/posix/PrivatePosix.h
@@ -35,9 +35,17 @@ struct timespec& toTimespec(struct timespec& ts, const Duration& t);
struct timeval& toTimeval(struct timeval& tv, const Duration& t);
Duration toTime(const struct timespec& ts);
-// Private socket related implementation details
-class SocketPrivate;
-int toFd(const SocketPrivate* s);
+// Private fd related implementation details
+class IOHandlePrivate {
+public:
+ IOHandlePrivate(int f = -1) :
+ fd(f)
+ {}
+
+ int fd;
+};
+
+int toFd(const IOHandlePrivate* h);
}}
diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp
index c286ebce27..99cf7210b6 100644
--- a/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/cpp/src/qpid/sys/posix/Socket.cpp
@@ -38,19 +38,8 @@
namespace qpid {
namespace sys {
-class SocketPrivate {
-public:
- SocketPrivate(int f = -1) :
- fd(f)
- {}
-
- int fd;
-
- std::string getName(bool local, bool includeService = false) const;
- std::string getService(bool local) const;
-};
-
-std::string SocketPrivate::getName(bool local, bool includeService) const
+namespace {
+std::string getName(int fd, bool local, bool includeService = false)
{
::sockaddr_storage name; // big enough for any socket address
::socklen_t namelen = sizeof(name);
@@ -80,7 +69,7 @@ std::string SocketPrivate::getName(bool local, bool includeService) const
}
}
-std::string SocketPrivate::getService(bool local) const
+std::string getService(int fd, bool local)
{
::sockaddr_storage name; // big enough for any socket address
::socklen_t namelen = sizeof(name);
@@ -101,21 +90,18 @@ std::string SocketPrivate::getService(bool local) const
throw QPID_POSIX_ERROR(rc);
return servName;
}
+}
Socket::Socket() :
- impl(new SocketPrivate)
+ IOHandle(new IOHandlePrivate)
{
createTcp();
}
-Socket::Socket(SocketPrivate* sp) :
- impl(sp)
+Socket::Socket(IOHandlePrivate* h) :
+ IOHandle(h)
{}
-Socket::~Socket() {
- delete impl;
-}
-
void Socket::createTcp() const
{
int& socket = impl->fd;
@@ -225,7 +211,7 @@ Socket* Socket::accept(struct sockaddr *addr, socklen_t *addrlen) const
{
int afd = ::accept(impl->fd, addr, addrlen);
if ( afd >= 0)
- return new Socket(new SocketPrivate(afd));
+ return new Socket(new IOHandlePrivate(afd));
else if (errno == EAGAIN)
return 0;
else throw QPID_POSIX_ERROR(errno);
@@ -243,41 +229,32 @@ int Socket::write(const void *buf, size_t count) const
std::string Socket::getSockname() const
{
- return impl->getName(true);
+ return getName(impl->fd, true);
}
std::string Socket::getPeername() const
{
- return impl->getName(false);
+ return getName(impl->fd, false);
}
std::string Socket::getPeerAddress() const
{
- return impl->getName(false, true);
+ return getName(impl->fd, false, true);
}
std::string Socket::getLocalAddress() const
{
- return impl->getName(true, true);
+ return getName(impl->fd, true, true);
}
uint16_t Socket::getLocalPort() const
{
- return atoi(impl->getService(true).c_str());
+ return atoi(getService(impl->fd, true).c_str());
}
uint16_t Socket::getRemotePort() const
{
- return atoi(impl->getService(true).c_str());
-}
-
-int Socket::toFd() const {
- return impl->fd;
-}
-
-int toFd(const SocketPrivate* s)
-{
- return s->fd;
+ return atoi(getService(impl->fd, true).c_str());
}
}} // namespace qpid::sys