summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2007-07-27 17:19:30 +0000
committerAndrew Stitcher <astitcher@apache.org>2007-07-27 17:19:30 +0000
commit65ea2f177bd0810590895d89a490af8cea60253b (patch)
tree1a1432d706ac5f43dc8cdd5fdb0d7b5566dd5d06
parent0a7f3f5dde40e59e90588e4ab7ba2ba99651c0f4 (diff)
downloadqpid-python-65ea2f177bd0810590895d89a490af8cea60253b.tar.gz
* Asynchronous network IO subsystem
- This is now implemented such that it very nearly only depends on the platform code (Socker & Poller), this is not 100% true at present, but should be simple to finish. - This is still not the default (use "./configure --disable-apr-netio" to get it) - Interrupting the broker gives a known error - Default for number of broker io threads is not correct (needs to be number of CPUs - it will run slower with too many io threads) * EventChannel code - Deleted all EventChannel code as it's entirely superceded by this new shiny code ;-) * Rearranged the platform Socket implementations a bit for better abstraction git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560323 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/Makefile.am11
-rw-r--r--cpp/src/qpid/client/Connector.cpp1
-rw-r--r--cpp/src/qpid/sys/Acceptor.h4
-rw-r--r--cpp/src/qpid/sys/AsynchIO.h22
-rw-r--r--cpp/src/qpid/sys/AsynchIOAcceptor.cpp308
-rw-r--r--cpp/src/qpid/sys/Dispatcher.cpp59
-rw-r--r--cpp/src/qpid/sys/Dispatcher.h7
-rw-r--r--cpp/src/qpid/sys/Poller.h13
-rw-r--r--cpp/src/qpid/sys/Socket.h73
-rw-r--r--cpp/src/qpid/sys/apr/APRAcceptor.cpp2
-rw-r--r--cpp/src/qpid/sys/apr/Socket.cpp58
-rw-r--r--cpp/src/qpid/sys/apr/Socket.h75
-rw-r--r--cpp/src/qpid/sys/epoll/EpollPoller.cpp15
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp122
-rw-r--r--cpp/src/qpid/sys/posix/EventChannel.cpp596
-rw-r--r--cpp/src/qpid/sys/posix/EventChannel.h214
-rw-r--r--cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp165
-rw-r--r--cpp/src/qpid/sys/posix/EventChannelConnection.cpp237
-rw-r--r--cpp/src/qpid/sys/posix/EventChannelConnection.h96
-rw-r--r--cpp/src/qpid/sys/posix/EventChannelThreads.cpp126
-rw-r--r--cpp/src/qpid/sys/posix/EventChannelThreads.h105
-rw-r--r--cpp/src/qpid/sys/posix/PosixAcceptor.cpp49
-rw-r--r--cpp/src/qpid/sys/posix/PrivatePosix.h5
-rw-r--r--cpp/src/qpid/sys/posix/Socket.cpp106
-rw-r--r--cpp/src/qpid/sys/posix/Socket.h76
-rw-r--r--cpp/src/tests/.valgrind.supp-default8
26 files changed, 691 insertions, 1862 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index cf1598bcca..7950299f64 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -58,18 +58,12 @@ apr_plat_src = \
apr_plat_hdr = \
qpid/sys/apr/Condition.h \
qpid/sys/apr/Mutex.h \
- qpid/sys/apr/Socket.h \
qpid/sys/apr/Thread.h
posix_netio_src = \
- qpid/sys/posix/EventChannel.cpp \
- qpid/sys/posix/EventChannelAcceptor.cpp \
- qpid/sys/posix/EventChannelConnection.cpp \
- qpid/sys/posix/EventChannelThreads.cpp
+ qpid/sys/AsynchIOAcceptor.cpp
-posix_netio_hdr = \
- qpid/sys/posix/EventChannel.h \
- qpid/sys/posix/EventChannelThreads.h
+posix_netio_hdr =
posix_plat_src = \
qpid/sys/Dispatcher.cpp \
@@ -86,7 +80,6 @@ posix_plat_hdr = \
qpid/sys/posix/Condition.h \
qpid/sys/posix/PrivatePosix.h \
qpid/sys/posix/Mutex.h \
- qpid/sys/posix/Socket.h \
qpid/sys/posix/Thread.h
if USE_APR_NETIO
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index 257e2b577a..d92a177465 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -54,7 +54,6 @@ Connector::~Connector(){
}
void Connector::connect(const std::string& host, int port){
- socket = Socket::createTcp();
socket.connect(host, port);
closed = false;
receiver = Thread(this);
diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h
index 8d6bca8f29..8ad3186d07 100644
--- a/cpp/src/qpid/sys/Acceptor.h
+++ b/cpp/src/qpid/sys/Acceptor.h
@@ -37,10 +37,12 @@ class Acceptor : public qpid::SharedObject<Acceptor>
virtual ~Acceptor() = 0;
virtual uint16_t getPort() const = 0;
virtual std::string getHost() const = 0;
- virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory) = 0;
+ virtual void run(ConnectionInputHandlerFactory* factory) = 0;
virtual void shutdown() = 0;
};
+inline Acceptor::~Acceptor() {}
+
}}
diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h
index 7accde17b0..7cc7995ee2 100644
--- a/cpp/src/qpid/sys/AsynchIO.h
+++ b/cpp/src/qpid/sys/AsynchIO.h
@@ -35,14 +35,14 @@ namespace sys {
*/
class AsynchAcceptor {
public:
- typedef boost::function1<void, int> Callback;
+ typedef boost::function1<void, const Socket&> Callback;
private:
Callback acceptedCallback;
DispatchHandle handle;
public:
- AsynchAcceptor(int fd, Callback callback);
+ AsynchAcceptor(const Socket& s, Callback callback);
void start(Poller::shared_ptr poller);
private:
@@ -75,9 +75,9 @@ public:
bytes(b),
byteCount(s),
dataStart(0),
- dataCount(s)
+ dataCount(0)
{}
-
+
virtual ~Buffer()
{}
};
@@ -85,6 +85,7 @@ public:
typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback;
typedef boost::function1<void, AsynchIO&> EofCallback;
typedef boost::function1<void, AsynchIO&> DisconnectCallback;
+ typedef boost::function2<void, AsynchIO&, const Socket&> ClosedCallback;
typedef boost::function1<void, AsynchIO&> BuffersEmptyCallback;
typedef boost::function1<void, AsynchIO&> IdleCallback;
@@ -92,26 +93,33 @@ private:
ReadCallback readCallback;
EofCallback eofCallback;
DisconnectCallback disCallback;
+ ClosedCallback closedCallback;
BuffersEmptyCallback emptyCallback;
IdleCallback idleCallback;
std::deque<Buffer*> bufferQueue;
std::deque<Buffer*> writeQueue;
+ bool queuedClose;
public:
- AsynchIO(int fd,
+ AsynchIO(const Socket& s,
ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
- BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
+ ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
void queueForDeletion();
void start(Poller::shared_ptr poller);
void queueReadBuffer(Buffer* buff);
- void queueWrite(Buffer* buff);
+ void queueWrite(Buffer* buff = 0);
+ void unread(Buffer* buff);
+ void queueWriteClose();
+ Buffer* getQueuedBuffer();
+ const Socket& getSocket() const { return DispatchHandle::getSocket(); }
private:
~AsynchIO();
void readable(DispatchHandle& handle);
void writeable(DispatchHandle& handle);
void disconnected(DispatchHandle& handle);
+ void close(DispatchHandle& handle);
};
}}
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
new file mode 100644
index 0000000000..bf4a3ff842
--- /dev/null
+++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -0,0 +1,308 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Acceptor.h"
+
+#include "Socket.h"
+#include "AsynchIO.h"
+#include "Mutex.h"
+#include "Thread.h"
+
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/sys/ConnectionInputHandler.h"
+#include "qpid/sys/ConnectionInputHandlerFactory.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/log/Statement.h"
+
+#include <boost/bind.hpp>
+#include <boost/assert.hpp>
+#include <queue>
+#include <vector>
+#include <memory>
+
+namespace qpid {
+namespace sys {
+
+class AsynchIOAcceptor : public Acceptor {
+ Poller::shared_ptr poller;
+ Socket listener;
+ int numIOThreads;
+ const uint16_t listeningPort;
+
+public:
+ AsynchIOAcceptor(int16_t port, int backlog, int threads, bool trace);
+ ~AsynchIOAcceptor() {}
+ void run(ConnectionInputHandlerFactory* factory);
+ void shutdown();
+
+ uint16_t getPort() const;
+ std::string getHost() const;
+
+private:
+ void accepted(Poller::shared_ptr, const Socket&, ConnectionInputHandlerFactory*);
+};
+
+Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads, bool trace)
+{
+ return
+ Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads, trace));
+}
+
+AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads, bool) :
+ poller(new Poller),
+ numIOThreads(threads),
+ listeningPort(listener.listen(port, backlog))
+{}
+
+// Buffer definition
+struct Buff : public AsynchIO::Buffer {
+ Buff() :
+ AsynchIO::Buffer(new char[65536], 65536)
+ {}
+ ~Buff()
+ { delete [] bytes;}
+};
+
+class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler {
+ AsynchIO* aio;
+ ConnectionInputHandler* inputHandler;
+ std::queue<framing::AMQFrame> frameQueue;
+ Mutex frameQueueLock;
+ bool frameQueueClosed;
+ bool initiated;
+
+public:
+ AsynchIOHandler() :
+ inputHandler(0),
+ frameQueueClosed(false),
+ initiated(false)
+ {}
+
+ ~AsynchIOHandler() {
+ if (inputHandler)
+ inputHandler->closed();
+ delete inputHandler;
+ }
+
+ void init(AsynchIO* a, ConnectionInputHandler* h) {
+ aio = a;
+ inputHandler = h;
+ }
+
+ // Output side
+ void send(framing::AMQFrame&);
+ void close();
+
+ // Input side
+ void readbuff(AsynchIO& aio, AsynchIO::Buffer* buff);
+ void eof(AsynchIO& aio);
+ void disconnect(AsynchIO& aio);
+
+ // Notifications
+ void nobuffs(AsynchIO& aio);
+ void idle(AsynchIO& aio);
+ void closedSocket(AsynchIO& aio, const Socket& s);
+};
+
+void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) {
+
+ AsynchIOHandler* async = new AsynchIOHandler;
+ ConnectionInputHandler* handler = f->create(async);
+ AsynchIO* aio = new AsynchIO(s,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
+ async->init(aio, handler);
+
+ // Give connection some buffers to use
+ for (int i = 0; i < 4; i++) {
+ aio->queueReadBuffer(new Buff);
+ }
+ aio->start(poller);
+}
+
+
+uint16_t AsynchIOAcceptor::getPort() const {
+ return listeningPort; // Immutable no need for lock.
+}
+
+std::string AsynchIOAcceptor::getHost() const {
+ return listener.getSockname();
+}
+
+void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) {
+ Dispatcher d(poller);
+ AsynchAcceptor
+ acceptor(listener,
+ boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact));
+ acceptor.start(poller);
+
+ std::vector<Thread*> t(numIOThreads-1);
+
+ // Run n-1 io threads
+ for (int i=0; i<numIOThreads-1; ++i)
+ t[i] = new Thread(d);
+
+ // Run final thread
+ d.run();
+
+ // Now wait for n-1 io threads to exit
+ for (int i=0; i>numIOThreads-1; ++i) {
+ t[i]->join();
+ delete t[i];
+ }
+}
+
+void AsynchIOAcceptor::shutdown() {
+ poller->shutdown();
+}
+
+// Output side
+void AsynchIOHandler::send(framing::AMQFrame& frame) {
+ // TODO: Need to find out if we are in the callback context,
+ // in the callback thread if so we can go further than just queuing the frame
+ // to be handled later
+ {
+ ScopedLock<Mutex> l(frameQueueLock);
+ // Ignore anything seen after closing
+ if (!frameQueueClosed)
+ frameQueue.push(frame);
+ }
+
+ // Activate aio for writing here
+ aio->queueWrite();
+}
+
+void AsynchIOHandler::close() {
+ ScopedLock<Mutex> l(frameQueueLock);
+ frameQueueClosed = true;
+}
+
+// Input side
+void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::Buffer* buff) {
+ framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+ if(initiated){
+ framing::AMQFrame frame;
+ try{
+ while(frame.decode(in)) {
+ QPID_LOG(debug, "RECV: " << frame);
+ inputHandler->received(frame);
+ }
+ }catch(const std::exception& e){
+ QPID_LOG(error, e.what());
+ }
+ }else{
+ framing::ProtocolInitiation protocolInit;
+ if(protocolInit.decode(in)){
+ QPID_LOG(debug, "INIT [" << aio << "]");
+ inputHandler->initiated(protocolInit);
+ initiated = true;
+ }
+ }
+ // TODO: unreading needs to go away, and when we can cope
+ // with multiple sub-buffers in the general buffer scheme, it will
+ if (in.available() != 0) {
+ // Adjust buffer for used bytes and then "unread them"
+ buff->dataStart += buff->dataCount-in.available();
+ buff->dataCount = in.available();
+ aio->unread(buff);
+ } else {
+ // Give whole buffer back to aio subsystem
+ aio->queueReadBuffer(buff);
+ }
+}
+
+void AsynchIOHandler::eof(AsynchIO&) {
+ inputHandler->closed();
+ aio->queueWriteClose();
+}
+
+void AsynchIOHandler::closedSocket(AsynchIO&, const Socket& s) {
+ delete &s;
+ aio->queueForDeletion();
+ delete this;
+}
+
+void AsynchIOHandler::disconnect(AsynchIO& a) {
+ // treat the same as eof
+ eof(a);
+}
+
+// Notifications
+void AsynchIOHandler::nobuffs(AsynchIO&) {
+}
+
+void AsynchIOHandler::idle(AsynchIO&){
+ ScopedLock<Mutex> l(frameQueueLock);
+
+ if (frameQueue.empty()) {
+ // At this point we know that we're write idling the connection
+ // so we could note that somewhere or do something special
+ return;
+ }
+
+ // Try and get a queued buffer if not then construct new one
+ AsynchIO::Buffer* buff = aio->getQueuedBuffer();
+ if (!buff)
+ buff = new Buff;
+ std::auto_ptr<framing::Buffer> out(new framing::Buffer(buff->bytes, buff->byteCount));
+ int buffUsed = 0;
+
+ while (!frameQueue.empty()) {
+ framing::AMQFrame frame = frameQueue.front();
+ frameQueue.pop();
+
+ // Encode output frame
+ int frameSize = frame.size();
+ if (frameSize > buff->byteCount)
+ THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer.");
+
+ // If we've filled current buffer then flush and get new one
+ if (frameSize > int(out->available())) {
+ buff->dataCount = buffUsed;
+ aio->queueWrite(buff);
+
+ buff = aio->getQueuedBuffer();
+ if (!buff)
+ buff = new Buff;
+ out.reset(new framing::Buffer(buff->bytes, buff->byteCount));
+ buffUsed = 0;
+ }
+
+ frame.encode(*out);
+ buffUsed += frameSize;
+ QPID_LOG(debug, "SENT: " << frame);
+ }
+
+ buff->dataCount = buffUsed;
+ aio->queueWrite(buff);
+
+ if (frameQueueClosed) {
+ aio->queueWriteClose();
+ }
+
+}
+
+}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/Dispatcher.cpp b/cpp/src/qpid/sys/Dispatcher.cpp
index 3a1da13bd0..b8751168c2 100644
--- a/cpp/src/qpid/sys/Dispatcher.cpp
+++ b/cpp/src/qpid/sys/Dispatcher.cpp
@@ -94,10 +94,11 @@ void DispatchHandle::rewatch() {
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
+ case DELAYED_IDLE:
break;
case DELAYED_R:
case DELAYED_W:
- case CALLBACK:
+ case DELAYED_INACTIVE:
state = r ?
(w ? DELAYED_RW : DELAYED_R) :
DELAYED_W;
@@ -132,6 +133,7 @@ void DispatchHandle::rewatchRead() {
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
+ case DELAYED_IDLE:
break;
case DELAYED_R:
case DELAYED_RW:
@@ -140,7 +142,7 @@ void DispatchHandle::rewatchRead() {
case DELAYED_W:
state = DELAYED_RW;
break;
- case CALLBACK:
+ case DELAYED_INACTIVE:
state = DELAYED_R;
break;
case ACTIVE_R:
@@ -168,6 +170,7 @@ void DispatchHandle::rewatchWrite() {
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
+ case DELAYED_IDLE:
break;
case DELAYED_W:
case DELAYED_RW:
@@ -176,7 +179,7 @@ void DispatchHandle::rewatchWrite() {
case DELAYED_R:
state = DELAYED_RW;
break;
- case CALLBACK:
+ case DELAYED_INACTIVE:
state = DELAYED_W;
break;
case INACTIVE:
@@ -204,15 +207,16 @@ void DispatchHandle::unwatchRead() {
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
+ case DELAYED_IDLE:
break;
case DELAYED_R:
- state = CALLBACK;
+ state = DELAYED_INACTIVE;
break;
case DELAYED_RW:
state = DELAYED_W;
break;
case DELAYED_W:
- case CALLBACK:
+ case DELAYED_INACTIVE:
case DELAYED_DELETE:
break;
case ACTIVE_R:
@@ -239,15 +243,16 @@ void DispatchHandle::unwatchWrite() {
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
+ case DELAYED_IDLE:
break;
case DELAYED_W:
- state = CALLBACK;
+ state = DELAYED_INACTIVE;
break;
case DELAYED_RW:
state = DELAYED_R;
break;
case DELAYED_R:
- case CALLBACK:
+ case DELAYED_INACTIVE:
case DELAYED_DELETE:
break;
case ACTIVE_W:
@@ -270,12 +275,13 @@ void DispatchHandle::unwatch() {
ScopedLock<Mutex> lock(stateLock);
switch (state) {
case IDLE:
+ case DELAYED_IDLE:
break;
case DELAYED_R:
case DELAYED_W:
case DELAYED_RW:
- case CALLBACK:
- state = CALLBACK;
+ case DELAYED_INACTIVE:
+ state = DELAYED_INACTIVE;
break;
case DELAYED_DELETE:
break;
@@ -289,32 +295,46 @@ void DispatchHandle::unwatch() {
void DispatchHandle::stopWatch() {
ScopedLock<Mutex> lock(stateLock);
- if ( state == IDLE) {
+ switch (state) {
+ case IDLE:
+ case DELAYED_IDLE:
+ case DELAYED_DELETE:
return;
+ case DELAYED_R:
+ case DELAYED_W:
+ case DELAYED_RW:
+ case DELAYED_INACTIVE:
+ state = DELAYED_IDLE;
+ break;
+ default:
+ state = IDLE;
+ break;
}
assert(poller);
poller->delFd(*this);
poller.reset();
- state = IDLE;
}
// The slightly strange switch structure
// is to ensure that the lock is released before
// we do the delete
void DispatchHandle::doDelete() {
+ // Ensure that we're no longer watching anything
+ stopWatch();
+
// If we're in the middle of a callback defer the delete
{
ScopedLock<Mutex> lock(stateLock);
switch (state) {
- case DELAYED_R:
- case DELAYED_W:
- case DELAYED_RW:
- case CALLBACK:
+ case DELAYED_IDLE:
case DELAYED_DELETE:
state = DELAYED_DELETE;
return;
+ case IDLE:
+ break;
default:
- break;
+ // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states
+ assert(false);
}
}
// If we're not then do it right away
@@ -359,7 +379,7 @@ void DispatchHandle::dispatchCallbacks(Poller::EventType type) {
case Poller::DISCONNECTED:
{
ScopedLock<Mutex> lock(stateLock);
- state = CALLBACK;
+ state = DELAYED_INACTIVE;
}
if (disconnectedCallback) {
disconnectedCallback(*this);
@@ -386,10 +406,11 @@ void DispatchHandle::dispatchCallbacks(Poller::EventType type) {
poller->modFd(*this, Poller::INOUT);
state = ACTIVE_RW;
return;
- case CALLBACK:
+ case DELAYED_INACTIVE:
state = INACTIVE;
return;
- case IDLE:
+ case DELAYED_IDLE:
+ state = IDLE;
return;
default:
// This should be impossible
diff --git a/cpp/src/qpid/sys/Dispatcher.h b/cpp/src/qpid/sys/Dispatcher.h
index 60e9522bc1..2de026e141 100644
--- a/cpp/src/qpid/sys/Dispatcher.h
+++ b/cpp/src/qpid/sys/Dispatcher.h
@@ -49,12 +49,13 @@ private:
Mutex stateLock;
enum {
IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW,
- CALLBACK, DELAYED_R, DELAYED_W, DELAYED_RW, DELAYED_DELETE
+ DELAYED_IDLE, DELAYED_INACTIVE, DELAYED_R, DELAYED_W, DELAYED_RW,
+ DELAYED_DELETE
} state;
public:
- DispatchHandle(int fd, Callback rCb, Callback wCb, Callback dCb) :
- PollerHandle(fd),
+ DispatchHandle(const Socket& s, Callback rCb, Callback wCb, Callback dCb) :
+ PollerHandle(s),
readableCallback(rCb),
writableCallback(wCb),
disconnectedCallback(dCb),
diff --git a/cpp/src/qpid/sys/Poller.h b/cpp/src/qpid/sys/Poller.h
index 55fead55aa..843295e268 100644
--- a/cpp/src/qpid/sys/Poller.h
+++ b/cpp/src/qpid/sys/Poller.h
@@ -23,6 +23,7 @@
*/
#include "Time.h"
+#include "Socket.h"
#include <stdint.h>
@@ -39,14 +40,14 @@ class PollerHandlePrivate;
class PollerHandle {
friend class Poller;
- PollerHandlePrivate* impl;
- const int fd;
+ PollerHandlePrivate* const impl;
+ const Socket& socket;
public:
- PollerHandle(int fd0);
+ PollerHandle(const Socket& s);
virtual ~PollerHandle();
-
- int getFD() const { return fd; }
+
+ const Socket& getSocket() const {return socket;}
};
/**
@@ -55,7 +56,7 @@ public:
*/
class PollerPrivate;
class Poller {
- PollerPrivate* impl;
+ PollerPrivate* const impl;
public:
typedef boost::shared_ptr<Poller> shared_ptr;
diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h
index bb1ef27e65..56b6195b65 100644
--- a/cpp/src/qpid/sys/Socket.h
+++ b/cpp/src/qpid/sys/Socket.h
@@ -21,11 +21,74 @@
* under the License.
*
*/
+#include <string>
+#include "qpid/sys/Time.h"
-#ifdef USE_APR_PLATFORM
-#include "apr/Socket.h"
-#else
-#include "posix/Socket.h"
-#endif
+struct sockaddr;
+namespace qpid {
+namespace sys {
+
+class SocketPrivate;
+class Socket
+{
+ friend class Poller;
+
+ SocketPrivate* const impl;
+
+public:
+ /** Create a socket wrapper for descriptor. */
+ Socket();
+ ~Socket();
+
+ /** Create an initialized TCP socket */
+ void createTcp() const;
+
+ /** Set timeout for read and write */
+ void setTimeout(const Duration& interval) const;
+
+ /** Set socket non blocking */
+ void setNonblocking() const;
+
+ void connect(const std::string& host, int port) const;
+
+ void close() const;
+
+ enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode;
+
+ /** Returns bytes sent or an ErrorCode value < 0. */
+ ssize_t send(const void* data, size_t size) const;
+
+ /**
+ * Returns bytes received, an ErrorCode value < 0 or 0
+ * if the connection closed in an orderly manner.
+ */
+ ssize_t recv(void* data, size_t size) const;
+
+ /** Bind to a port and start listening.
+ *@param port 0 means choose an available port.
+ *@param backlog maximum number of pending connections.
+ *@return The bound port.
+ */
+ int listen(int port = 0, int backlog = 10) const;
+
+ /** Returns the "socket name" ie the address bound to
+ * the near end of the socket
+ */
+ std::string getSockname() const;
+
+ /** Accept a connection from a socket that is already listening
+ * and has an incoming connection
+ */
+ Socket* accept(struct sockaddr *addr, socklen_t *addrlen) const;
+
+ // TODO The following are raw operations, maybe they need better wrapping?
+ int read(void *buf, size_t count) const;
+ int write(const void *buf, size_t count) const;
+
+private:
+ Socket(SocketPrivate*);
+};
+
+}}
#endif /*!_sys_Socket_h*/
diff --git a/cpp/src/qpid/sys/apr/APRAcceptor.cpp b/cpp/src/qpid/sys/apr/APRAcceptor.cpp
index 8662e602c2..b353b698ef 100644
--- a/cpp/src/qpid/sys/apr/APRAcceptor.cpp
+++ b/cpp/src/qpid/sys/apr/APRAcceptor.cpp
@@ -56,8 +56,6 @@ Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads, bo
{
return Acceptor::shared_ptr(new APRAcceptor(port, backlog, threads, trace));
}
-// Must define Acceptor virtual dtor.
-Acceptor::~Acceptor() {}
APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) :
port(port_),
diff --git a/cpp/src/qpid/sys/apr/Socket.cpp b/cpp/src/qpid/sys/apr/Socket.cpp
index 6e64d656d2..577268844a 100644
--- a/cpp/src/qpid/sys/apr/Socket.cpp
+++ b/cpp/src/qpid/sys/apr/Socket.cpp
@@ -20,31 +20,56 @@
*/
-#include "Socket.h"
+#include "qpid/sys/Socket.h"
+
#include "APRBase.h"
#include "APRPool.h"
+#include <apr_network_io.h>
+
+namespace qpid {
+namespace sys {
+
+class SocketPrivate {
+public:
+ SocketPrivate(apr_socket_t* s = 0) :
+ socket(s)
+ {}
+
+ apr_socket_t* socket;
+};
+
+Socket::Socket() :
+ impl(new SocketPrivate)
+{
+ createTcp();
+}
+
+Socket::Socket(SocketPrivate* sp) :
+ impl(sp)
+{}
-using namespace qpid::sys;
+Socket::~Socket() {
+ delete impl;
+}
-Socket Socket::createTcp() {
- Socket s;
+void Socket::createTcp() const {
+ apr_socket_t*& socket = impl->socket;
+ apr_socket_t* s;
CHECK_APR_SUCCESS(
apr_socket_create(
- &s.socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP,
+ &s, APR_INET, SOCK_STREAM, APR_PROTO_TCP,
APRPool::get()));
- return s;
-}
-
-Socket::Socket(apr_socket_t* s) {
socket = s;
}
-void Socket::setTimeout(const Duration& interval) {
+void Socket::setTimeout(const Duration& interval) const {
+ apr_socket_t*& socket = impl->socket;
apr_socket_timeout_set(socket, interval/TIME_USEC);
}
-void Socket::connect(const std::string& host, int port) {
+void Socket::connect(const std::string& host, int port) const {
+ apr_socket_t*& socket = impl->socket;
apr_sockaddr_t* address;
CHECK_APR_SUCCESS(
apr_sockaddr_info_get(
@@ -53,14 +78,16 @@ void Socket::connect(const std::string& host, int port) {
CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
}
-void Socket::close() {
+void Socket::close() const {
+ apr_socket_t*& socket = impl->socket;
if (socket == 0) return;
CHECK_APR_SUCCESS(apr_socket_close(socket));
socket = 0;
}
-ssize_t Socket::send(const void* data, size_t size)
+ssize_t Socket::send(const void* data, size_t size) const
{
+ apr_socket_t*& socket = impl->socket;
apr_size_t sent = size;
apr_status_t status =
apr_socket_send(socket, reinterpret_cast<const char*>(data), &sent);
@@ -70,8 +97,9 @@ ssize_t Socket::send(const void* data, size_t size)
return sent;
}
-ssize_t Socket::recv(void* data, size_t size)
+ssize_t Socket::recv(void* data, size_t size) const
{
+ apr_socket_t*& socket = impl->socket;
apr_size_t received = size;
apr_status_t status =
apr_socket_recv(socket, reinterpret_cast<char*>(data), &received);
@@ -83,4 +111,4 @@ ssize_t Socket::recv(void* data, size_t size)
return received;
}
-
+}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/apr/Socket.h b/cpp/src/qpid/sys/apr/Socket.h
deleted file mode 100644
index c20c36dcd9..0000000000
--- a/cpp/src/qpid/sys/apr/Socket.h
+++ /dev/null
@@ -1,75 +0,0 @@
-#ifndef _sys_apr_Socket_h
-#define _sys_apr_Socket_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <string>
-#include "qpid/sys/Time.h"
-
-#include <apr_network_io.h>
-
-namespace qpid {
-namespace sys {
-
-class Socket
-{
- public:
- /** Create an initialized TCP socket */
- static Socket createTcp();
-
- /** Create a socket wrapper for descriptor. */
- Socket(apr_socket_t* descriptor = 0);
-
- /** Set timeout for read and write */
- void setTimeout(const Duration& interval);
-
- void connect(const std::string& host, int port);
-
- void close();
-
- enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode;
-
- /** Returns bytes sent or an ErrorCode value < 0. */
- ssize_t send(const void* data, size_t size);
-
- /**
- * Returns bytes received, an ErrorCode value < 0 or 0
- * if the connection closed in an orderly manner.
- */
- ssize_t recv(void* data, size_t size);
-
- /** Bind to a port and start listening.
- *@param port 0 means choose an available port.
- *@param backlog maximum number of pending connections.
- *@return The bound port.
- */
- int listen(int port = 0, int backlog = 10);
-
- /** Get file descriptor */
- int fd();
-
- private:
- apr_socket_t* socket;
-};
-
-}}
-#endif /*!_sys_Socket_h*/
diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
index 8c3bdbc7d5..37a8510fe6 100644
--- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp
+++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
@@ -22,6 +22,7 @@
#include "qpid/sys/Poller.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/posix/check.h"
+#include "qpid/sys/posix/PrivatePosix.h"
#include <sys/epoll.h>
#include <errno.h>
@@ -88,11 +89,11 @@ class PollerHandlePrivate {
}
};
-PollerHandle::PollerHandle(int fd0) :
+PollerHandle::PollerHandle(const Socket& s) :
impl(new PollerHandlePrivate),
- fd(fd0)
+ socket(s)
{}
-
+
PollerHandle::~PollerHandle() {
delete impl;
}
@@ -186,7 +187,7 @@ void Poller::addFd(PollerHandle& handle, Direction dir) {
}
epe.data.ptr = &handle;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, handle.getFD(), &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, toFd(handle.socket.impl), &epe));
// Record monitoring state of this fd
eh.events = epe.events;
@@ -197,7 +198,7 @@ void Poller::delFd(PollerHandle& handle) {
PollerHandlePrivate& eh = *handle.impl;
ScopedLock<Mutex> l(eh.lock);
assert(!eh.isIdle());
- int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, handle.getFD(), 0);
+ int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, toFd(handle.socket.impl), 0);
// Ignore EBADF since deleting a nonexistent fd has the overall required result!
// And allows the case where a sloppy program closes the fd and then does the delFd()
if (rc == -1 && errno != EBADF) {
@@ -216,7 +217,7 @@ void Poller::modFd(PollerHandle& handle, Direction dir) {
epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
epe.data.ptr = &handle;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, handle.getFD(), &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, toFd(handle.socket.impl), &epe));
// Record monitoring state of this fd
eh.events = epe.events;
@@ -232,7 +233,7 @@ void Poller::rearmFd(PollerHandle& handle) {
epe.events = eh.events;
epe.data.ptr = &handle;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, handle.getFD(), &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, toFd(handle.socket.impl), &epe));
eh.setActive();
}
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 473ef7936f..2b462cbd7a 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -24,8 +24,6 @@
#include "check.h"
#include <unistd.h>
-#include <fcntl.h>
-#include <sys/types.h>
#include <sys/socket.h>
#include <signal.h>
#include <errno.h>
@@ -37,13 +35,6 @@ using namespace qpid::sys;
namespace {
/*
- * Make file descriptor non-blocking
- */
-void nonblocking(int fd) {
- QPID_POSIX_CHECK(::fcntl(fd, F_SETFL, O_NONBLOCK));
-}
-
-/*
* Make *process* not generate SIGPIPE when writing to closed
* pipe/socket (necessary as default action is to terminate process)
*/
@@ -57,11 +48,11 @@ void ignoreSigpipe() {
* Asynch Acceptor
*/
-AsynchAcceptor::AsynchAcceptor(int fd, Callback callback) :
+AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) :
acceptedCallback(callback),
- handle(fd, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) {
+ handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) {
- nonblocking(fd);
+ s.setNonblocking();
ignoreSigpipe();
}
@@ -73,18 +64,16 @@ void AsynchAcceptor::start(Poller::shared_ptr poller) {
* We keep on accepting as long as there is something to accept
*/
void AsynchAcceptor::readable(DispatchHandle& h) {
- int afd;
+ Socket* s;
do {
errno = 0;
// TODO: Currently we ignore the peers address, perhaps we should
// log it or use it for connection acceptance.
- afd = ::accept(h.getFD(), 0, 0);
- if (afd >= 0) {
- acceptedCallback(afd);
- } else if (errno == EAGAIN) {
- break;
+ s = h.getSocket().accept(0, 0);
+ if (s) {
+ acceptedCallback(*s);
} else {
- QPID_POSIX_CHECK(afd);
+ break;
}
} while (true);
@@ -94,21 +83,23 @@ void AsynchAcceptor::readable(DispatchHandle& h) {
/*
* Asynch reader/writer
*/
-AsynchIO::AsynchIO(int fd,
+AsynchIO::AsynchIO(const Socket& s,
ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
- BuffersEmptyCallback eCb, IdleCallback iCb) :
+ ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) :
- DispatchHandle(fd,
+ DispatchHandle(s,
boost::bind(&AsynchIO::readable, this, _1),
boost::bind(&AsynchIO::writeable, this, _1),
boost::bind(&AsynchIO::disconnected, this, _1)),
readCallback(rCb),
eofCallback(eofCb),
disCallback(disCb),
+ closedCallback(cCb),
emptyCallback(eCb),
- idleCallback(iCb) {
+ idleCallback(iCb),
+ queuedClose(false) {
- nonblocking(fd);
+ s.setNonblocking();
}
struct deleter
@@ -131,15 +122,58 @@ void AsynchIO::start(Poller::shared_ptr poller) {
}
void AsynchIO::queueReadBuffer(Buffer* buff) {
+ assert(buff);
+ buff->dataStart = 0;
+ buff->dataCount = 0;
+ bufferQueue.push_back(buff);
+ DispatchHandle::rewatchRead();
+}
+
+void AsynchIO::unread(Buffer* buff) {
+ assert(buff);
+ if (buff->dataStart != 0) {
+ memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount);
+ buff->dataStart = 0;
+ }
bufferQueue.push_front(buff);
DispatchHandle::rewatchRead();
}
+// Either queue for writing or announce that there is something to write
+// and we should ask for it
void AsynchIO::queueWrite(Buffer* buff) {
- writeQueue.push_front(buff);
+ // If no buffer then don't queue anything
+ // (but still wake up for writing)
+ if (buff) {
+ // If we've already closed the socket then throw the write away
+ if (queuedClose) {
+ bufferQueue.push_front(buff);
+ return;
+ } else {
+ writeQueue.push_front(buff);
+ }
+ }
DispatchHandle::rewatchWrite();
}
+void AsynchIO::queueWriteClose() {
+ queuedClose = true;
+}
+
+/** Return a queued buffer if there are enough
+ * to spare
+ */
+AsynchIO::Buffer* AsynchIO::getQueuedBuffer() {
+ // Always keep at least one buffer (it might have data that was "unread" in it)
+ if (bufferQueue.size()<=1)
+ return 0;
+ Buffer* buff = bufferQueue.back();
+ buff->dataStart = 0;
+ buff->dataCount = 0;
+ bufferQueue.pop_back();
+ return buff;
+}
+
/*
* We keep on reading as long as we have something to read and a buffer to put
* it in
@@ -149,19 +183,19 @@ void AsynchIO::readable(DispatchHandle& h) {
// (Try to) get a buffer
if (!bufferQueue.empty()) {
// Read into buffer
- Buffer* buff = bufferQueue.back();
- bufferQueue.pop_back();
+ Buffer* buff = bufferQueue.front();
+ bufferQueue.pop_front();
errno = 0;
- int rc = ::read(h.getFD(), buff->bytes, buff->byteCount);
+ int readCount = buff->byteCount-buff->dataCount;
+ int rc = h.getSocket().read(buff->bytes + buff->dataCount, readCount);
if (rc == 0) {
eofCallback(*this);
h.unwatchRead();
return;
} else if (rc > 0) {
- buff->dataStart = 0;
- buff->dataCount = rc;
+ buff->dataCount += rc;
readCallback(*this, buff);
- if (rc != buff->byteCount) {
+ if (rc != readCount) {
// If we didn't fill the read buffer then time to stop reading
return;
}
@@ -209,7 +243,7 @@ void AsynchIO::writeable(DispatchHandle& h) {
writeQueue.pop_back();
errno = 0;
assert(buff->dataStart+buff->dataCount <= buff->byteCount);
- int rc = ::write(h.getFD(), buff->bytes+buff->dataStart, buff->dataCount);
+ int rc = h.getSocket().write(buff->bytes+buff->dataStart, buff->dataCount);
if (rc >= 0) {
// If we didn't write full buffer put rest back
if (rc != buff->dataCount) {
@@ -238,12 +272,17 @@ void AsynchIO::writeable(DispatchHandle& h) {
}
}
} else {
+ // If we're waiting to close the socket then can do it now as there is nothing to write
+ if (queuedClose) {
+ close(h);
+ return;
+ }
// Fd is writable, but nothing to write
if (idleCallback) {
idleCallback(*this);
}
// If we still have no buffers to write we can't do anything more
- if (writeQueue.empty()) {
+ if (writeQueue.empty() && !queuedClose) {
h.unwatchWrite();
return;
}
@@ -252,8 +291,25 @@ void AsynchIO::writeable(DispatchHandle& h) {
}
void AsynchIO::disconnected(DispatchHandle& h) {
+ // If we've already queued close do it before callback
+ if (queuedClose) {
+ close(h);
+ }
+
if (disCallback) {
disCallback(*this);
h.unwatch();
}
}
+
+/*
+ * Close the socket and callback to say we've done it
+ */
+void AsynchIO::close(DispatchHandle& h) {
+ h.stopWatch();
+ h.getSocket().close();
+ if (closedCallback) {
+ closedCallback(*this, getSocket());
+ }
+}
+
diff --git a/cpp/src/qpid/sys/posix/EventChannel.cpp b/cpp/src/qpid/sys/posix/EventChannel.cpp
deleted file mode 100644
index d35eedf5a5..0000000000
--- a/cpp/src/qpid/sys/posix/EventChannel.cpp
+++ /dev/null
@@ -1,596 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-// TODO aconway 2006-12-15: Locking review.
-
-// TODO aconway 2006-12-15: use Descriptor pointers everywhere,
-// get them from channel, pass them to Event constructors.
-// Eliminate lookup.
-
-
-#include "EventChannel.h"
-#include "check.h"
-
-#include "qpid/QpidError.h"
-#include "qpid/sys/AtomicCount.h"
-
-#include <mqueue.h>
-#include <string.h>
-#include <iostream>
-
-#include <sys/errno.h>
-#include <sys/socket.h>
-#include <sys/epoll.h>
-
-#include <typeinfo>
-#include <iostream>
-#include <queue>
-
-#include <boost/ptr_container/ptr_map.hpp>
-#include <boost/noncopyable.hpp>
-#include <boost/bind.hpp>
-
-using namespace std;
-
-
-namespace qpid {
-namespace sys {
-
-
-// ================================================================
-// Private class declarations
-
-namespace {
-
-typedef enum { IN, OUT } Direction;
-
-typedef std::pair<Event*, Event*> EventPair;
-
-/**
- * Template to zero out a C-struct on construction. Avoids uninitialized memory
- * warnings from valgrind or other mem checking tool.
- */
-template <class T> struct CleanStruct : public T {
- CleanStruct() { memset(this, 0, sizeof(*this)); }
-};
-
-} // namespace
-
-/**
- * Queue of events corresponding to one IO direction (IN or OUT).
- * Each Descriptor contains two Queues.
- */
-class EventChannel::Queue : private boost::noncopyable
-{
- public:
- Queue(Descriptor& container, Direction dir);
-
- /** Called by Event classes in prepare() */
- void push(Event* e);
-
- /** Called when epoll wakes.
- *@return The next completed event or 0.
- */
- Event* wake(uint32_t epollFlags);
-
- Event* pop() { Event* e = queue.front(); queue.pop_front(); return e; }
-
- bool empty() { return queue.empty(); }
-
- void setBit(uint32_t &epollFlags);
-
- void shutdown();
-
- private:
- typedef std::deque<Event*> EventQ;
-
- inline bool isMyEvent(uint32_t flags) { return flags | myEvent; }
-
- Mutex& lock; // Shared with Descriptor.
- Descriptor& descriptor;
- uint32_t myEvent; // Epoll event flag.
- EventQ queue;
-};
-
-
-/**
- * Manages a file descriptor in an epoll set.
- *
- * Can be shutdown and re-activated for the same file descriptor.
- */
-class EventChannel::Descriptor : private boost::noncopyable {
- public:
- explicit Descriptor(int fd) : epollFd(-1), myFd(fd),
- inQueue(*this, IN), outQueue(*this, OUT) {}
-
- void activate(int epollFd_);
-
- /** Epoll woke up for this descriptor. */
- Event* wake(uint32_t epollEvents);
-
- /** Shut down: close and remove file descriptor.
- * May be re-activated if fd is reused.
- */
- void shutdown();
-
- // TODO aconway 2006-12-18: Nasty. Need to clean up interaction.
- void shutdownUnsafe();
-
- bool isShutdown() { return epollFd == -1; }
-
- Queue& getQueue(Direction d) { return d==IN ? inQueue : outQueue; }
- int getFD() const { return myFd; }
-
- private:
- void update();
- void epollCtl(int op, uint32_t events);
- Queue* pick();
-
- Mutex lock;
- int epollFd;
- int myFd;
- Queue inQueue, outQueue;
- bool preferIn;
-
- friend class Queue;
-};
-
-
-/**
- * Holds a map of Descriptors, which do most of the work.
- */
-class EventChannel::Impl {
- public:
- Impl(int size = 256);
-
- ~Impl();
-
- /**
- * Activate descriptor
- */
- void activate(Descriptor& d) {
- d.activate(epollFd);
- }
-
- /** Wait for an event, return 0 on timeout */
- Event* wait(Duration timeout);
-
- void shutdown();
-
- private:
-
- Monitor monitor;
- int epollFd;
- int shutdownPipe[2];
- AtomicCount nWaiters;
- bool isShutdown;
-};
-
-
-// ================================================================
-// EventChannel::Queue::implementation.
-
-static const char* shutdownMsg = "Event queue shut down.";
-
-EventChannel::Queue::Queue(Descriptor& d, Direction dir) :
- lock(d.lock), descriptor(d),
- myEvent(dir==IN ? EPOLLIN : EPOLLOUT)
-{}
-
-void EventChannel::Queue::push(Event* e) {
- Mutex::ScopedLock l(lock);
- if (descriptor.isShutdown())
- THROW_QPID_ERROR(INTERNAL_ERROR, shutdownMsg);
- queue.push_back(e);
- descriptor.update();
-}
-
-void EventChannel::Queue::setBit(uint32_t &epollFlags) {
- if (queue.empty())
- epollFlags &= ~myEvent;
- else
- epollFlags |= myEvent;
-}
-
-// TODO aconway 2006-12-20: REMOVE
-Event* EventChannel::Queue::wake(uint32_t epollFlags) {
- // Called with lock held.
- if (!queue.empty() && (isMyEvent(epollFlags))) {
- assert(!queue.empty());
- Event* e = queue.front();
- assert(e);
- if (!e->getException()) {
- // TODO aconway 2006-12-20: Can/should we move event completion
- // out into dispatch() so it doesn't happen in Descriptor locks?
- e->complete(descriptor);
- }
- queue.pop_front();
- return e;
- }
- return 0;
-}
-
-void EventChannel::Queue::shutdown() {
- // Mark all pending events with a shutdown exception.
- // The server threads will remove and dispatch the events.
- //
- qpid::QpidError ex(INTERNAL_ERROR, shutdownMsg, SRCLINE);
- for_each(queue.begin(), queue.end(),
- boost::bind(&Event::setException, _1, ex));
-}
-
-
-// ================================================================
-// Descriptor
-
-
-void EventChannel::Descriptor::activate(int epollFd_) {
- Mutex::ScopedLock l(lock);
- if (isShutdown()) {
- epollFd = epollFd_; // We're back in business.
- epollCtl(EPOLL_CTL_ADD, 0);
- }
-}
-
-void EventChannel::Descriptor::shutdown() {
- Mutex::ScopedLock l(lock);
- shutdownUnsafe();
-}
-
-void EventChannel::Descriptor::shutdownUnsafe() {
- // Caller holds lock.
- ::close(myFd);
- epollFd = -1; // Mark myself as shutdown.
- inQueue.shutdown();
- outQueue.shutdown();
-}
-
-// TODO aconway 2006-12-20: Inline into wake().
-void EventChannel::Descriptor::update() {
- // Caller holds lock.
- if (isShutdown()) // Nothing to do
- return;
- uint32_t events = EPOLLONESHOT | EPOLLERR | EPOLLHUP;
- inQueue.setBit(events);
- outQueue.setBit(events);
- epollCtl(EPOLL_CTL_MOD, events);
-}
-
-void EventChannel::Descriptor::epollCtl(int op, uint32_t events) {
- // Caller holds lock
- assert(!isShutdown());
- CleanStruct<epoll_event> ee;
- ee.data.ptr = this;
- ee.events = events;
- int status = ::epoll_ctl(epollFd, op, myFd, &ee);
- if (status < 0) {
- if (errno == EEXIST) // It's okay to add an existing fd
- return;
- else if (errno == EBADF) // FD was closed externally.
- shutdownUnsafe();
- else
- throw QPID_POSIX_ERROR(errno);
- }
-}
-
-
-EventChannel::Queue* EventChannel::Descriptor::pick() {
- if (inQueue.empty() && outQueue.empty())
- return 0;
- if (inQueue.empty() || outQueue.empty())
- return !inQueue.empty() ? &inQueue : &outQueue;
- // Neither is empty, pick fairly.
- preferIn = !preferIn;
- return preferIn ? &inQueue : &outQueue;
-}
-
-Event* EventChannel::Descriptor::wake(uint32_t epollEvents) {
- Mutex::ScopedLock l(lock);
- // On error, shut down the Descriptor and both queues.
- if (epollEvents & (EPOLLERR | EPOLLHUP)) {
- shutdownUnsafe();
- // TODO aconway 2006-12-20: This error handling models means
- // that any error reported by epoll will result in a shutdown
- // exception on the events. Can we get more accurate error
- // reporting somehow?
- }
- Queue*q = 0;
- bool in = (epollEvents & EPOLLIN);
- bool out = (epollEvents & EPOLLOUT);
- if ((in && out) || isShutdown())
- q = pick(); // Choose fairly, either non-empty queue.
- else if (in)
- q = &inQueue;
- else if (out)
- q = &outQueue;
- Event* e = (q && !q->empty()) ? q->pop() : 0;
- update();
- if (e)
- e->complete(*this);
- return e;
-}
-
-
-
-// ================================================================
-// EventChannel::Impl
-
-
-EventChannel::Impl::Impl(int epollSize):
- epollFd(-1), isShutdown(false)
-{
- // Create the epoll file descriptor.
- epollFd = epoll_create(epollSize);
- QPID_POSIX_CHECK(epollFd);
-
- // Create a pipe and write a single byte. The byte is never
- // read so the pipes read fd is always ready for read.
- // We activate the FD when there are messages in the queue.
- QPID_POSIX_CHECK(::pipe(shutdownPipe));
- static char zero = '\0';
- QPID_POSIX_CHECK(::write(shutdownPipe[1], &zero, 1));
-}
-
-EventChannel::Impl::~Impl() {
- shutdown();
- ::close(epollFd);
- ::close(shutdownPipe[0]);
- ::close(shutdownPipe[1]);
-}
-
-
-void EventChannel::Impl::shutdown() {
- Monitor::ScopedLock l(monitor);
- if (!isShutdown) { // I'm starting shutdown.
- isShutdown = true;
- if (nWaiters == 0)
- return;
-
- // TODO aconway 2006-12-20: If I just close the epollFd will
- // that wake all threads? If so with what? Would be simpler than:
-
- CleanStruct<epoll_event> ee;
- ee.data.ptr = 0;
- ee.events = EPOLLIN;
- QPID_POSIX_CHECK(
- epoll_ctl(epollFd, EPOLL_CTL_ADD, shutdownPipe[0], &ee));
- }
- // Wait for nWaiters to get out.
- while (nWaiters > 0) {
- monitor.wait();
- }
-}
-
-// TODO aconway 2006-12-20: DEBUG remove
-struct epoll {
- epoll(uint32_t e) : events(e) { }
- uint32_t events;
-};
-
-#define BIT(X) out << ((e.events & X) ? __STRING(X) "." : "")
-ostream& operator << (ostream& out, epoll e) {
- out << "epoll_event.events: ";
- BIT(EPOLLIN);
- BIT(EPOLLPRI);
- BIT(EPOLLOUT);
- BIT(EPOLLRDNORM);
- BIT(EPOLLRDBAND);
- BIT(EPOLLWRNORM);
- BIT(EPOLLWRBAND);
- BIT(EPOLLMSG);
- BIT(EPOLLERR);
- BIT(EPOLLHUP);
- BIT(EPOLLONESHOT);
- BIT(EPOLLET);
- return out;
-}
-
-
-
-/**
- * Wait for epoll to wake up, return the descriptor or 0 on timeout.
- */
-Event* EventChannel::Impl::wait(Duration timeoutNs)
-{
- {
- Monitor::ScopedLock l(monitor);
- if (isShutdown)
- throw ShutdownException();
- }
-
- // Increase nWaiters for the duration, notify the monitor if I'm
- // the last one out.
- //
- AtomicCount::ScopedIncrement si(
- nWaiters, boost::bind(&Monitor::notifyAll, &monitor));
-
- // No lock, all thread safe calls or local variables:
- //
- const long timeoutMs =
- (timeoutNs == TIME_INFINITE) ? -1 : timeoutNs/TIME_MSEC;
- CleanStruct<epoll_event> ee;
- Event* event = 0;
-
- // Loop till we get a completed event. Some events may repost
- // themselves and return 0, e.g. incomplete read or write events.
- //TODO aconway 2006-12-20: FIX THIS!
- while (!event) {
- int n = ::epoll_wait(epollFd, &ee, 1, timeoutMs); // Thread safe.
- if (n == 0) // Timeout
- return 0;
- if (n < 0 && errno == EINTR) // Interrupt, ignore it.
- continue;
- if (n < 0)
- throw QPID_POSIX_ERROR(errno);
- assert(n == 1);
- Descriptor* ed =
- reinterpret_cast<Descriptor*>(ee.data.ptr);
- if (ed == 0) // We're being shut-down.
- throw ShutdownException();
- assert(ed != 0);
- event = ed->wake(ee.events);
- }
- return event;
-}
-
-//EventChannel::Descriptor& EventChannel::Impl::getDescriptor(int fd) {
-// Mutex::ScopedLock l(monitor);
-// Descriptor& ed = descriptors[fd];
-// ed.activate(epollFd, fd);
-// return ed;
-//}
-
-
-// ================================================================
-// EventChannel
-
-EventChannel::shared_ptr EventChannel::create() {
- return shared_ptr(new EventChannel());
-}
-
-EventChannel::EventChannel() : impl(new EventChannel::Impl()) {}
-
-EventChannel::~EventChannel() {}
-
-void EventChannel::post(Event& e)
-{
- e.prepare(*impl);
-}
-
-Event* EventChannel::wait(Duration timeoutNs)
-{
- return impl->wait(timeoutNs);
-}
-
-void EventChannel::shutdown() {
- impl->shutdown();
-}
-
-
-// ================================================================
-// Event and subclasses.
-
-Event::~Event() {}
-
-Exception::shared_ptr_const Event::getException() const {
- return exception;
-}
-
-void Event::throwIfException() {
- if (getException())
- exception->throwSelf();
-}
-
-void Event::dispatch()
-{
- if (!callback.empty())
- callback();
-}
-
-void Event::setException(const std::exception& e) {
- const Exception* ex = dynamic_cast<const Exception*>(&e);
- if (ex)
- exception.reset(ex->clone().release());
- else
- exception.reset(new Exception(e));
-#ifndef NDEBUG
- // Throw and re-catch the exception. Has no effect on the
- // program but it triggers debuggers watching for throw. The
- // context that sets the exception is more informative for
- // debugging purposes than the one that ultimately throws it.
- //
- try {
- throwIfException();
- }
- catch (...) { } // Ignored.
-#endif
-}
-
-int FDEvent::getFDescriptor() const {
- return descriptor.getFD();
-}
-
-// TODO: AMS 21/12/06 Don't like the inline new, probably cause a memory leak
-ReadEvent::ReadEvent(int fd, void* buf, size_t sz,Callback cb, bool noWait) :
- IOEvent(cb, *(new EventChannel::Descriptor(fd)), sz, noWait), buffer(buf), bytesRead(0) {
-}
-
-void ReadEvent::prepare(EventChannel::Impl& impl) {
- EventChannel::Descriptor& d = getDescriptor();
- impl.activate(d);
- d.getQueue(IN).push(this);
-}
-
-void ReadEvent::complete(EventChannel::Descriptor& ed)
-{
- ssize_t n = ::read(getFDescriptor(),
- static_cast<char*>(buffer) + bytesRead,
- size - bytesRead);
- if (n > 0)
- bytesRead += n;
- if (n == 0 || (n < 0 && errno != EAGAIN)) {
- // Use ENODATA for file closed.
- setException(QPID_POSIX_ERROR(n == 0 ? ENODATA : errno));
- ed.shutdownUnsafe();
- }
-}
-
-WriteEvent::WriteEvent(int fd, const void* buf, size_t sz, Callback cb) :
- IOEvent(cb, *(new EventChannel::Descriptor(fd)), sz, noWait), buffer(buf), bytesWritten(0) {
-}
-
-void WriteEvent::prepare(EventChannel::Impl& impl) {
- EventChannel::Descriptor& d = getDescriptor();
- impl.activate(d);
- d.getQueue(OUT).push(this);
-}
-
-
-void WriteEvent::complete(EventChannel::Descriptor& ed)
-{
- ssize_t n = ::write(getFDescriptor(),
- static_cast<const char*>(buffer) + bytesWritten,
- size - bytesWritten);
- if (n > 0)
- bytesWritten += n;
- if(n < 0 && errno != EAGAIN) {
- setException(QPID_POSIX_ERROR(errno));
- ed.shutdownUnsafe(); // Called with lock held.
- }
-}
-
-AcceptEvent::AcceptEvent(int fd, Callback cb) :
- FDEvent(cb, *(new EventChannel::Descriptor(fd))), accepted(0) {
-}
-
-void AcceptEvent::prepare(EventChannel::Impl& impl) {
- EventChannel::Descriptor& d = getDescriptor();
- impl.activate(d);
- d.getQueue(IN).push(this);
-}
-
-void AcceptEvent::complete(EventChannel::Descriptor& ed)
-{
- accepted = ::accept(getFDescriptor(), 0, 0);
- if (accepted < 0) {
- setException(QPID_POSIX_ERROR(errno));
- ed.shutdownUnsafe(); // Called with lock held.
- }
-}
-
-}}
diff --git a/cpp/src/qpid/sys/posix/EventChannel.h b/cpp/src/qpid/sys/posix/EventChannel.h
deleted file mode 100644
index 85e121379a..0000000000
--- a/cpp/src/qpid/sys/posix/EventChannel.h
+++ /dev/null
@@ -1,214 +0,0 @@
-#ifndef _sys_EventChannel_h
-#define _sys_EventChannel_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/SharedObject.h"
-#include "qpid/Exception.h"
-#include "qpid/sys/Monitor.h"
-#include "qpid/sys/Time.h"
-
-#include <boost/function.hpp>
-#include <memory>
-
-namespace qpid {
-namespace sys {
-
-class Event;
-
-/**
- * Channel to post and wait for events.
- */
-class EventChannel : public qpid::SharedObject<EventChannel>
-{
- public:
- static shared_ptr create();
-
- /** Exception throw from wait() if channel is shut down. */
- class ShutdownException : public qpid::Exception {};
-
- ~EventChannel();
-
- /** Post an event to the channel. */
- void post(Event& event);
-
- /**
- * Wait for the next complete event, up to timeout.
- *@return Pointer to event or 0 if timeout elapses.
- *@exception ShutdownException if the channel is shut down.
- */
- Event* wait(Duration timeout = TIME_INFINITE);
-
- /**
- * Shut down the event channel.
- * Blocks till all threads have exited wait()
- */
- void shutdown();
-
-
- // Internal classes.
- class Impl;
- class Queue;
- class Descriptor;
-
- private:
-
- EventChannel();
-
- Mutex lock;
- boost::shared_ptr<Impl> impl;
-};
-
-/**
- * Base class for all Events.
- *
- * Derived classes define events representing various async IO operations.
- * When an event is complete, it is returned by the EventChannel to
- * a thread calling wait. The thread will call Event::dispatch() to
- * execute code associated with event completion.
- */
-class Event
-{
- public:
- /** Type for callback when event is dispatched */
- typedef boost::function0<void> Callback;
-
- virtual ~Event();
-
- /** Call the callback provided to the constructor, if any. */
- void dispatch();
-
- /**
- *If there was an exception processing this Event, return it.
- *@return 0 if there was no exception.
- */
- qpid::Exception::shared_ptr_const getException() const;
-
- /** If getException() throw the corresponding exception. */
- void throwIfException();
-
- /** Set the dispatch callback. */
- void setCallback(Callback cb) { callback = cb; }
-
- /** Set the exception. */
- void setException(const std::exception& e);
-
- protected:
- Event(Callback cb=0) : callback(cb) {}
-
- virtual void prepare(EventChannel::Impl&) = 0;
- virtual void complete(EventChannel::Descriptor&) = 0;
-
- Callback callback;
- Exception::shared_ptr_const exception;
-
- friend class EventChannel;
- friend class EventChannel::Queue;
-};
-
-/** Base class for events related to a file descriptor */
-class FDEvent : public Event {
- public:
- EventChannel::Descriptor& getDescriptor() const { return descriptor; }
- int getFDescriptor() const;
-
- protected:
- FDEvent(Callback cb, EventChannel::Descriptor& fd)
- : Event(cb), descriptor(fd) {}
- // TODO AMS: 1/6/07 I really don't think this is correct, but
- // the descriptor is immutable
- FDEvent& operator=(const FDEvent& rhs) { Event::operator=(rhs); return *this; }
-
- private:
- EventChannel::Descriptor& descriptor;
-};
-
-/** Base class for read or write events. */
-class IOEvent : public FDEvent {
- public:
- size_t getSize() const { return size; }
-
- protected:
- IOEvent(Callback cb, EventChannel::Descriptor& fd, size_t sz, bool noWait_) :
- FDEvent(cb, fd), size(sz), noWait(noWait_) {}
-
- size_t size;
- bool noWait;
-};
-
-/** Asynchronous read event */
-class ReadEvent : public IOEvent
-{
- public:
- explicit ReadEvent(int fd, void* buf=0, size_t sz=0,Callback cb=0, bool noWait=false);
-
- void* getBuffer() const { return buffer; }
- size_t getBytesRead() const { return bytesRead; }
-
- private:
- void prepare(EventChannel::Impl&);
- void complete(EventChannel::Descriptor&);
- ssize_t doRead();
-
- void* buffer;
- size_t bytesRead;
-};
-
-/** Asynchronous write event */
-class WriteEvent : public IOEvent
-{
- public:
- explicit WriteEvent(int fd, const void* buf=0, size_t sz=0, Callback cb=0);
-
- const void* getBuffer() const { return buffer; }
- size_t getBytesWritten() const { return bytesWritten; }
-
- private:
- void prepare(EventChannel::Impl&);
- void complete(EventChannel::Descriptor&);
- ssize_t doWrite();
-
- const void* buffer;
- size_t bytesWritten;
-};
-
-
-/** Asynchronous socket accept event */
-class AcceptEvent : public FDEvent
-{
- public:
- /** Accept a connection on fd. */
- explicit AcceptEvent(int fd, Callback cb=0);
-
- /** Get descriptor for accepted server socket */
- int getAcceptedDesscriptor() const { return accepted; }
-
- private:
- void prepare(EventChannel::Impl&);
- void complete(EventChannel::Descriptor&);
-
- int accepted;
-};
-
-
-}}
-
-
-
-#endif /*!_sys_EventChannel_h*/
diff --git a/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp b/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp
deleted file mode 100644
index a61a66c577..0000000000
--- a/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "EventChannelConnection.h"
-
-#include "qpid/sys/ConnectionOutputHandler.h"
-#include "qpid/sys/ConnectionInputHandler.h"
-#include "qpid/sys/ConnectionInputHandlerFactory.h"
-#include "qpid/sys/Acceptor.h"
-#include "qpid/sys/Socket.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/Exception.h"
-
-#include <sys/socket.h>
-#include <netdb.h>
-
-#include <boost/assert.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
-#include <boost/ptr_container/ptr_deque.hpp>
-#include <boost/bind.hpp>
-#include <boost/scoped_ptr.hpp>
-
-#include <iostream>
-
-namespace qpid {
-namespace sys {
-
-using namespace qpid::framing;
-using namespace std;
-
-class EventChannelAcceptor : public Acceptor {
- public:
-
-
- EventChannelAcceptor(
- int16_t port_, int backlog, int nThreads, bool trace_
- );
-
- uint16_t getPort() const;
- std::string getHost() const;
-
- void run(ConnectionInputHandlerFactory* factory);
-
- void shutdown();
-
- private:
-
- void accept();
-
- Mutex lock;
- Socket listener;
- const int port;
- const bool isTrace;
- bool isRunning;
- boost::ptr_vector<EventChannelConnection> connections;
- AcceptEvent acceptEvent;
- ConnectionInputHandlerFactory* factory;
- bool isShutdown;
- EventChannelThreads::shared_ptr threads;
-};
-
-Acceptor::shared_ptr Acceptor::create(
- int16_t port, int backlog, int threads, bool trace)
-{
- return Acceptor::shared_ptr(
- new EventChannelAcceptor(port, backlog, threads, trace));
-}
-
-// Must define Acceptor virtual dtor.
-Acceptor::~Acceptor() {}
-
-EventChannelAcceptor::EventChannelAcceptor(
- int16_t port_, int backlog, int nThreads, bool trace_
-) : listener(Socket::createTcp()),
- port(listener.listen(int(port_), backlog)),
- isTrace(trace_),
- isRunning(false),
- acceptEvent(listener.fd(),
- boost::bind(&EventChannelAcceptor::accept, this)),
- factory(0),
- isShutdown(false),
- threads(EventChannelThreads::create(EventChannel::create(), nThreads))
-{ }
-
-uint16_t EventChannelAcceptor::getPort() const {
- return port; // Immutable no need for lock.
-}
-
-std::string EventChannelAcceptor::getHost() const {
- ::sockaddr_storage name; // big enough for any socket address
- ::socklen_t namelen = sizeof(name);
- if (::getsockname(listener.fd(), (::sockaddr*)&name, &namelen) < 0)
- throw QPID_POSIX_ERROR(errno);
-
- char dispName[NI_MAXHOST];
- if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), 0, 0, NI_NUMERICHOST) != 0)
- throw QPID_POSIX_ERROR(rc);
- return dispName;
-}
-
-void EventChannelAcceptor::run(ConnectionInputHandlerFactory* f) {
- {
- Mutex::ScopedLock l(lock);
- if (!isRunning && !isShutdown) {
- isRunning = true;
- factory = f;
- threads->post(acceptEvent);
- }
- }
- threads->join(); // Wait for shutdown.
-}
-
-void EventChannelAcceptor::shutdown() {
- bool doShutdown = false;
- {
- Mutex::ScopedLock l(lock);
- doShutdown = !isShutdown; // I'm the shutdown thread.
- isShutdown = true;
- }
- if (doShutdown) {
- ::close(acceptEvent.getFDescriptor());
- threads->shutdown();
- for_each(connections.begin(), connections.end(),
- boost::bind(&EventChannelConnection::close, _1));
- }
- threads->join();
-}
-
-void EventChannelAcceptor::accept()
-{
- // No lock, we only post one accept event at a time.
- if (isShutdown)
- return;
- if (acceptEvent.getException()) {
- Exception::log(*acceptEvent.getException(),
- "EventChannelAcceptor::accept");
- shutdown();
- return;
- }
- int fd = acceptEvent.getAcceptedDesscriptor();
- threads->post(acceptEvent); // Keep accepting.
- // TODO aconway 2006-11-29: Need to reap closed connections also.
- connections.push_back(
- new EventChannelConnection(threads, *factory, fd, fd, isTrace));
-}
-
-}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/posix/EventChannelConnection.cpp b/cpp/src/qpid/sys/posix/EventChannelConnection.cpp
deleted file mode 100644
index f4b6396dd1..0000000000
--- a/cpp/src/qpid/sys/posix/EventChannelConnection.cpp
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <iostream>
-
-#include <boost/bind.hpp>
-#include <boost/assert.hpp>
-
-#include "EventChannelConnection.h"
-#include "qpid/sys/ConnectionInputHandlerFactory.h"
-#include "qpid/QpidError.h"
-
-using namespace std;
-using namespace qpid;
-using namespace qpid::framing;
-
-namespace qpid {
-namespace sys {
-
-const size_t EventChannelConnection::bufferSize = 65536;
-
-EventChannelConnection::EventChannelConnection(
- EventChannelThreads::shared_ptr threads_,
- ConnectionInputHandlerFactory& factory_,
- int rfd,
- int wfd,
- bool isTrace_
-) :
- readFd(rfd),
- writeFd(wfd ? wfd : rfd),
- readEvent(readFd),
- writeEvent(writeFd),
- readCallback(boost::bind(&EventChannelConnection::closeOnException,
- this, &EventChannelConnection::endInitRead)),
-
- isWriting(false),
- isClosed(false),
- threads(threads_),
- handler(factory_.create(this)),
- in(bufferSize),
- out(bufferSize),
- isTrace(isTrace_)
-{
- assert(readFd > 0);
- assert(writeFd > 0);
- closeOnException(&EventChannelConnection::startRead);
-}
-
-
-void EventChannelConnection::send(AMQFrame& frame) {
- {
- Monitor::ScopedLock lock(monitor);
- writeFrames.push_back(frame);
- }
- closeOnException(&EventChannelConnection::startWrite);
-}
-
-void EventChannelConnection::close() {
- {
- Monitor::ScopedLock lock(monitor);
- if (isClosed)
- return;
- isClosed = true;
- }
- ::close(readFd);
- ::close(writeFd);
- {
- Monitor::ScopedLock lock(monitor);
- while (busyThreads > 0)
- monitor.wait();
- }
- handler->closed();
-}
-
-void EventChannelConnection::closeNoThrow() {
- Exception::tryCatchLog<void>(
- boost::bind(&EventChannelConnection::close, this),
- false,
- "Exception closing channel"
- );
-}
-
-/**
- * Call f in a try/catch block and close the connection if
- * an exception is thrown.
- */
-void EventChannelConnection::closeOnException(MemberFnPtr f)
-{
- try {
- Exception::tryCatchLog<void>(
- boost::bind(f, this),
- "Closing connection due to exception"
- );
- return;
- } catch (...) {
- // Exception was already logged by tryCatchLog
- closeNoThrow();
- }
-}
-
-// Post the write event.
-// Always called inside closeOnException.
-// Called by endWrite and send, but only one thread writes at a time.
-//
-void EventChannelConnection::startWrite() {
- {
- Monitor::ScopedLock lock(monitor);
- // Stop if closed or a write event is already in progress.
- if (isClosed || isWriting)
- return;
- if (writeFrames.empty()) {
- isWriting = false;
- return;
- }
- isWriting = true;
- AMQFrame& frame = writeFrames.front();
- writeFrames.pop_front();
- // No need to lock here - only one thread can be writing at a time.
- out.clear();
- if (isTrace)
- cout << "Send on socket " << writeFd << ": " << frame << endl;
- frame.encode(out);
- out.flip();
- }
- // TODO: AMS 1/6/07 This only works because we already have the correct fd
- // in the descriptor - change not to use assigment
- writeEvent = WriteEvent(
- writeFd, out.start(), out.available(),
- boost::bind(&EventChannelConnection::closeOnException,
- this, &EventChannelConnection::endWrite));
- threads->post(writeEvent);
-}
-
-// ScopedBusy ctor increments busyThreads.
-// dtor decrements and calls monitor.notifyAll if it reaches 0.
-//
-struct EventChannelConnection::ScopedBusy : public AtomicCount::ScopedIncrement
-{
- ScopedBusy(EventChannelConnection& ecc)
- : AtomicCount::ScopedIncrement(
- ecc.busyThreads, boost::bind(&Monitor::notifyAll, &ecc.monitor))
- {}
-};
-
-// Write event completed.
-// Always called by a channel thread inside closeOnException.
-//
-void EventChannelConnection::endWrite() {
- ScopedBusy(*this);
- {
- Monitor::ScopedLock lock(monitor);
- assert(isWriting);
- isWriting = false;
- if (isClosed)
- return;
- writeEvent.throwIfException();
- if (writeEvent.getBytesWritten() < writeEvent.getSize()) {
- // Keep writing the current event till done.
- isWriting = true;
- threads->post(writeEvent);
- }
- }
- // Continue writing from writeFrames queue.
- startWrite();
-}
-
-
-// Post the read event.
-// Always called inside closeOnException.
-// Called from ctor and end[Init]Read, so only one call at a time
-// is possible since we only post one read event at a time.
-//
-void EventChannelConnection::startRead() {
- // Non blocking read, as much as we can swallow.
- readEvent = ReadEvent(
- readFd, in.start(), in.available(), readCallback);
- threads->post(readEvent);
-}
-
-// Completion of initial read, expect protocolInit.
-// Always called inside closeOnException in channel thread.
-// Only called by one thread at a time.
-void EventChannelConnection::endInitRead() {
- ScopedBusy(*this);
- if (!isClosed) {
- readEvent.throwIfException();
- in.move(readEvent.getBytesRead());
- in.flip();
- ProtocolInitiation protocolInit;
- if(protocolInit.decode(in)){
- handler->initiated(protocolInit);
- readCallback = boost::bind(
- &EventChannelConnection::closeOnException,
- this, &EventChannelConnection::endRead);
- }
- in.compact();
- // Continue reading.
- startRead();
- }
-}
-
-// Normal reads, expect a frame.
-// Always called inside closeOnException in channel thread.
-void EventChannelConnection::endRead() {
- ScopedBusy(*this);
- if (!isClosed) {
- readEvent.throwIfException();
- in.move(readEvent.getBytesRead());
- in.flip();
- AMQFrame frame;
- while (frame.decode(in)) {
- if (isTrace)
- cout << "Received on socket " << readFd
- << ": " << frame << endl;
- handler->received(frame);
- }
- in.compact();
- startRead();
- }
-}
-
-}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/posix/EventChannelConnection.h b/cpp/src/qpid/sys/posix/EventChannelConnection.h
deleted file mode 100644
index a4ca5de517..0000000000
--- a/cpp/src/qpid/sys/posix/EventChannelConnection.h
+++ /dev/null
@@ -1,96 +0,0 @@
-#ifndef _posix_EventChannelConnection_h
-#define _posix_EventChannelConnection_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <boost/ptr_container/ptr_deque.hpp>
-
-#include "EventChannelThreads.h"
-#include "qpid/sys/Monitor.h"
-#include "qpid/sys/ConnectionOutputHandler.h"
-#include "qpid/sys/ConnectionInputHandler.h"
-#include "qpid/sys/AtomicCount.h"
-#include "qpid/framing/AMQFrame.h"
-
-namespace qpid {
-namespace sys {
-
-class ConnectionInputHandlerFactory;
-
-/**
- * Implements SessionContext and delegates to a SessionHandler
- * for a connection via the EventChannel.
- *@param readDescriptor file descriptor for reading.
- *@param writeDescriptor file descriptor for writing,
- * by default same as readDescriptor
- */
-class EventChannelConnection : public ConnectionOutputHandler {
- public:
- EventChannelConnection(
- EventChannelThreads::shared_ptr threads,
- ConnectionInputHandlerFactory& factory,
- int readDescriptor,
- int writeDescriptor = 0,
- bool isTrace = false
- );
-
- virtual void send(qpid::framing::AMQFrame& frame);
- virtual void close();
-
- private:
- typedef std::deque<qpid::framing::AMQFrame> FrameQueue;
- typedef void (EventChannelConnection::*MemberFnPtr)();
- struct ScopedBusy;
-
- void startWrite();
- void endWrite();
- void startRead();
- void endInitRead();
- void endRead();
- void closeNoThrow();
- void closeOnException(MemberFnPtr);
- bool shouldContinue(bool& flag);
-
- static const size_t bufferSize;
-
- Monitor monitor;
-
- int readFd, writeFd;
- ReadEvent readEvent;
- WriteEvent writeEvent;
- Event::Callback readCallback;
- bool isWriting;
- bool isClosed;
- AtomicCount busyThreads;
-
- EventChannelThreads::shared_ptr threads;
- std::auto_ptr<ConnectionInputHandler> handler;
- qpid::framing::Buffer in, out;
- FrameQueue writeFrames;
- bool isTrace;
-
- friend struct ScopedBusy;
-};
-
-
-}} // namespace qpid::sys
-
-
-
-#endif /*!_posix_EventChannelConnection_h*/
diff --git a/cpp/src/qpid/sys/posix/EventChannelThreads.cpp b/cpp/src/qpid/sys/posix/EventChannelThreads.cpp
deleted file mode 100644
index 70954d0c16..0000000000
--- a/cpp/src/qpid/sys/posix/EventChannelThreads.cpp
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <iostream>
-#include <limits>
-
-#include <boost/bind.hpp>
-
-#include "qpid/sys/Runnable.h"
-
-#include "EventChannelThreads.h"
-
-namespace qpid {
-namespace sys {
-
-const size_t EventChannelThreads::unlimited =
- std::numeric_limits<size_t>::max();
-
-EventChannelThreads::shared_ptr EventChannelThreads::create(
- EventChannel::shared_ptr ec, size_t min, size_t max
-)
-{
- return EventChannelThreads::shared_ptr(
- new EventChannelThreads(ec, min, max));
-}
-
-EventChannelThreads::EventChannelThreads(
- EventChannel::shared_ptr ec, size_t min, size_t max) :
- minThreads(std::max(size_t(1), min)),
- maxThreads(std::min(min, max)),
- channel(ec),
- nWaiting(0),
- state(RUNNING)
-{
- Monitor::ScopedLock l(monitor);
- while (workers.size() < minThreads)
- workers.push_back(Thread(*this));
-}
-
-EventChannelThreads::~EventChannelThreads() {
- shutdown();
- join();
-}
-
-void EventChannelThreads::shutdown()
-{
- Monitor::ScopedLock lock(monitor);
- if (state != RUNNING) // Already shutting down.
- return;
- state = TERMINATING;
- channel->shutdown();
- monitor.notify(); // Wake up one join() thread.
-}
-
-void EventChannelThreads::join()
-{
- {
- Monitor::ScopedLock lock(monitor);
- while (state == RUNNING) // Wait for shutdown to start.
- monitor.wait();
- if (state == SHUTDOWN) // Shutdown is complete
- return;
- if (state == JOINING) {
- // Someone else is doing the join.
- while (state != SHUTDOWN)
- monitor.wait();
- return;
- }
- // I'm the joining thread
- assert(state == TERMINATING);
- state = JOINING;
- } // Drop the lock.
-
- for (size_t i = 0; i < workers.size(); ++i) {
- assert(state == JOINING); // Only this thread can change JOINING.
- workers[i].join();
- }
- state = SHUTDOWN;
- monitor.notifyAll(); // Notify any other join() threads.
-}
-
-void EventChannelThreads::addThread() {
- Monitor::ScopedLock l(monitor);
- if (workers.size() < maxThreads)
- workers.push_back(Thread(*this));
-}
-
-void EventChannelThreads::run()
-{
- // Start life waiting. Decrement on exit.
- AtomicCount::ScopedIncrement inc(nWaiting);
- try {
- while (true) {
- Event* e = channel->wait();
- assert(e != 0);
- AtomicCount::ScopedDecrement dec(nWaiting);
- // Make sure there's at least one waiting thread.
- if (dec == 0 && state == RUNNING)
- addThread();
- e->dispatch();
- }
- }
- catch (const EventChannel::ShutdownException& e) {
- return;
- }
- catch (const std::exception& e) {
- Exception::log(e, "Exception in EventChannelThreads::run()");
- }
-}
-
-}}
diff --git a/cpp/src/qpid/sys/posix/EventChannelThreads.h b/cpp/src/qpid/sys/posix/EventChannelThreads.h
deleted file mode 100644
index 19112cf4db..0000000000
--- a/cpp/src/qpid/sys/posix/EventChannelThreads.h
+++ /dev/null
@@ -1,105 +0,0 @@
-#ifndef _posix_EventChannelThreads_h
-#define _sys_EventChannelThreads_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "EventChannel.h"
-
-#include "qpid/Exception.h"
-#include "qpid/sys/AtomicCount.h"
-#include "qpid/sys/Monitor.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/Time.h"
-#include "qpid/sys/Runnable.h"
-
-#include <vector>
-
-namespace qpid {
-namespace sys {
-
-/**
- Dynamic thread pool serving an EventChannel.
-
- Threads run a loop { e = wait(); e->dispatch(); }
- The size of the thread pool is automatically adjusted to optimal size.
-*/
-class EventChannelThreads :
- public qpid::SharedObject<EventChannelThreads>,
- private sys::Runnable
-{
- public:
- /** Constant to represent an unlimited number of threads */
- static const size_t unlimited;
-
- /**
- * Create the thread pool and start initial threads.
- * @param minThreads Pool will initialy contain minThreads threads and
- * will never shrink to less until shutdown.
- * @param maxThreads Pool will never grow to more than maxThreads.
- */
- static EventChannelThreads::shared_ptr create(
- EventChannel::shared_ptr channel = EventChannel::create(),
- size_t minThreads = 1,
- size_t maxThreads = unlimited
- );
-
- ~EventChannelThreads();
-
- /** Post event to the underlying channel */
- void post(Event& event) { channel->post(event); }
-
- /**
- * Terminate all threads.
- *
- * Returns immediately, use join() to wait till all threads are
- * shut down.
- */
- void shutdown();
-
- /** Wait for all threads to terminate. */
- void join();
-
- private:
- typedef std::vector<sys::Thread> Threads;
- typedef enum {
- RUNNING, TERMINATING, JOINING, SHUTDOWN
- } State;
-
- EventChannelThreads(
- EventChannel::shared_ptr channel, size_t min, size_t max);
-
- void addThread();
-
- void run();
- bool keepRunning();
- void adjustThreads();
-
- Monitor monitor;
- size_t minThreads;
- size_t maxThreads;
- EventChannel::shared_ptr channel;
- Threads workers;
- sys::AtomicCount nWaiting;
- State state;
-};
-
-
-}}
-
-
-#endif /*!_sys_EventChannelThreads_h*/
diff --git a/cpp/src/qpid/sys/posix/PosixAcceptor.cpp b/cpp/src/qpid/sys/posix/PosixAcceptor.cpp
deleted file mode 100644
index 0575380a14..0000000000
--- a/cpp/src/qpid/sys/posix/PosixAcceptor.cpp
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/Acceptor.h"
-#include "qpid/Exception.h"
-
-namespace qpid {
-namespace sys {
-
-namespace {
-void fail() { throw qpid::Exception("PosixAcceptor not implemented"); }
-}
-
-class PosixAcceptor : public Acceptor {
- public:
- virtual uint16_t getPort() const { fail(); return 0; }
- virtual std::string getPort() const { fail(); return std::string(); }
- virtual void run(qpid::sys::ConnectionInputHandlerFactory* ) { fail(); }
- virtual void shutdown() { fail(); }
-};
-
-// Define generic Acceptor::create() to return APRAcceptor.
- Acceptor::shared_ptr Acceptor::create(int16_t , int, int, bool)
-{
- return Acceptor::shared_ptr(new PosixAcceptor());
-}
-
-// Must define Acceptor virtual dtor.
-Acceptor::~Acceptor() {}
-
-}}
diff --git a/cpp/src/qpid/sys/posix/PrivatePosix.h b/cpp/src/qpid/sys/posix/PrivatePosix.h
index 2707057ef0..9ec9770cab 100644
--- a/cpp/src/qpid/sys/posix/PrivatePosix.h
+++ b/cpp/src/qpid/sys/posix/PrivatePosix.h
@@ -30,10 +30,15 @@ struct timeval;
namespace qpid {
namespace sys {
+// Private Time related implementation details
struct timespec& toTimespec(struct timespec& ts, const Duration& t);
struct timeval& toTimeval(struct timeval& tv, const Duration& t);
Duration toTime(const struct timespec& ts);
+// Private socket related implementation details
+class SocketPrivate;
+int toFd(const SocketPrivate* s);
+
}}
#endif /*!_sys_posix_PrivatePosix_h*/
diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp
index d46e7943d9..6ee7a84beb 100644
--- a/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/cpp/src/qpid/sys/posix/Socket.cpp
@@ -25,6 +25,8 @@
#include "check.h"
#include "PrivatePosix.h"
+#include <fcntl.h>
+#include <sys/types.h>
#include <sys/socket.h>
#include <sys/errno.h>
#include <netinet/in.h>
@@ -32,30 +34,63 @@
#include <boost/format.hpp>
-using namespace qpid::sys;
+namespace qpid {
+namespace sys {
-Socket Socket::createTcp()
+class SocketPrivate {
+public:
+ SocketPrivate(int f = -1) :
+ fd(f)
+ {}
+
+ int fd;
+};
+
+Socket::Socket() :
+ impl(new SocketPrivate)
+{
+ createTcp();
+}
+
+Socket::Socket(SocketPrivate* sp) :
+ impl(sp)
+{}
+
+Socket::~Socket() {
+ delete impl;
+}
+
+void Socket::createTcp() const
{
+ int& socket = impl->fd;
+ if (socket != -1) Socket::close();
int s = ::socket (PF_INET, SOCK_STREAM, 0);
if (s < 0) throw QPID_POSIX_ERROR(errno);
- return s;
+ socket = s;
}
-Socket::Socket(int descriptor) : socket(descriptor) {}
-
-void Socket::setTimeout(const Duration& interval)
+void Socket::setTimeout(const Duration& interval) const
{
+ const int& socket = impl->fd;
struct timeval tv;
toTimeval(tv, interval);
setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
}
-void Socket::connect(const std::string& host, int port)
+void Socket::setNonblocking() const {
+ QPID_POSIX_CHECK(::fcntl(impl->fd, F_SETFL, O_NONBLOCK));
+}
+
+
+void Socket::connect(const std::string& host, int port) const
{
+ const int& socket = impl->fd;
struct sockaddr_in name;
name.sin_family = AF_INET;
name.sin_port = htons(port);
+ // TODO: Be good to make this work for IPv6 as well as IPv4
+ // Use more modern lookup functions
struct hostent* hp = gethostbyname ( host.c_str() );
if (hp == 0) throw QPID_POSIX_ERROR(errno);
memcpy(&name.sin_addr.s_addr, hp->h_addr_list[0], hp->h_length);
@@ -64,16 +99,18 @@ void Socket::connect(const std::string& host, int port)
}
void
-Socket::close()
+Socket::close() const
{
- if (socket == 0) return;
+ int& socket = impl->fd;
+ if (socket == -1) return;
if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno);
- socket = 0;
+ socket = -1;
}
ssize_t
-Socket::send(const void* data, size_t size)
+Socket::send(const void* data, size_t size) const
{
+ const int& socket = impl->fd;
ssize_t sent = ::send(socket, data, size, 0);
if (sent < 0) {
if (errno == ECONNRESET) return SOCKET_EOF;
@@ -84,8 +121,9 @@ Socket::send(const void* data, size_t size)
}
ssize_t
-Socket::recv(void* data, size_t size)
+Socket::recv(void* data, size_t size) const
{
+ const int& socket = impl->fd;
ssize_t received = ::recv(socket, data, size, 0);
if (received < 0) {
if (errno == ETIMEDOUT) return SOCKET_TIMEOUT;
@@ -94,8 +132,9 @@ Socket::recv(void* data, size_t size)
return received;
}
-int Socket::listen(int port, int backlog)
+int Socket::listen(int port, int backlog) const
{
+ const int& socket = impl->fd;
struct sockaddr_in name;
name.sin_family = AF_INET;
name.sin_port = htons(port);
@@ -111,8 +150,45 @@ int Socket::listen(int port, int backlog)
return ntohs(name.sin_port);
}
+
+Socket* Socket::accept(struct sockaddr *addr, socklen_t *addrlen) const
+{
+ int afd = ::accept(impl->fd, addr, addrlen);
+ if ( afd >= 0)
+ return new Socket(new SocketPrivate(afd));
+ else if (errno == EAGAIN)
+ return 0;
+ else throw QPID_POSIX_ERROR(errno);
+}
+
+int Socket::read(void *buf, size_t count) const
+{
+ return ::read(impl->fd, buf, count);
+}
+
+int Socket::write(const void *buf, size_t count) const
+{
+ return ::write(impl->fd, buf, count);
+}
+
+std::string Socket::getSockname() const
+{
+ ::sockaddr_storage name; // big enough for any socket address
+ ::socklen_t namelen = sizeof(name);
+
+ const int& socket = impl->fd;
+ if (::getsockname(socket, (::sockaddr*)&name, &namelen) < 0)
+ throw QPID_POSIX_ERROR(errno);
-int Socket::fd() const
+ char dispName[NI_MAXHOST];
+ if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), 0, 0, NI_NUMERICHOST) != 0)
+ throw QPID_POSIX_ERROR(rc);
+ return dispName;
+}
+
+int toFd(const SocketPrivate* s)
{
- return socket;
+ return s->fd;
}
+
+}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/posix/Socket.h b/cpp/src/qpid/sys/posix/Socket.h
deleted file mode 100644
index ca87104471..0000000000
--- a/cpp/src/qpid/sys/posix/Socket.h
+++ /dev/null
@@ -1,76 +0,0 @@
-#ifndef _sys_posix_Socket_h
-#define _sys_posix_Socket_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <string>
-#include "qpid/sys/Time.h"
-
-namespace qpid {
-namespace sys {
-
-class Socket
-{
- public:
- /** Create an initialized TCP socket */
- static Socket createTcp();
-
- /** Create a socket wrapper for descriptor. */
- Socket(int descriptor = 0);
-
- /** Set timeout for read and write */
- void setTimeout(const Duration& interval);
-
- void connect(const std::string& host, int port);
-
- void close();
-
- enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode;
-
- /** Returns bytes sent or an ErrorCode value < 0. */
- ssize_t send(const void* data, size_t size);
-
- /**
- * Returns bytes received, an ErrorCode value < 0 or 0
- * if the connection closed in an orderly manner.
- */
- ssize_t recv(void* data, size_t size);
-
- /** Bind to a port and start listening.
- *@param port 0 means choose an available port.
- *@param backlog maximum number of pending connections.
- *@return The bound port.
- */
- int listen(int port = 0, int backlog = 10);
-
- /** Get file descriptor */
- int fd() const;
-
- private:
- void init() const;
- mutable int socket; // Initialized on demand.
-};
-
-}}
-
-
-#endif /*!_sys_Socket_h*/
diff --git a/cpp/src/tests/.valgrind.supp-default b/cpp/src/tests/.valgrind.supp-default
index 21fa58db45..18e69df6ff 100644
--- a/cpp/src/tests/.valgrind.supp-default
+++ b/cpp/src/tests/.valgrind.supp-default
@@ -7,4 +7,12 @@
obj:*/libcpg.so.2.0.0
}
+{
+ Uninitialised value problem in dlopen
+ Memcheck:Cond
+ fun:_dl_relocate_object
+ fun:*dl_*
+ obj:/lib64/ld-2.6.so
+ obj:*
+}