diff options
author | Andrew Stitcher <astitcher@apache.org> | 2007-07-27 17:19:30 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2007-07-27 17:19:30 +0000 |
commit | 65ea2f177bd0810590895d89a490af8cea60253b (patch) | |
tree | 1a1432d706ac5f43dc8cdd5fdb0d7b5566dd5d06 | |
parent | 0a7f3f5dde40e59e90588e4ab7ba2ba99651c0f4 (diff) | |
download | qpid-python-65ea2f177bd0810590895d89a490af8cea60253b.tar.gz |
* Asynchronous network IO subsystem
- This is now implemented such that it very nearly only depends on the platform
code (Socker & Poller), this is not 100% true at present, but should be simple
to finish.
- This is still not the default (use "./configure --disable-apr-netio" to get it)
- Interrupting the broker gives a known error
- Default for number of broker io threads is not correct (needs to be number of CPUs -
it will run slower with too many io threads)
* EventChannel code
- Deleted all EventChannel code as it's entirely superceded by this new shiny code ;-)
* Rearranged the platform Socket implementations a bit for better abstraction
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560323 13f79535-47bb-0310-9956-ffa450edef68
26 files changed, 691 insertions, 1862 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index cf1598bcca..7950299f64 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -58,18 +58,12 @@ apr_plat_src = \ apr_plat_hdr = \ qpid/sys/apr/Condition.h \ qpid/sys/apr/Mutex.h \ - qpid/sys/apr/Socket.h \ qpid/sys/apr/Thread.h posix_netio_src = \ - qpid/sys/posix/EventChannel.cpp \ - qpid/sys/posix/EventChannelAcceptor.cpp \ - qpid/sys/posix/EventChannelConnection.cpp \ - qpid/sys/posix/EventChannelThreads.cpp + qpid/sys/AsynchIOAcceptor.cpp -posix_netio_hdr = \ - qpid/sys/posix/EventChannel.h \ - qpid/sys/posix/EventChannelThreads.h +posix_netio_hdr = posix_plat_src = \ qpid/sys/Dispatcher.cpp \ @@ -86,7 +80,6 @@ posix_plat_hdr = \ qpid/sys/posix/Condition.h \ qpid/sys/posix/PrivatePosix.h \ qpid/sys/posix/Mutex.h \ - qpid/sys/posix/Socket.h \ qpid/sys/posix/Thread.h if USE_APR_NETIO diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 257e2b577a..d92a177465 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -54,7 +54,6 @@ Connector::~Connector(){ } void Connector::connect(const std::string& host, int port){ - socket = Socket::createTcp(); socket.connect(host, port); closed = false; receiver = Thread(this); diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h index 8d6bca8f29..8ad3186d07 100644 --- a/cpp/src/qpid/sys/Acceptor.h +++ b/cpp/src/qpid/sys/Acceptor.h @@ -37,10 +37,12 @@ class Acceptor : public qpid::SharedObject<Acceptor> virtual ~Acceptor() = 0; virtual uint16_t getPort() const = 0; virtual std::string getHost() const = 0; - virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory) = 0; + virtual void run(ConnectionInputHandlerFactory* factory) = 0; virtual void shutdown() = 0; }; +inline Acceptor::~Acceptor() {} + }} diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h index 7accde17b0..7cc7995ee2 100644 --- a/cpp/src/qpid/sys/AsynchIO.h +++ b/cpp/src/qpid/sys/AsynchIO.h @@ -35,14 +35,14 @@ namespace sys { */ class AsynchAcceptor { public: - typedef boost::function1<void, int> Callback; + typedef boost::function1<void, const Socket&> Callback; private: Callback acceptedCallback; DispatchHandle handle; public: - AsynchAcceptor(int fd, Callback callback); + AsynchAcceptor(const Socket& s, Callback callback); void start(Poller::shared_ptr poller); private: @@ -75,9 +75,9 @@ public: bytes(b), byteCount(s), dataStart(0), - dataCount(s) + dataCount(0) {} - + virtual ~Buffer() {} }; @@ -85,6 +85,7 @@ public: typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback; typedef boost::function1<void, AsynchIO&> EofCallback; typedef boost::function1<void, AsynchIO&> DisconnectCallback; + typedef boost::function2<void, AsynchIO&, const Socket&> ClosedCallback; typedef boost::function1<void, AsynchIO&> BuffersEmptyCallback; typedef boost::function1<void, AsynchIO&> IdleCallback; @@ -92,26 +93,33 @@ private: ReadCallback readCallback; EofCallback eofCallback; DisconnectCallback disCallback; + ClosedCallback closedCallback; BuffersEmptyCallback emptyCallback; IdleCallback idleCallback; std::deque<Buffer*> bufferQueue; std::deque<Buffer*> writeQueue; + bool queuedClose; public: - AsynchIO(int fd, + AsynchIO(const Socket& s, ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, - BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0); + ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0); void queueForDeletion(); void start(Poller::shared_ptr poller); void queueReadBuffer(Buffer* buff); - void queueWrite(Buffer* buff); + void queueWrite(Buffer* buff = 0); + void unread(Buffer* buff); + void queueWriteClose(); + Buffer* getQueuedBuffer(); + const Socket& getSocket() const { return DispatchHandle::getSocket(); } private: ~AsynchIO(); void readable(DispatchHandle& handle); void writeable(DispatchHandle& handle); void disconnected(DispatchHandle& handle); + void close(DispatchHandle& handle); }; }} diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp new file mode 100644 index 0000000000..bf4a3ff842 --- /dev/null +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -0,0 +1,308 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Acceptor.h" + +#include "Socket.h" +#include "AsynchIO.h" +#include "Mutex.h" +#include "Thread.h" + +#include "qpid/sys/ConnectionOutputHandler.h" +#include "qpid/sys/ConnectionInputHandler.h" +#include "qpid/sys/ConnectionInputHandlerFactory.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/log/Statement.h" + +#include <boost/bind.hpp> +#include <boost/assert.hpp> +#include <queue> +#include <vector> +#include <memory> + +namespace qpid { +namespace sys { + +class AsynchIOAcceptor : public Acceptor { + Poller::shared_ptr poller; + Socket listener; + int numIOThreads; + const uint16_t listeningPort; + +public: + AsynchIOAcceptor(int16_t port, int backlog, int threads, bool trace); + ~AsynchIOAcceptor() {} + void run(ConnectionInputHandlerFactory* factory); + void shutdown(); + + uint16_t getPort() const; + std::string getHost() const; + +private: + void accepted(Poller::shared_ptr, const Socket&, ConnectionInputHandlerFactory*); +}; + +Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads, bool trace) +{ + return + Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads, trace)); +} + +AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads, bool) : + poller(new Poller), + numIOThreads(threads), + listeningPort(listener.listen(port, backlog)) +{} + +// Buffer definition +struct Buff : public AsynchIO::Buffer { + Buff() : + AsynchIO::Buffer(new char[65536], 65536) + {} + ~Buff() + { delete [] bytes;} +}; + +class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { + AsynchIO* aio; + ConnectionInputHandler* inputHandler; + std::queue<framing::AMQFrame> frameQueue; + Mutex frameQueueLock; + bool frameQueueClosed; + bool initiated; + +public: + AsynchIOHandler() : + inputHandler(0), + frameQueueClosed(false), + initiated(false) + {} + + ~AsynchIOHandler() { + if (inputHandler) + inputHandler->closed(); + delete inputHandler; + } + + void init(AsynchIO* a, ConnectionInputHandler* h) { + aio = a; + inputHandler = h; + } + + // Output side + void send(framing::AMQFrame&); + void close(); + + // Input side + void readbuff(AsynchIO& aio, AsynchIO::Buffer* buff); + void eof(AsynchIO& aio); + void disconnect(AsynchIO& aio); + + // Notifications + void nobuffs(AsynchIO& aio); + void idle(AsynchIO& aio); + void closedSocket(AsynchIO& aio, const Socket& s); +}; + +void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) { + + AsynchIOHandler* async = new AsynchIOHandler; + ConnectionInputHandler* handler = f->create(async); + AsynchIO* aio = new AsynchIO(s, + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&AsynchIOHandler::eof, async, _1), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); + async->init(aio, handler); + + // Give connection some buffers to use + for (int i = 0; i < 4; i++) { + aio->queueReadBuffer(new Buff); + } + aio->start(poller); +} + + +uint16_t AsynchIOAcceptor::getPort() const { + return listeningPort; // Immutable no need for lock. +} + +std::string AsynchIOAcceptor::getHost() const { + return listener.getSockname(); +} + +void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) { + Dispatcher d(poller); + AsynchAcceptor + acceptor(listener, + boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact)); + acceptor.start(poller); + + std::vector<Thread*> t(numIOThreads-1); + + // Run n-1 io threads + for (int i=0; i<numIOThreads-1; ++i) + t[i] = new Thread(d); + + // Run final thread + d.run(); + + // Now wait for n-1 io threads to exit + for (int i=0; i>numIOThreads-1; ++i) { + t[i]->join(); + delete t[i]; + } +} + +void AsynchIOAcceptor::shutdown() { + poller->shutdown(); +} + +// Output side +void AsynchIOHandler::send(framing::AMQFrame& frame) { + // TODO: Need to find out if we are in the callback context, + // in the callback thread if so we can go further than just queuing the frame + // to be handled later + { + ScopedLock<Mutex> l(frameQueueLock); + // Ignore anything seen after closing + if (!frameQueueClosed) + frameQueue.push(frame); + } + + // Activate aio for writing here + aio->queueWrite(); +} + +void AsynchIOHandler::close() { + ScopedLock<Mutex> l(frameQueueLock); + frameQueueClosed = true; +} + +// Input side +void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::Buffer* buff) { + framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); + if(initiated){ + framing::AMQFrame frame; + try{ + while(frame.decode(in)) { + QPID_LOG(debug, "RECV: " << frame); + inputHandler->received(frame); + } + }catch(const std::exception& e){ + QPID_LOG(error, e.what()); + } + }else{ + framing::ProtocolInitiation protocolInit; + if(protocolInit.decode(in)){ + QPID_LOG(debug, "INIT [" << aio << "]"); + inputHandler->initiated(protocolInit); + initiated = true; + } + } + // TODO: unreading needs to go away, and when we can cope + // with multiple sub-buffers in the general buffer scheme, it will + if (in.available() != 0) { + // Adjust buffer for used bytes and then "unread them" + buff->dataStart += buff->dataCount-in.available(); + buff->dataCount = in.available(); + aio->unread(buff); + } else { + // Give whole buffer back to aio subsystem + aio->queueReadBuffer(buff); + } +} + +void AsynchIOHandler::eof(AsynchIO&) { + inputHandler->closed(); + aio->queueWriteClose(); +} + +void AsynchIOHandler::closedSocket(AsynchIO&, const Socket& s) { + delete &s; + aio->queueForDeletion(); + delete this; +} + +void AsynchIOHandler::disconnect(AsynchIO& a) { + // treat the same as eof + eof(a); +} + +// Notifications +void AsynchIOHandler::nobuffs(AsynchIO&) { +} + +void AsynchIOHandler::idle(AsynchIO&){ + ScopedLock<Mutex> l(frameQueueLock); + + if (frameQueue.empty()) { + // At this point we know that we're write idling the connection + // so we could note that somewhere or do something special + return; + } + + // Try and get a queued buffer if not then construct new one + AsynchIO::Buffer* buff = aio->getQueuedBuffer(); + if (!buff) + buff = new Buff; + std::auto_ptr<framing::Buffer> out(new framing::Buffer(buff->bytes, buff->byteCount)); + int buffUsed = 0; + + while (!frameQueue.empty()) { + framing::AMQFrame frame = frameQueue.front(); + frameQueue.pop(); + + // Encode output frame + int frameSize = frame.size(); + if (frameSize > buff->byteCount) + THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer."); + + // If we've filled current buffer then flush and get new one + if (frameSize > int(out->available())) { + buff->dataCount = buffUsed; + aio->queueWrite(buff); + + buff = aio->getQueuedBuffer(); + if (!buff) + buff = new Buff; + out.reset(new framing::Buffer(buff->bytes, buff->byteCount)); + buffUsed = 0; + } + + frame.encode(*out); + buffUsed += frameSize; + QPID_LOG(debug, "SENT: " << frame); + } + + buff->dataCount = buffUsed; + aio->queueWrite(buff); + + if (frameQueueClosed) { + aio->queueWriteClose(); + } + +} + +}} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/Dispatcher.cpp b/cpp/src/qpid/sys/Dispatcher.cpp index 3a1da13bd0..b8751168c2 100644 --- a/cpp/src/qpid/sys/Dispatcher.cpp +++ b/cpp/src/qpid/sys/Dispatcher.cpp @@ -94,10 +94,11 @@ void DispatchHandle::rewatch() { ScopedLock<Mutex> lock(stateLock); switch(state) { case IDLE: + case DELAYED_IDLE: break; case DELAYED_R: case DELAYED_W: - case CALLBACK: + case DELAYED_INACTIVE: state = r ? (w ? DELAYED_RW : DELAYED_R) : DELAYED_W; @@ -132,6 +133,7 @@ void DispatchHandle::rewatchRead() { ScopedLock<Mutex> lock(stateLock); switch(state) { case IDLE: + case DELAYED_IDLE: break; case DELAYED_R: case DELAYED_RW: @@ -140,7 +142,7 @@ void DispatchHandle::rewatchRead() { case DELAYED_W: state = DELAYED_RW; break; - case CALLBACK: + case DELAYED_INACTIVE: state = DELAYED_R; break; case ACTIVE_R: @@ -168,6 +170,7 @@ void DispatchHandle::rewatchWrite() { ScopedLock<Mutex> lock(stateLock); switch(state) { case IDLE: + case DELAYED_IDLE: break; case DELAYED_W: case DELAYED_RW: @@ -176,7 +179,7 @@ void DispatchHandle::rewatchWrite() { case DELAYED_R: state = DELAYED_RW; break; - case CALLBACK: + case DELAYED_INACTIVE: state = DELAYED_W; break; case INACTIVE: @@ -204,15 +207,16 @@ void DispatchHandle::unwatchRead() { ScopedLock<Mutex> lock(stateLock); switch(state) { case IDLE: + case DELAYED_IDLE: break; case DELAYED_R: - state = CALLBACK; + state = DELAYED_INACTIVE; break; case DELAYED_RW: state = DELAYED_W; break; case DELAYED_W: - case CALLBACK: + case DELAYED_INACTIVE: case DELAYED_DELETE: break; case ACTIVE_R: @@ -239,15 +243,16 @@ void DispatchHandle::unwatchWrite() { ScopedLock<Mutex> lock(stateLock); switch(state) { case IDLE: + case DELAYED_IDLE: break; case DELAYED_W: - state = CALLBACK; + state = DELAYED_INACTIVE; break; case DELAYED_RW: state = DELAYED_R; break; case DELAYED_R: - case CALLBACK: + case DELAYED_INACTIVE: case DELAYED_DELETE: break; case ACTIVE_W: @@ -270,12 +275,13 @@ void DispatchHandle::unwatch() { ScopedLock<Mutex> lock(stateLock); switch (state) { case IDLE: + case DELAYED_IDLE: break; case DELAYED_R: case DELAYED_W: case DELAYED_RW: - case CALLBACK: - state = CALLBACK; + case DELAYED_INACTIVE: + state = DELAYED_INACTIVE; break; case DELAYED_DELETE: break; @@ -289,32 +295,46 @@ void DispatchHandle::unwatch() { void DispatchHandle::stopWatch() { ScopedLock<Mutex> lock(stateLock); - if ( state == IDLE) { + switch (state) { + case IDLE: + case DELAYED_IDLE: + case DELAYED_DELETE: return; + case DELAYED_R: + case DELAYED_W: + case DELAYED_RW: + case DELAYED_INACTIVE: + state = DELAYED_IDLE; + break; + default: + state = IDLE; + break; } assert(poller); poller->delFd(*this); poller.reset(); - state = IDLE; } // The slightly strange switch structure // is to ensure that the lock is released before // we do the delete void DispatchHandle::doDelete() { + // Ensure that we're no longer watching anything + stopWatch(); + // If we're in the middle of a callback defer the delete { ScopedLock<Mutex> lock(stateLock); switch (state) { - case DELAYED_R: - case DELAYED_W: - case DELAYED_RW: - case CALLBACK: + case DELAYED_IDLE: case DELAYED_DELETE: state = DELAYED_DELETE; return; + case IDLE: + break; default: - break; + // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states + assert(false); } } // If we're not then do it right away @@ -359,7 +379,7 @@ void DispatchHandle::dispatchCallbacks(Poller::EventType type) { case Poller::DISCONNECTED: { ScopedLock<Mutex> lock(stateLock); - state = CALLBACK; + state = DELAYED_INACTIVE; } if (disconnectedCallback) { disconnectedCallback(*this); @@ -386,10 +406,11 @@ void DispatchHandle::dispatchCallbacks(Poller::EventType type) { poller->modFd(*this, Poller::INOUT); state = ACTIVE_RW; return; - case CALLBACK: + case DELAYED_INACTIVE: state = INACTIVE; return; - case IDLE: + case DELAYED_IDLE: + state = IDLE; return; default: // This should be impossible diff --git a/cpp/src/qpid/sys/Dispatcher.h b/cpp/src/qpid/sys/Dispatcher.h index 60e9522bc1..2de026e141 100644 --- a/cpp/src/qpid/sys/Dispatcher.h +++ b/cpp/src/qpid/sys/Dispatcher.h @@ -49,12 +49,13 @@ private: Mutex stateLock; enum { IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW, - CALLBACK, DELAYED_R, DELAYED_W, DELAYED_RW, DELAYED_DELETE + DELAYED_IDLE, DELAYED_INACTIVE, DELAYED_R, DELAYED_W, DELAYED_RW, + DELAYED_DELETE } state; public: - DispatchHandle(int fd, Callback rCb, Callback wCb, Callback dCb) : - PollerHandle(fd), + DispatchHandle(const Socket& s, Callback rCb, Callback wCb, Callback dCb) : + PollerHandle(s), readableCallback(rCb), writableCallback(wCb), disconnectedCallback(dCb), diff --git a/cpp/src/qpid/sys/Poller.h b/cpp/src/qpid/sys/Poller.h index 55fead55aa..843295e268 100644 --- a/cpp/src/qpid/sys/Poller.h +++ b/cpp/src/qpid/sys/Poller.h @@ -23,6 +23,7 @@ */ #include "Time.h" +#include "Socket.h" #include <stdint.h> @@ -39,14 +40,14 @@ class PollerHandlePrivate; class PollerHandle { friend class Poller; - PollerHandlePrivate* impl; - const int fd; + PollerHandlePrivate* const impl; + const Socket& socket; public: - PollerHandle(int fd0); + PollerHandle(const Socket& s); virtual ~PollerHandle(); - - int getFD() const { return fd; } + + const Socket& getSocket() const {return socket;} }; /** @@ -55,7 +56,7 @@ public: */ class PollerPrivate; class Poller { - PollerPrivate* impl; + PollerPrivate* const impl; public: typedef boost::shared_ptr<Poller> shared_ptr; diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h index bb1ef27e65..56b6195b65 100644 --- a/cpp/src/qpid/sys/Socket.h +++ b/cpp/src/qpid/sys/Socket.h @@ -21,11 +21,74 @@ * under the License. * */ +#include <string> +#include "qpid/sys/Time.h" -#ifdef USE_APR_PLATFORM -#include "apr/Socket.h" -#else -#include "posix/Socket.h" -#endif +struct sockaddr; +namespace qpid { +namespace sys { + +class SocketPrivate; +class Socket +{ + friend class Poller; + + SocketPrivate* const impl; + +public: + /** Create a socket wrapper for descriptor. */ + Socket(); + ~Socket(); + + /** Create an initialized TCP socket */ + void createTcp() const; + + /** Set timeout for read and write */ + void setTimeout(const Duration& interval) const; + + /** Set socket non blocking */ + void setNonblocking() const; + + void connect(const std::string& host, int port) const; + + void close() const; + + enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode; + + /** Returns bytes sent or an ErrorCode value < 0. */ + ssize_t send(const void* data, size_t size) const; + + /** + * Returns bytes received, an ErrorCode value < 0 or 0 + * if the connection closed in an orderly manner. + */ + ssize_t recv(void* data, size_t size) const; + + /** Bind to a port and start listening. + *@param port 0 means choose an available port. + *@param backlog maximum number of pending connections. + *@return The bound port. + */ + int listen(int port = 0, int backlog = 10) const; + + /** Returns the "socket name" ie the address bound to + * the near end of the socket + */ + std::string getSockname() const; + + /** Accept a connection from a socket that is already listening + * and has an incoming connection + */ + Socket* accept(struct sockaddr *addr, socklen_t *addrlen) const; + + // TODO The following are raw operations, maybe they need better wrapping? + int read(void *buf, size_t count) const; + int write(const void *buf, size_t count) const; + +private: + Socket(SocketPrivate*); +}; + +}} #endif /*!_sys_Socket_h*/ diff --git a/cpp/src/qpid/sys/apr/APRAcceptor.cpp b/cpp/src/qpid/sys/apr/APRAcceptor.cpp index 8662e602c2..b353b698ef 100644 --- a/cpp/src/qpid/sys/apr/APRAcceptor.cpp +++ b/cpp/src/qpid/sys/apr/APRAcceptor.cpp @@ -56,8 +56,6 @@ Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads, bo { return Acceptor::shared_ptr(new APRAcceptor(port, backlog, threads, trace)); } -// Must define Acceptor virtual dtor. -Acceptor::~Acceptor() {} APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) : port(port_), diff --git a/cpp/src/qpid/sys/apr/Socket.cpp b/cpp/src/qpid/sys/apr/Socket.cpp index 6e64d656d2..577268844a 100644 --- a/cpp/src/qpid/sys/apr/Socket.cpp +++ b/cpp/src/qpid/sys/apr/Socket.cpp @@ -20,31 +20,56 @@ */ -#include "Socket.h" +#include "qpid/sys/Socket.h" + #include "APRBase.h" #include "APRPool.h" +#include <apr_network_io.h> + +namespace qpid { +namespace sys { + +class SocketPrivate { +public: + SocketPrivate(apr_socket_t* s = 0) : + socket(s) + {} + + apr_socket_t* socket; +}; + +Socket::Socket() : + impl(new SocketPrivate) +{ + createTcp(); +} + +Socket::Socket(SocketPrivate* sp) : + impl(sp) +{} -using namespace qpid::sys; +Socket::~Socket() { + delete impl; +} -Socket Socket::createTcp() { - Socket s; +void Socket::createTcp() const { + apr_socket_t*& socket = impl->socket; + apr_socket_t* s; CHECK_APR_SUCCESS( apr_socket_create( - &s.socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, + &s, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get())); - return s; -} - -Socket::Socket(apr_socket_t* s) { socket = s; } -void Socket::setTimeout(const Duration& interval) { +void Socket::setTimeout(const Duration& interval) const { + apr_socket_t*& socket = impl->socket; apr_socket_timeout_set(socket, interval/TIME_USEC); } -void Socket::connect(const std::string& host, int port) { +void Socket::connect(const std::string& host, int port) const { + apr_socket_t*& socket = impl->socket; apr_sockaddr_t* address; CHECK_APR_SUCCESS( apr_sockaddr_info_get( @@ -53,14 +78,16 @@ void Socket::connect(const std::string& host, int port) { CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); } -void Socket::close() { +void Socket::close() const { + apr_socket_t*& socket = impl->socket; if (socket == 0) return; CHECK_APR_SUCCESS(apr_socket_close(socket)); socket = 0; } -ssize_t Socket::send(const void* data, size_t size) +ssize_t Socket::send(const void* data, size_t size) const { + apr_socket_t*& socket = impl->socket; apr_size_t sent = size; apr_status_t status = apr_socket_send(socket, reinterpret_cast<const char*>(data), &sent); @@ -70,8 +97,9 @@ ssize_t Socket::send(const void* data, size_t size) return sent; } -ssize_t Socket::recv(void* data, size_t size) +ssize_t Socket::recv(void* data, size_t size) const { + apr_socket_t*& socket = impl->socket; apr_size_t received = size; apr_status_t status = apr_socket_recv(socket, reinterpret_cast<char*>(data), &received); @@ -83,4 +111,4 @@ ssize_t Socket::recv(void* data, size_t size) return received; } - +}} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/apr/Socket.h b/cpp/src/qpid/sys/apr/Socket.h deleted file mode 100644 index c20c36dcd9..0000000000 --- a/cpp/src/qpid/sys/apr/Socket.h +++ /dev/null @@ -1,75 +0,0 @@ -#ifndef _sys_apr_Socket_h -#define _sys_apr_Socket_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <string> -#include "qpid/sys/Time.h" - -#include <apr_network_io.h> - -namespace qpid { -namespace sys { - -class Socket -{ - public: - /** Create an initialized TCP socket */ - static Socket createTcp(); - - /** Create a socket wrapper for descriptor. */ - Socket(apr_socket_t* descriptor = 0); - - /** Set timeout for read and write */ - void setTimeout(const Duration& interval); - - void connect(const std::string& host, int port); - - void close(); - - enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode; - - /** Returns bytes sent or an ErrorCode value < 0. */ - ssize_t send(const void* data, size_t size); - - /** - * Returns bytes received, an ErrorCode value < 0 or 0 - * if the connection closed in an orderly manner. - */ - ssize_t recv(void* data, size_t size); - - /** Bind to a port and start listening. - *@param port 0 means choose an available port. - *@param backlog maximum number of pending connections. - *@return The bound port. - */ - int listen(int port = 0, int backlog = 10); - - /** Get file descriptor */ - int fd(); - - private: - apr_socket_t* socket; -}; - -}} -#endif /*!_sys_Socket_h*/ diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp index 8c3bdbc7d5..37a8510fe6 100644 --- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp +++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -22,6 +22,7 @@ #include "qpid/sys/Poller.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/posix/check.h" +#include "qpid/sys/posix/PrivatePosix.h" #include <sys/epoll.h> #include <errno.h> @@ -88,11 +89,11 @@ class PollerHandlePrivate { } }; -PollerHandle::PollerHandle(int fd0) : +PollerHandle::PollerHandle(const Socket& s) : impl(new PollerHandlePrivate), - fd(fd0) + socket(s) {} - + PollerHandle::~PollerHandle() { delete impl; } @@ -186,7 +187,7 @@ void Poller::addFd(PollerHandle& handle, Direction dir) { } epe.data.ptr = &handle; - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, handle.getFD(), &epe)); + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, toFd(handle.socket.impl), &epe)); // Record monitoring state of this fd eh.events = epe.events; @@ -197,7 +198,7 @@ void Poller::delFd(PollerHandle& handle) { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); assert(!eh.isIdle()); - int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, handle.getFD(), 0); + int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, toFd(handle.socket.impl), 0); // Ignore EBADF since deleting a nonexistent fd has the overall required result! // And allows the case where a sloppy program closes the fd and then does the delFd() if (rc == -1 && errno != EBADF) { @@ -216,7 +217,7 @@ void Poller::modFd(PollerHandle& handle, Direction dir) { epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT; epe.data.ptr = &handle; - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, handle.getFD(), &epe)); + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, toFd(handle.socket.impl), &epe)); // Record monitoring state of this fd eh.events = epe.events; @@ -232,7 +233,7 @@ void Poller::rearmFd(PollerHandle& handle) { epe.events = eh.events; epe.data.ptr = &handle; - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, handle.getFD(), &epe)); + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, toFd(handle.socket.impl), &epe)); eh.setActive(); } diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index 473ef7936f..2b462cbd7a 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -24,8 +24,6 @@ #include "check.h" #include <unistd.h> -#include <fcntl.h> -#include <sys/types.h> #include <sys/socket.h> #include <signal.h> #include <errno.h> @@ -37,13 +35,6 @@ using namespace qpid::sys; namespace { /* - * Make file descriptor non-blocking - */ -void nonblocking(int fd) { - QPID_POSIX_CHECK(::fcntl(fd, F_SETFL, O_NONBLOCK)); -} - -/* * Make *process* not generate SIGPIPE when writing to closed * pipe/socket (necessary as default action is to terminate process) */ @@ -57,11 +48,11 @@ void ignoreSigpipe() { * Asynch Acceptor */ -AsynchAcceptor::AsynchAcceptor(int fd, Callback callback) : +AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) : acceptedCallback(callback), - handle(fd, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) { + handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) { - nonblocking(fd); + s.setNonblocking(); ignoreSigpipe(); } @@ -73,18 +64,16 @@ void AsynchAcceptor::start(Poller::shared_ptr poller) { * We keep on accepting as long as there is something to accept */ void AsynchAcceptor::readable(DispatchHandle& h) { - int afd; + Socket* s; do { errno = 0; // TODO: Currently we ignore the peers address, perhaps we should // log it or use it for connection acceptance. - afd = ::accept(h.getFD(), 0, 0); - if (afd >= 0) { - acceptedCallback(afd); - } else if (errno == EAGAIN) { - break; + s = h.getSocket().accept(0, 0); + if (s) { + acceptedCallback(*s); } else { - QPID_POSIX_CHECK(afd); + break; } } while (true); @@ -94,21 +83,23 @@ void AsynchAcceptor::readable(DispatchHandle& h) { /* * Asynch reader/writer */ -AsynchIO::AsynchIO(int fd, +AsynchIO::AsynchIO(const Socket& s, ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, - BuffersEmptyCallback eCb, IdleCallback iCb) : + ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) : - DispatchHandle(fd, + DispatchHandle(s, boost::bind(&AsynchIO::readable, this, _1), boost::bind(&AsynchIO::writeable, this, _1), boost::bind(&AsynchIO::disconnected, this, _1)), readCallback(rCb), eofCallback(eofCb), disCallback(disCb), + closedCallback(cCb), emptyCallback(eCb), - idleCallback(iCb) { + idleCallback(iCb), + queuedClose(false) { - nonblocking(fd); + s.setNonblocking(); } struct deleter @@ -131,15 +122,58 @@ void AsynchIO::start(Poller::shared_ptr poller) { } void AsynchIO::queueReadBuffer(Buffer* buff) { + assert(buff); + buff->dataStart = 0; + buff->dataCount = 0; + bufferQueue.push_back(buff); + DispatchHandle::rewatchRead(); +} + +void AsynchIO::unread(Buffer* buff) { + assert(buff); + if (buff->dataStart != 0) { + memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount); + buff->dataStart = 0; + } bufferQueue.push_front(buff); DispatchHandle::rewatchRead(); } +// Either queue for writing or announce that there is something to write +// and we should ask for it void AsynchIO::queueWrite(Buffer* buff) { - writeQueue.push_front(buff); + // If no buffer then don't queue anything + // (but still wake up for writing) + if (buff) { + // If we've already closed the socket then throw the write away + if (queuedClose) { + bufferQueue.push_front(buff); + return; + } else { + writeQueue.push_front(buff); + } + } DispatchHandle::rewatchWrite(); } +void AsynchIO::queueWriteClose() { + queuedClose = true; +} + +/** Return a queued buffer if there are enough + * to spare + */ +AsynchIO::Buffer* AsynchIO::getQueuedBuffer() { + // Always keep at least one buffer (it might have data that was "unread" in it) + if (bufferQueue.size()<=1) + return 0; + Buffer* buff = bufferQueue.back(); + buff->dataStart = 0; + buff->dataCount = 0; + bufferQueue.pop_back(); + return buff; +} + /* * We keep on reading as long as we have something to read and a buffer to put * it in @@ -149,19 +183,19 @@ void AsynchIO::readable(DispatchHandle& h) { // (Try to) get a buffer if (!bufferQueue.empty()) { // Read into buffer - Buffer* buff = bufferQueue.back(); - bufferQueue.pop_back(); + Buffer* buff = bufferQueue.front(); + bufferQueue.pop_front(); errno = 0; - int rc = ::read(h.getFD(), buff->bytes, buff->byteCount); + int readCount = buff->byteCount-buff->dataCount; + int rc = h.getSocket().read(buff->bytes + buff->dataCount, readCount); if (rc == 0) { eofCallback(*this); h.unwatchRead(); return; } else if (rc > 0) { - buff->dataStart = 0; - buff->dataCount = rc; + buff->dataCount += rc; readCallback(*this, buff); - if (rc != buff->byteCount) { + if (rc != readCount) { // If we didn't fill the read buffer then time to stop reading return; } @@ -209,7 +243,7 @@ void AsynchIO::writeable(DispatchHandle& h) { writeQueue.pop_back(); errno = 0; assert(buff->dataStart+buff->dataCount <= buff->byteCount); - int rc = ::write(h.getFD(), buff->bytes+buff->dataStart, buff->dataCount); + int rc = h.getSocket().write(buff->bytes+buff->dataStart, buff->dataCount); if (rc >= 0) { // If we didn't write full buffer put rest back if (rc != buff->dataCount) { @@ -238,12 +272,17 @@ void AsynchIO::writeable(DispatchHandle& h) { } } } else { + // If we're waiting to close the socket then can do it now as there is nothing to write + if (queuedClose) { + close(h); + return; + } // Fd is writable, but nothing to write if (idleCallback) { idleCallback(*this); } // If we still have no buffers to write we can't do anything more - if (writeQueue.empty()) { + if (writeQueue.empty() && !queuedClose) { h.unwatchWrite(); return; } @@ -252,8 +291,25 @@ void AsynchIO::writeable(DispatchHandle& h) { } void AsynchIO::disconnected(DispatchHandle& h) { + // If we've already queued close do it before callback + if (queuedClose) { + close(h); + } + if (disCallback) { disCallback(*this); h.unwatch(); } } + +/* + * Close the socket and callback to say we've done it + */ +void AsynchIO::close(DispatchHandle& h) { + h.stopWatch(); + h.getSocket().close(); + if (closedCallback) { + closedCallback(*this, getSocket()); + } +} + diff --git a/cpp/src/qpid/sys/posix/EventChannel.cpp b/cpp/src/qpid/sys/posix/EventChannel.cpp deleted file mode 100644 index d35eedf5a5..0000000000 --- a/cpp/src/qpid/sys/posix/EventChannel.cpp +++ /dev/null @@ -1,596 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -// TODO aconway 2006-12-15: Locking review. - -// TODO aconway 2006-12-15: use Descriptor pointers everywhere, -// get them from channel, pass them to Event constructors. -// Eliminate lookup. - - -#include "EventChannel.h" -#include "check.h" - -#include "qpid/QpidError.h" -#include "qpid/sys/AtomicCount.h" - -#include <mqueue.h> -#include <string.h> -#include <iostream> - -#include <sys/errno.h> -#include <sys/socket.h> -#include <sys/epoll.h> - -#include <typeinfo> -#include <iostream> -#include <queue> - -#include <boost/ptr_container/ptr_map.hpp> -#include <boost/noncopyable.hpp> -#include <boost/bind.hpp> - -using namespace std; - - -namespace qpid { -namespace sys { - - -// ================================================================ -// Private class declarations - -namespace { - -typedef enum { IN, OUT } Direction; - -typedef std::pair<Event*, Event*> EventPair; - -/** - * Template to zero out a C-struct on construction. Avoids uninitialized memory - * warnings from valgrind or other mem checking tool. - */ -template <class T> struct CleanStruct : public T { - CleanStruct() { memset(this, 0, sizeof(*this)); } -}; - -} // namespace - -/** - * Queue of events corresponding to one IO direction (IN or OUT). - * Each Descriptor contains two Queues. - */ -class EventChannel::Queue : private boost::noncopyable -{ - public: - Queue(Descriptor& container, Direction dir); - - /** Called by Event classes in prepare() */ - void push(Event* e); - - /** Called when epoll wakes. - *@return The next completed event or 0. - */ - Event* wake(uint32_t epollFlags); - - Event* pop() { Event* e = queue.front(); queue.pop_front(); return e; } - - bool empty() { return queue.empty(); } - - void setBit(uint32_t &epollFlags); - - void shutdown(); - - private: - typedef std::deque<Event*> EventQ; - - inline bool isMyEvent(uint32_t flags) { return flags | myEvent; } - - Mutex& lock; // Shared with Descriptor. - Descriptor& descriptor; - uint32_t myEvent; // Epoll event flag. - EventQ queue; -}; - - -/** - * Manages a file descriptor in an epoll set. - * - * Can be shutdown and re-activated for the same file descriptor. - */ -class EventChannel::Descriptor : private boost::noncopyable { - public: - explicit Descriptor(int fd) : epollFd(-1), myFd(fd), - inQueue(*this, IN), outQueue(*this, OUT) {} - - void activate(int epollFd_); - - /** Epoll woke up for this descriptor. */ - Event* wake(uint32_t epollEvents); - - /** Shut down: close and remove file descriptor. - * May be re-activated if fd is reused. - */ - void shutdown(); - - // TODO aconway 2006-12-18: Nasty. Need to clean up interaction. - void shutdownUnsafe(); - - bool isShutdown() { return epollFd == -1; } - - Queue& getQueue(Direction d) { return d==IN ? inQueue : outQueue; } - int getFD() const { return myFd; } - - private: - void update(); - void epollCtl(int op, uint32_t events); - Queue* pick(); - - Mutex lock; - int epollFd; - int myFd; - Queue inQueue, outQueue; - bool preferIn; - - friend class Queue; -}; - - -/** - * Holds a map of Descriptors, which do most of the work. - */ -class EventChannel::Impl { - public: - Impl(int size = 256); - - ~Impl(); - - /** - * Activate descriptor - */ - void activate(Descriptor& d) { - d.activate(epollFd); - } - - /** Wait for an event, return 0 on timeout */ - Event* wait(Duration timeout); - - void shutdown(); - - private: - - Monitor monitor; - int epollFd; - int shutdownPipe[2]; - AtomicCount nWaiters; - bool isShutdown; -}; - - -// ================================================================ -// EventChannel::Queue::implementation. - -static const char* shutdownMsg = "Event queue shut down."; - -EventChannel::Queue::Queue(Descriptor& d, Direction dir) : - lock(d.lock), descriptor(d), - myEvent(dir==IN ? EPOLLIN : EPOLLOUT) -{} - -void EventChannel::Queue::push(Event* e) { - Mutex::ScopedLock l(lock); - if (descriptor.isShutdown()) - THROW_QPID_ERROR(INTERNAL_ERROR, shutdownMsg); - queue.push_back(e); - descriptor.update(); -} - -void EventChannel::Queue::setBit(uint32_t &epollFlags) { - if (queue.empty()) - epollFlags &= ~myEvent; - else - epollFlags |= myEvent; -} - -// TODO aconway 2006-12-20: REMOVE -Event* EventChannel::Queue::wake(uint32_t epollFlags) { - // Called with lock held. - if (!queue.empty() && (isMyEvent(epollFlags))) { - assert(!queue.empty()); - Event* e = queue.front(); - assert(e); - if (!e->getException()) { - // TODO aconway 2006-12-20: Can/should we move event completion - // out into dispatch() so it doesn't happen in Descriptor locks? - e->complete(descriptor); - } - queue.pop_front(); - return e; - } - return 0; -} - -void EventChannel::Queue::shutdown() { - // Mark all pending events with a shutdown exception. - // The server threads will remove and dispatch the events. - // - qpid::QpidError ex(INTERNAL_ERROR, shutdownMsg, SRCLINE); - for_each(queue.begin(), queue.end(), - boost::bind(&Event::setException, _1, ex)); -} - - -// ================================================================ -// Descriptor - - -void EventChannel::Descriptor::activate(int epollFd_) { - Mutex::ScopedLock l(lock); - if (isShutdown()) { - epollFd = epollFd_; // We're back in business. - epollCtl(EPOLL_CTL_ADD, 0); - } -} - -void EventChannel::Descriptor::shutdown() { - Mutex::ScopedLock l(lock); - shutdownUnsafe(); -} - -void EventChannel::Descriptor::shutdownUnsafe() { - // Caller holds lock. - ::close(myFd); - epollFd = -1; // Mark myself as shutdown. - inQueue.shutdown(); - outQueue.shutdown(); -} - -// TODO aconway 2006-12-20: Inline into wake(). -void EventChannel::Descriptor::update() { - // Caller holds lock. - if (isShutdown()) // Nothing to do - return; - uint32_t events = EPOLLONESHOT | EPOLLERR | EPOLLHUP; - inQueue.setBit(events); - outQueue.setBit(events); - epollCtl(EPOLL_CTL_MOD, events); -} - -void EventChannel::Descriptor::epollCtl(int op, uint32_t events) { - // Caller holds lock - assert(!isShutdown()); - CleanStruct<epoll_event> ee; - ee.data.ptr = this; - ee.events = events; - int status = ::epoll_ctl(epollFd, op, myFd, &ee); - if (status < 0) { - if (errno == EEXIST) // It's okay to add an existing fd - return; - else if (errno == EBADF) // FD was closed externally. - shutdownUnsafe(); - else - throw QPID_POSIX_ERROR(errno); - } -} - - -EventChannel::Queue* EventChannel::Descriptor::pick() { - if (inQueue.empty() && outQueue.empty()) - return 0; - if (inQueue.empty() || outQueue.empty()) - return !inQueue.empty() ? &inQueue : &outQueue; - // Neither is empty, pick fairly. - preferIn = !preferIn; - return preferIn ? &inQueue : &outQueue; -} - -Event* EventChannel::Descriptor::wake(uint32_t epollEvents) { - Mutex::ScopedLock l(lock); - // On error, shut down the Descriptor and both queues. - if (epollEvents & (EPOLLERR | EPOLLHUP)) { - shutdownUnsafe(); - // TODO aconway 2006-12-20: This error handling models means - // that any error reported by epoll will result in a shutdown - // exception on the events. Can we get more accurate error - // reporting somehow? - } - Queue*q = 0; - bool in = (epollEvents & EPOLLIN); - bool out = (epollEvents & EPOLLOUT); - if ((in && out) || isShutdown()) - q = pick(); // Choose fairly, either non-empty queue. - else if (in) - q = &inQueue; - else if (out) - q = &outQueue; - Event* e = (q && !q->empty()) ? q->pop() : 0; - update(); - if (e) - e->complete(*this); - return e; -} - - - -// ================================================================ -// EventChannel::Impl - - -EventChannel::Impl::Impl(int epollSize): - epollFd(-1), isShutdown(false) -{ - // Create the epoll file descriptor. - epollFd = epoll_create(epollSize); - QPID_POSIX_CHECK(epollFd); - - // Create a pipe and write a single byte. The byte is never - // read so the pipes read fd is always ready for read. - // We activate the FD when there are messages in the queue. - QPID_POSIX_CHECK(::pipe(shutdownPipe)); - static char zero = '\0'; - QPID_POSIX_CHECK(::write(shutdownPipe[1], &zero, 1)); -} - -EventChannel::Impl::~Impl() { - shutdown(); - ::close(epollFd); - ::close(shutdownPipe[0]); - ::close(shutdownPipe[1]); -} - - -void EventChannel::Impl::shutdown() { - Monitor::ScopedLock l(monitor); - if (!isShutdown) { // I'm starting shutdown. - isShutdown = true; - if (nWaiters == 0) - return; - - // TODO aconway 2006-12-20: If I just close the epollFd will - // that wake all threads? If so with what? Would be simpler than: - - CleanStruct<epoll_event> ee; - ee.data.ptr = 0; - ee.events = EPOLLIN; - QPID_POSIX_CHECK( - epoll_ctl(epollFd, EPOLL_CTL_ADD, shutdownPipe[0], &ee)); - } - // Wait for nWaiters to get out. - while (nWaiters > 0) { - monitor.wait(); - } -} - -// TODO aconway 2006-12-20: DEBUG remove -struct epoll { - epoll(uint32_t e) : events(e) { } - uint32_t events; -}; - -#define BIT(X) out << ((e.events & X) ? __STRING(X) "." : "") -ostream& operator << (ostream& out, epoll e) { - out << "epoll_event.events: "; - BIT(EPOLLIN); - BIT(EPOLLPRI); - BIT(EPOLLOUT); - BIT(EPOLLRDNORM); - BIT(EPOLLRDBAND); - BIT(EPOLLWRNORM); - BIT(EPOLLWRBAND); - BIT(EPOLLMSG); - BIT(EPOLLERR); - BIT(EPOLLHUP); - BIT(EPOLLONESHOT); - BIT(EPOLLET); - return out; -} - - - -/** - * Wait for epoll to wake up, return the descriptor or 0 on timeout. - */ -Event* EventChannel::Impl::wait(Duration timeoutNs) -{ - { - Monitor::ScopedLock l(monitor); - if (isShutdown) - throw ShutdownException(); - } - - // Increase nWaiters for the duration, notify the monitor if I'm - // the last one out. - // - AtomicCount::ScopedIncrement si( - nWaiters, boost::bind(&Monitor::notifyAll, &monitor)); - - // No lock, all thread safe calls or local variables: - // - const long timeoutMs = - (timeoutNs == TIME_INFINITE) ? -1 : timeoutNs/TIME_MSEC; - CleanStruct<epoll_event> ee; - Event* event = 0; - - // Loop till we get a completed event. Some events may repost - // themselves and return 0, e.g. incomplete read or write events. - //TODO aconway 2006-12-20: FIX THIS! - while (!event) { - int n = ::epoll_wait(epollFd, &ee, 1, timeoutMs); // Thread safe. - if (n == 0) // Timeout - return 0; - if (n < 0 && errno == EINTR) // Interrupt, ignore it. - continue; - if (n < 0) - throw QPID_POSIX_ERROR(errno); - assert(n == 1); - Descriptor* ed = - reinterpret_cast<Descriptor*>(ee.data.ptr); - if (ed == 0) // We're being shut-down. - throw ShutdownException(); - assert(ed != 0); - event = ed->wake(ee.events); - } - return event; -} - -//EventChannel::Descriptor& EventChannel::Impl::getDescriptor(int fd) { -// Mutex::ScopedLock l(monitor); -// Descriptor& ed = descriptors[fd]; -// ed.activate(epollFd, fd); -// return ed; -//} - - -// ================================================================ -// EventChannel - -EventChannel::shared_ptr EventChannel::create() { - return shared_ptr(new EventChannel()); -} - -EventChannel::EventChannel() : impl(new EventChannel::Impl()) {} - -EventChannel::~EventChannel() {} - -void EventChannel::post(Event& e) -{ - e.prepare(*impl); -} - -Event* EventChannel::wait(Duration timeoutNs) -{ - return impl->wait(timeoutNs); -} - -void EventChannel::shutdown() { - impl->shutdown(); -} - - -// ================================================================ -// Event and subclasses. - -Event::~Event() {} - -Exception::shared_ptr_const Event::getException() const { - return exception; -} - -void Event::throwIfException() { - if (getException()) - exception->throwSelf(); -} - -void Event::dispatch() -{ - if (!callback.empty()) - callback(); -} - -void Event::setException(const std::exception& e) { - const Exception* ex = dynamic_cast<const Exception*>(&e); - if (ex) - exception.reset(ex->clone().release()); - else - exception.reset(new Exception(e)); -#ifndef NDEBUG - // Throw and re-catch the exception. Has no effect on the - // program but it triggers debuggers watching for throw. The - // context that sets the exception is more informative for - // debugging purposes than the one that ultimately throws it. - // - try { - throwIfException(); - } - catch (...) { } // Ignored. -#endif -} - -int FDEvent::getFDescriptor() const { - return descriptor.getFD(); -} - -// TODO: AMS 21/12/06 Don't like the inline new, probably cause a memory leak -ReadEvent::ReadEvent(int fd, void* buf, size_t sz,Callback cb, bool noWait) : - IOEvent(cb, *(new EventChannel::Descriptor(fd)), sz, noWait), buffer(buf), bytesRead(0) { -} - -void ReadEvent::prepare(EventChannel::Impl& impl) { - EventChannel::Descriptor& d = getDescriptor(); - impl.activate(d); - d.getQueue(IN).push(this); -} - -void ReadEvent::complete(EventChannel::Descriptor& ed) -{ - ssize_t n = ::read(getFDescriptor(), - static_cast<char*>(buffer) + bytesRead, - size - bytesRead); - if (n > 0) - bytesRead += n; - if (n == 0 || (n < 0 && errno != EAGAIN)) { - // Use ENODATA for file closed. - setException(QPID_POSIX_ERROR(n == 0 ? ENODATA : errno)); - ed.shutdownUnsafe(); - } -} - -WriteEvent::WriteEvent(int fd, const void* buf, size_t sz, Callback cb) : - IOEvent(cb, *(new EventChannel::Descriptor(fd)), sz, noWait), buffer(buf), bytesWritten(0) { -} - -void WriteEvent::prepare(EventChannel::Impl& impl) { - EventChannel::Descriptor& d = getDescriptor(); - impl.activate(d); - d.getQueue(OUT).push(this); -} - - -void WriteEvent::complete(EventChannel::Descriptor& ed) -{ - ssize_t n = ::write(getFDescriptor(), - static_cast<const char*>(buffer) + bytesWritten, - size - bytesWritten); - if (n > 0) - bytesWritten += n; - if(n < 0 && errno != EAGAIN) { - setException(QPID_POSIX_ERROR(errno)); - ed.shutdownUnsafe(); // Called with lock held. - } -} - -AcceptEvent::AcceptEvent(int fd, Callback cb) : - FDEvent(cb, *(new EventChannel::Descriptor(fd))), accepted(0) { -} - -void AcceptEvent::prepare(EventChannel::Impl& impl) { - EventChannel::Descriptor& d = getDescriptor(); - impl.activate(d); - d.getQueue(IN).push(this); -} - -void AcceptEvent::complete(EventChannel::Descriptor& ed) -{ - accepted = ::accept(getFDescriptor(), 0, 0); - if (accepted < 0) { - setException(QPID_POSIX_ERROR(errno)); - ed.shutdownUnsafe(); // Called with lock held. - } -} - -}} diff --git a/cpp/src/qpid/sys/posix/EventChannel.h b/cpp/src/qpid/sys/posix/EventChannel.h deleted file mode 100644 index 85e121379a..0000000000 --- a/cpp/src/qpid/sys/posix/EventChannel.h +++ /dev/null @@ -1,214 +0,0 @@ -#ifndef _sys_EventChannel_h -#define _sys_EventChannel_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/SharedObject.h" -#include "qpid/Exception.h" -#include "qpid/sys/Monitor.h" -#include "qpid/sys/Time.h" - -#include <boost/function.hpp> -#include <memory> - -namespace qpid { -namespace sys { - -class Event; - -/** - * Channel to post and wait for events. - */ -class EventChannel : public qpid::SharedObject<EventChannel> -{ - public: - static shared_ptr create(); - - /** Exception throw from wait() if channel is shut down. */ - class ShutdownException : public qpid::Exception {}; - - ~EventChannel(); - - /** Post an event to the channel. */ - void post(Event& event); - - /** - * Wait for the next complete event, up to timeout. - *@return Pointer to event or 0 if timeout elapses. - *@exception ShutdownException if the channel is shut down. - */ - Event* wait(Duration timeout = TIME_INFINITE); - - /** - * Shut down the event channel. - * Blocks till all threads have exited wait() - */ - void shutdown(); - - - // Internal classes. - class Impl; - class Queue; - class Descriptor; - - private: - - EventChannel(); - - Mutex lock; - boost::shared_ptr<Impl> impl; -}; - -/** - * Base class for all Events. - * - * Derived classes define events representing various async IO operations. - * When an event is complete, it is returned by the EventChannel to - * a thread calling wait. The thread will call Event::dispatch() to - * execute code associated with event completion. - */ -class Event -{ - public: - /** Type for callback when event is dispatched */ - typedef boost::function0<void> Callback; - - virtual ~Event(); - - /** Call the callback provided to the constructor, if any. */ - void dispatch(); - - /** - *If there was an exception processing this Event, return it. - *@return 0 if there was no exception. - */ - qpid::Exception::shared_ptr_const getException() const; - - /** If getException() throw the corresponding exception. */ - void throwIfException(); - - /** Set the dispatch callback. */ - void setCallback(Callback cb) { callback = cb; } - - /** Set the exception. */ - void setException(const std::exception& e); - - protected: - Event(Callback cb=0) : callback(cb) {} - - virtual void prepare(EventChannel::Impl&) = 0; - virtual void complete(EventChannel::Descriptor&) = 0; - - Callback callback; - Exception::shared_ptr_const exception; - - friend class EventChannel; - friend class EventChannel::Queue; -}; - -/** Base class for events related to a file descriptor */ -class FDEvent : public Event { - public: - EventChannel::Descriptor& getDescriptor() const { return descriptor; } - int getFDescriptor() const; - - protected: - FDEvent(Callback cb, EventChannel::Descriptor& fd) - : Event(cb), descriptor(fd) {} - // TODO AMS: 1/6/07 I really don't think this is correct, but - // the descriptor is immutable - FDEvent& operator=(const FDEvent& rhs) { Event::operator=(rhs); return *this; } - - private: - EventChannel::Descriptor& descriptor; -}; - -/** Base class for read or write events. */ -class IOEvent : public FDEvent { - public: - size_t getSize() const { return size; } - - protected: - IOEvent(Callback cb, EventChannel::Descriptor& fd, size_t sz, bool noWait_) : - FDEvent(cb, fd), size(sz), noWait(noWait_) {} - - size_t size; - bool noWait; -}; - -/** Asynchronous read event */ -class ReadEvent : public IOEvent -{ - public: - explicit ReadEvent(int fd, void* buf=0, size_t sz=0,Callback cb=0, bool noWait=false); - - void* getBuffer() const { return buffer; } - size_t getBytesRead() const { return bytesRead; } - - private: - void prepare(EventChannel::Impl&); - void complete(EventChannel::Descriptor&); - ssize_t doRead(); - - void* buffer; - size_t bytesRead; -}; - -/** Asynchronous write event */ -class WriteEvent : public IOEvent -{ - public: - explicit WriteEvent(int fd, const void* buf=0, size_t sz=0, Callback cb=0); - - const void* getBuffer() const { return buffer; } - size_t getBytesWritten() const { return bytesWritten; } - - private: - void prepare(EventChannel::Impl&); - void complete(EventChannel::Descriptor&); - ssize_t doWrite(); - - const void* buffer; - size_t bytesWritten; -}; - - -/** Asynchronous socket accept event */ -class AcceptEvent : public FDEvent -{ - public: - /** Accept a connection on fd. */ - explicit AcceptEvent(int fd, Callback cb=0); - - /** Get descriptor for accepted server socket */ - int getAcceptedDesscriptor() const { return accepted; } - - private: - void prepare(EventChannel::Impl&); - void complete(EventChannel::Descriptor&); - - int accepted; -}; - - -}} - - - -#endif /*!_sys_EventChannel_h*/ diff --git a/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp b/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp deleted file mode 100644 index a61a66c577..0000000000 --- a/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp +++ /dev/null @@ -1,165 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "EventChannelConnection.h" - -#include "qpid/sys/ConnectionOutputHandler.h" -#include "qpid/sys/ConnectionInputHandler.h" -#include "qpid/sys/ConnectionInputHandlerFactory.h" -#include "qpid/sys/Acceptor.h" -#include "qpid/sys/Socket.h" -#include "qpid/framing/Buffer.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/Exception.h" - -#include <sys/socket.h> -#include <netdb.h> - -#include <boost/assert.hpp> -#include <boost/ptr_container/ptr_vector.hpp> -#include <boost/ptr_container/ptr_deque.hpp> -#include <boost/bind.hpp> -#include <boost/scoped_ptr.hpp> - -#include <iostream> - -namespace qpid { -namespace sys { - -using namespace qpid::framing; -using namespace std; - -class EventChannelAcceptor : public Acceptor { - public: - - - EventChannelAcceptor( - int16_t port_, int backlog, int nThreads, bool trace_ - ); - - uint16_t getPort() const; - std::string getHost() const; - - void run(ConnectionInputHandlerFactory* factory); - - void shutdown(); - - private: - - void accept(); - - Mutex lock; - Socket listener; - const int port; - const bool isTrace; - bool isRunning; - boost::ptr_vector<EventChannelConnection> connections; - AcceptEvent acceptEvent; - ConnectionInputHandlerFactory* factory; - bool isShutdown; - EventChannelThreads::shared_ptr threads; -}; - -Acceptor::shared_ptr Acceptor::create( - int16_t port, int backlog, int threads, bool trace) -{ - return Acceptor::shared_ptr( - new EventChannelAcceptor(port, backlog, threads, trace)); -} - -// Must define Acceptor virtual dtor. -Acceptor::~Acceptor() {} - -EventChannelAcceptor::EventChannelAcceptor( - int16_t port_, int backlog, int nThreads, bool trace_ -) : listener(Socket::createTcp()), - port(listener.listen(int(port_), backlog)), - isTrace(trace_), - isRunning(false), - acceptEvent(listener.fd(), - boost::bind(&EventChannelAcceptor::accept, this)), - factory(0), - isShutdown(false), - threads(EventChannelThreads::create(EventChannel::create(), nThreads)) -{ } - -uint16_t EventChannelAcceptor::getPort() const { - return port; // Immutable no need for lock. -} - -std::string EventChannelAcceptor::getHost() const { - ::sockaddr_storage name; // big enough for any socket address - ::socklen_t namelen = sizeof(name); - if (::getsockname(listener.fd(), (::sockaddr*)&name, &namelen) < 0) - throw QPID_POSIX_ERROR(errno); - - char dispName[NI_MAXHOST]; - if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), 0, 0, NI_NUMERICHOST) != 0) - throw QPID_POSIX_ERROR(rc); - return dispName; -} - -void EventChannelAcceptor::run(ConnectionInputHandlerFactory* f) { - { - Mutex::ScopedLock l(lock); - if (!isRunning && !isShutdown) { - isRunning = true; - factory = f; - threads->post(acceptEvent); - } - } - threads->join(); // Wait for shutdown. -} - -void EventChannelAcceptor::shutdown() { - bool doShutdown = false; - { - Mutex::ScopedLock l(lock); - doShutdown = !isShutdown; // I'm the shutdown thread. - isShutdown = true; - } - if (doShutdown) { - ::close(acceptEvent.getFDescriptor()); - threads->shutdown(); - for_each(connections.begin(), connections.end(), - boost::bind(&EventChannelConnection::close, _1)); - } - threads->join(); -} - -void EventChannelAcceptor::accept() -{ - // No lock, we only post one accept event at a time. - if (isShutdown) - return; - if (acceptEvent.getException()) { - Exception::log(*acceptEvent.getException(), - "EventChannelAcceptor::accept"); - shutdown(); - return; - } - int fd = acceptEvent.getAcceptedDesscriptor(); - threads->post(acceptEvent); // Keep accepting. - // TODO aconway 2006-11-29: Need to reap closed connections also. - connections.push_back( - new EventChannelConnection(threads, *factory, fd, fd, isTrace)); -} - -}} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/posix/EventChannelConnection.cpp b/cpp/src/qpid/sys/posix/EventChannelConnection.cpp deleted file mode 100644 index f4b6396dd1..0000000000 --- a/cpp/src/qpid/sys/posix/EventChannelConnection.cpp +++ /dev/null @@ -1,237 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <iostream> - -#include <boost/bind.hpp> -#include <boost/assert.hpp> - -#include "EventChannelConnection.h" -#include "qpid/sys/ConnectionInputHandlerFactory.h" -#include "qpid/QpidError.h" - -using namespace std; -using namespace qpid; -using namespace qpid::framing; - -namespace qpid { -namespace sys { - -const size_t EventChannelConnection::bufferSize = 65536; - -EventChannelConnection::EventChannelConnection( - EventChannelThreads::shared_ptr threads_, - ConnectionInputHandlerFactory& factory_, - int rfd, - int wfd, - bool isTrace_ -) : - readFd(rfd), - writeFd(wfd ? wfd : rfd), - readEvent(readFd), - writeEvent(writeFd), - readCallback(boost::bind(&EventChannelConnection::closeOnException, - this, &EventChannelConnection::endInitRead)), - - isWriting(false), - isClosed(false), - threads(threads_), - handler(factory_.create(this)), - in(bufferSize), - out(bufferSize), - isTrace(isTrace_) -{ - assert(readFd > 0); - assert(writeFd > 0); - closeOnException(&EventChannelConnection::startRead); -} - - -void EventChannelConnection::send(AMQFrame& frame) { - { - Monitor::ScopedLock lock(monitor); - writeFrames.push_back(frame); - } - closeOnException(&EventChannelConnection::startWrite); -} - -void EventChannelConnection::close() { - { - Monitor::ScopedLock lock(monitor); - if (isClosed) - return; - isClosed = true; - } - ::close(readFd); - ::close(writeFd); - { - Monitor::ScopedLock lock(monitor); - while (busyThreads > 0) - monitor.wait(); - } - handler->closed(); -} - -void EventChannelConnection::closeNoThrow() { - Exception::tryCatchLog<void>( - boost::bind(&EventChannelConnection::close, this), - false, - "Exception closing channel" - ); -} - -/** - * Call f in a try/catch block and close the connection if - * an exception is thrown. - */ -void EventChannelConnection::closeOnException(MemberFnPtr f) -{ - try { - Exception::tryCatchLog<void>( - boost::bind(f, this), - "Closing connection due to exception" - ); - return; - } catch (...) { - // Exception was already logged by tryCatchLog - closeNoThrow(); - } -} - -// Post the write event. -// Always called inside closeOnException. -// Called by endWrite and send, but only one thread writes at a time. -// -void EventChannelConnection::startWrite() { - { - Monitor::ScopedLock lock(monitor); - // Stop if closed or a write event is already in progress. - if (isClosed || isWriting) - return; - if (writeFrames.empty()) { - isWriting = false; - return; - } - isWriting = true; - AMQFrame& frame = writeFrames.front(); - writeFrames.pop_front(); - // No need to lock here - only one thread can be writing at a time. - out.clear(); - if (isTrace) - cout << "Send on socket " << writeFd << ": " << frame << endl; - frame.encode(out); - out.flip(); - } - // TODO: AMS 1/6/07 This only works because we already have the correct fd - // in the descriptor - change not to use assigment - writeEvent = WriteEvent( - writeFd, out.start(), out.available(), - boost::bind(&EventChannelConnection::closeOnException, - this, &EventChannelConnection::endWrite)); - threads->post(writeEvent); -} - -// ScopedBusy ctor increments busyThreads. -// dtor decrements and calls monitor.notifyAll if it reaches 0. -// -struct EventChannelConnection::ScopedBusy : public AtomicCount::ScopedIncrement -{ - ScopedBusy(EventChannelConnection& ecc) - : AtomicCount::ScopedIncrement( - ecc.busyThreads, boost::bind(&Monitor::notifyAll, &ecc.monitor)) - {} -}; - -// Write event completed. -// Always called by a channel thread inside closeOnException. -// -void EventChannelConnection::endWrite() { - ScopedBusy(*this); - { - Monitor::ScopedLock lock(monitor); - assert(isWriting); - isWriting = false; - if (isClosed) - return; - writeEvent.throwIfException(); - if (writeEvent.getBytesWritten() < writeEvent.getSize()) { - // Keep writing the current event till done. - isWriting = true; - threads->post(writeEvent); - } - } - // Continue writing from writeFrames queue. - startWrite(); -} - - -// Post the read event. -// Always called inside closeOnException. -// Called from ctor and end[Init]Read, so only one call at a time -// is possible since we only post one read event at a time. -// -void EventChannelConnection::startRead() { - // Non blocking read, as much as we can swallow. - readEvent = ReadEvent( - readFd, in.start(), in.available(), readCallback); - threads->post(readEvent); -} - -// Completion of initial read, expect protocolInit. -// Always called inside closeOnException in channel thread. -// Only called by one thread at a time. -void EventChannelConnection::endInitRead() { - ScopedBusy(*this); - if (!isClosed) { - readEvent.throwIfException(); - in.move(readEvent.getBytesRead()); - in.flip(); - ProtocolInitiation protocolInit; - if(protocolInit.decode(in)){ - handler->initiated(protocolInit); - readCallback = boost::bind( - &EventChannelConnection::closeOnException, - this, &EventChannelConnection::endRead); - } - in.compact(); - // Continue reading. - startRead(); - } -} - -// Normal reads, expect a frame. -// Always called inside closeOnException in channel thread. -void EventChannelConnection::endRead() { - ScopedBusy(*this); - if (!isClosed) { - readEvent.throwIfException(); - in.move(readEvent.getBytesRead()); - in.flip(); - AMQFrame frame; - while (frame.decode(in)) { - if (isTrace) - cout << "Received on socket " << readFd - << ": " << frame << endl; - handler->received(frame); - } - in.compact(); - startRead(); - } -} - -}} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/posix/EventChannelConnection.h b/cpp/src/qpid/sys/posix/EventChannelConnection.h deleted file mode 100644 index a4ca5de517..0000000000 --- a/cpp/src/qpid/sys/posix/EventChannelConnection.h +++ /dev/null @@ -1,96 +0,0 @@ -#ifndef _posix_EventChannelConnection_h -#define _posix_EventChannelConnection_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <boost/ptr_container/ptr_deque.hpp> - -#include "EventChannelThreads.h" -#include "qpid/sys/Monitor.h" -#include "qpid/sys/ConnectionOutputHandler.h" -#include "qpid/sys/ConnectionInputHandler.h" -#include "qpid/sys/AtomicCount.h" -#include "qpid/framing/AMQFrame.h" - -namespace qpid { -namespace sys { - -class ConnectionInputHandlerFactory; - -/** - * Implements SessionContext and delegates to a SessionHandler - * for a connection via the EventChannel. - *@param readDescriptor file descriptor for reading. - *@param writeDescriptor file descriptor for writing, - * by default same as readDescriptor - */ -class EventChannelConnection : public ConnectionOutputHandler { - public: - EventChannelConnection( - EventChannelThreads::shared_ptr threads, - ConnectionInputHandlerFactory& factory, - int readDescriptor, - int writeDescriptor = 0, - bool isTrace = false - ); - - virtual void send(qpid::framing::AMQFrame& frame); - virtual void close(); - - private: - typedef std::deque<qpid::framing::AMQFrame> FrameQueue; - typedef void (EventChannelConnection::*MemberFnPtr)(); - struct ScopedBusy; - - void startWrite(); - void endWrite(); - void startRead(); - void endInitRead(); - void endRead(); - void closeNoThrow(); - void closeOnException(MemberFnPtr); - bool shouldContinue(bool& flag); - - static const size_t bufferSize; - - Monitor monitor; - - int readFd, writeFd; - ReadEvent readEvent; - WriteEvent writeEvent; - Event::Callback readCallback; - bool isWriting; - bool isClosed; - AtomicCount busyThreads; - - EventChannelThreads::shared_ptr threads; - std::auto_ptr<ConnectionInputHandler> handler; - qpid::framing::Buffer in, out; - FrameQueue writeFrames; - bool isTrace; - - friend struct ScopedBusy; -}; - - -}} // namespace qpid::sys - - - -#endif /*!_posix_EventChannelConnection_h*/ diff --git a/cpp/src/qpid/sys/posix/EventChannelThreads.cpp b/cpp/src/qpid/sys/posix/EventChannelThreads.cpp deleted file mode 100644 index 70954d0c16..0000000000 --- a/cpp/src/qpid/sys/posix/EventChannelThreads.cpp +++ /dev/null @@ -1,126 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <iostream> -#include <limits> - -#include <boost/bind.hpp> - -#include "qpid/sys/Runnable.h" - -#include "EventChannelThreads.h" - -namespace qpid { -namespace sys { - -const size_t EventChannelThreads::unlimited = - std::numeric_limits<size_t>::max(); - -EventChannelThreads::shared_ptr EventChannelThreads::create( - EventChannel::shared_ptr ec, size_t min, size_t max -) -{ - return EventChannelThreads::shared_ptr( - new EventChannelThreads(ec, min, max)); -} - -EventChannelThreads::EventChannelThreads( - EventChannel::shared_ptr ec, size_t min, size_t max) : - minThreads(std::max(size_t(1), min)), - maxThreads(std::min(min, max)), - channel(ec), - nWaiting(0), - state(RUNNING) -{ - Monitor::ScopedLock l(monitor); - while (workers.size() < minThreads) - workers.push_back(Thread(*this)); -} - -EventChannelThreads::~EventChannelThreads() { - shutdown(); - join(); -} - -void EventChannelThreads::shutdown() -{ - Monitor::ScopedLock lock(monitor); - if (state != RUNNING) // Already shutting down. - return; - state = TERMINATING; - channel->shutdown(); - monitor.notify(); // Wake up one join() thread. -} - -void EventChannelThreads::join() -{ - { - Monitor::ScopedLock lock(monitor); - while (state == RUNNING) // Wait for shutdown to start. - monitor.wait(); - if (state == SHUTDOWN) // Shutdown is complete - return; - if (state == JOINING) { - // Someone else is doing the join. - while (state != SHUTDOWN) - monitor.wait(); - return; - } - // I'm the joining thread - assert(state == TERMINATING); - state = JOINING; - } // Drop the lock. - - for (size_t i = 0; i < workers.size(); ++i) { - assert(state == JOINING); // Only this thread can change JOINING. - workers[i].join(); - } - state = SHUTDOWN; - monitor.notifyAll(); // Notify any other join() threads. -} - -void EventChannelThreads::addThread() { - Monitor::ScopedLock l(monitor); - if (workers.size() < maxThreads) - workers.push_back(Thread(*this)); -} - -void EventChannelThreads::run() -{ - // Start life waiting. Decrement on exit. - AtomicCount::ScopedIncrement inc(nWaiting); - try { - while (true) { - Event* e = channel->wait(); - assert(e != 0); - AtomicCount::ScopedDecrement dec(nWaiting); - // Make sure there's at least one waiting thread. - if (dec == 0 && state == RUNNING) - addThread(); - e->dispatch(); - } - } - catch (const EventChannel::ShutdownException& e) { - return; - } - catch (const std::exception& e) { - Exception::log(e, "Exception in EventChannelThreads::run()"); - } -} - -}} diff --git a/cpp/src/qpid/sys/posix/EventChannelThreads.h b/cpp/src/qpid/sys/posix/EventChannelThreads.h deleted file mode 100644 index 19112cf4db..0000000000 --- a/cpp/src/qpid/sys/posix/EventChannelThreads.h +++ /dev/null @@ -1,105 +0,0 @@ -#ifndef _posix_EventChannelThreads_h -#define _sys_EventChannelThreads_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "EventChannel.h" - -#include "qpid/Exception.h" -#include "qpid/sys/AtomicCount.h" -#include "qpid/sys/Monitor.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/Time.h" -#include "qpid/sys/Runnable.h" - -#include <vector> - -namespace qpid { -namespace sys { - -/** - Dynamic thread pool serving an EventChannel. - - Threads run a loop { e = wait(); e->dispatch(); } - The size of the thread pool is automatically adjusted to optimal size. -*/ -class EventChannelThreads : - public qpid::SharedObject<EventChannelThreads>, - private sys::Runnable -{ - public: - /** Constant to represent an unlimited number of threads */ - static const size_t unlimited; - - /** - * Create the thread pool and start initial threads. - * @param minThreads Pool will initialy contain minThreads threads and - * will never shrink to less until shutdown. - * @param maxThreads Pool will never grow to more than maxThreads. - */ - static EventChannelThreads::shared_ptr create( - EventChannel::shared_ptr channel = EventChannel::create(), - size_t minThreads = 1, - size_t maxThreads = unlimited - ); - - ~EventChannelThreads(); - - /** Post event to the underlying channel */ - void post(Event& event) { channel->post(event); } - - /** - * Terminate all threads. - * - * Returns immediately, use join() to wait till all threads are - * shut down. - */ - void shutdown(); - - /** Wait for all threads to terminate. */ - void join(); - - private: - typedef std::vector<sys::Thread> Threads; - typedef enum { - RUNNING, TERMINATING, JOINING, SHUTDOWN - } State; - - EventChannelThreads( - EventChannel::shared_ptr channel, size_t min, size_t max); - - void addThread(); - - void run(); - bool keepRunning(); - void adjustThreads(); - - Monitor monitor; - size_t minThreads; - size_t maxThreads; - EventChannel::shared_ptr channel; - Threads workers; - sys::AtomicCount nWaiting; - State state; -}; - - -}} - - -#endif /*!_sys_EventChannelThreads_h*/ diff --git a/cpp/src/qpid/sys/posix/PosixAcceptor.cpp b/cpp/src/qpid/sys/posix/PosixAcceptor.cpp deleted file mode 100644 index 0575380a14..0000000000 --- a/cpp/src/qpid/sys/posix/PosixAcceptor.cpp +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/Acceptor.h" -#include "qpid/Exception.h" - -namespace qpid { -namespace sys { - -namespace { -void fail() { throw qpid::Exception("PosixAcceptor not implemented"); } -} - -class PosixAcceptor : public Acceptor { - public: - virtual uint16_t getPort() const { fail(); return 0; } - virtual std::string getPort() const { fail(); return std::string(); } - virtual void run(qpid::sys::ConnectionInputHandlerFactory* ) { fail(); } - virtual void shutdown() { fail(); } -}; - -// Define generic Acceptor::create() to return APRAcceptor. - Acceptor::shared_ptr Acceptor::create(int16_t , int, int, bool) -{ - return Acceptor::shared_ptr(new PosixAcceptor()); -} - -// Must define Acceptor virtual dtor. -Acceptor::~Acceptor() {} - -}} diff --git a/cpp/src/qpid/sys/posix/PrivatePosix.h b/cpp/src/qpid/sys/posix/PrivatePosix.h index 2707057ef0..9ec9770cab 100644 --- a/cpp/src/qpid/sys/posix/PrivatePosix.h +++ b/cpp/src/qpid/sys/posix/PrivatePosix.h @@ -30,10 +30,15 @@ struct timeval; namespace qpid { namespace sys { +// Private Time related implementation details struct timespec& toTimespec(struct timespec& ts, const Duration& t); struct timeval& toTimeval(struct timeval& tv, const Duration& t); Duration toTime(const struct timespec& ts); +// Private socket related implementation details +class SocketPrivate; +int toFd(const SocketPrivate* s); + }} #endif /*!_sys_posix_PrivatePosix_h*/ diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp index d46e7943d9..6ee7a84beb 100644 --- a/cpp/src/qpid/sys/posix/Socket.cpp +++ b/cpp/src/qpid/sys/posix/Socket.cpp @@ -25,6 +25,8 @@ #include "check.h" #include "PrivatePosix.h" +#include <fcntl.h> +#include <sys/types.h> #include <sys/socket.h> #include <sys/errno.h> #include <netinet/in.h> @@ -32,30 +34,63 @@ #include <boost/format.hpp> -using namespace qpid::sys; +namespace qpid { +namespace sys { -Socket Socket::createTcp() +class SocketPrivate { +public: + SocketPrivate(int f = -1) : + fd(f) + {} + + int fd; +}; + +Socket::Socket() : + impl(new SocketPrivate) +{ + createTcp(); +} + +Socket::Socket(SocketPrivate* sp) : + impl(sp) +{} + +Socket::~Socket() { + delete impl; +} + +void Socket::createTcp() const { + int& socket = impl->fd; + if (socket != -1) Socket::close(); int s = ::socket (PF_INET, SOCK_STREAM, 0); if (s < 0) throw QPID_POSIX_ERROR(errno); - return s; + socket = s; } -Socket::Socket(int descriptor) : socket(descriptor) {} - -void Socket::setTimeout(const Duration& interval) +void Socket::setTimeout(const Duration& interval) const { + const int& socket = impl->fd; struct timeval tv; toTimeval(tv, interval); setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); } -void Socket::connect(const std::string& host, int port) +void Socket::setNonblocking() const { + QPID_POSIX_CHECK(::fcntl(impl->fd, F_SETFL, O_NONBLOCK)); +} + + +void Socket::connect(const std::string& host, int port) const { + const int& socket = impl->fd; struct sockaddr_in name; name.sin_family = AF_INET; name.sin_port = htons(port); + // TODO: Be good to make this work for IPv6 as well as IPv4 + // Use more modern lookup functions struct hostent* hp = gethostbyname ( host.c_str() ); if (hp == 0) throw QPID_POSIX_ERROR(errno); memcpy(&name.sin_addr.s_addr, hp->h_addr_list[0], hp->h_length); @@ -64,16 +99,18 @@ void Socket::connect(const std::string& host, int port) } void -Socket::close() +Socket::close() const { - if (socket == 0) return; + int& socket = impl->fd; + if (socket == -1) return; if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno); - socket = 0; + socket = -1; } ssize_t -Socket::send(const void* data, size_t size) +Socket::send(const void* data, size_t size) const { + const int& socket = impl->fd; ssize_t sent = ::send(socket, data, size, 0); if (sent < 0) { if (errno == ECONNRESET) return SOCKET_EOF; @@ -84,8 +121,9 @@ Socket::send(const void* data, size_t size) } ssize_t -Socket::recv(void* data, size_t size) +Socket::recv(void* data, size_t size) const { + const int& socket = impl->fd; ssize_t received = ::recv(socket, data, size, 0); if (received < 0) { if (errno == ETIMEDOUT) return SOCKET_TIMEOUT; @@ -94,8 +132,9 @@ Socket::recv(void* data, size_t size) return received; } -int Socket::listen(int port, int backlog) +int Socket::listen(int port, int backlog) const { + const int& socket = impl->fd; struct sockaddr_in name; name.sin_family = AF_INET; name.sin_port = htons(port); @@ -111,8 +150,45 @@ int Socket::listen(int port, int backlog) return ntohs(name.sin_port); } + +Socket* Socket::accept(struct sockaddr *addr, socklen_t *addrlen) const +{ + int afd = ::accept(impl->fd, addr, addrlen); + if ( afd >= 0) + return new Socket(new SocketPrivate(afd)); + else if (errno == EAGAIN) + return 0; + else throw QPID_POSIX_ERROR(errno); +} + +int Socket::read(void *buf, size_t count) const +{ + return ::read(impl->fd, buf, count); +} + +int Socket::write(const void *buf, size_t count) const +{ + return ::write(impl->fd, buf, count); +} + +std::string Socket::getSockname() const +{ + ::sockaddr_storage name; // big enough for any socket address + ::socklen_t namelen = sizeof(name); + + const int& socket = impl->fd; + if (::getsockname(socket, (::sockaddr*)&name, &namelen) < 0) + throw QPID_POSIX_ERROR(errno); -int Socket::fd() const + char dispName[NI_MAXHOST]; + if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), 0, 0, NI_NUMERICHOST) != 0) + throw QPID_POSIX_ERROR(rc); + return dispName; +} + +int toFd(const SocketPrivate* s) { - return socket; + return s->fd; } + +}} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/posix/Socket.h b/cpp/src/qpid/sys/posix/Socket.h deleted file mode 100644 index ca87104471..0000000000 --- a/cpp/src/qpid/sys/posix/Socket.h +++ /dev/null @@ -1,76 +0,0 @@ -#ifndef _sys_posix_Socket_h -#define _sys_posix_Socket_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <string> -#include "qpid/sys/Time.h" - -namespace qpid { -namespace sys { - -class Socket -{ - public: - /** Create an initialized TCP socket */ - static Socket createTcp(); - - /** Create a socket wrapper for descriptor. */ - Socket(int descriptor = 0); - - /** Set timeout for read and write */ - void setTimeout(const Duration& interval); - - void connect(const std::string& host, int port); - - void close(); - - enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode; - - /** Returns bytes sent or an ErrorCode value < 0. */ - ssize_t send(const void* data, size_t size); - - /** - * Returns bytes received, an ErrorCode value < 0 or 0 - * if the connection closed in an orderly manner. - */ - ssize_t recv(void* data, size_t size); - - /** Bind to a port and start listening. - *@param port 0 means choose an available port. - *@param backlog maximum number of pending connections. - *@return The bound port. - */ - int listen(int port = 0, int backlog = 10); - - /** Get file descriptor */ - int fd() const; - - private: - void init() const; - mutable int socket; // Initialized on demand. -}; - -}} - - -#endif /*!_sys_Socket_h*/ diff --git a/cpp/src/tests/.valgrind.supp-default b/cpp/src/tests/.valgrind.supp-default index 21fa58db45..18e69df6ff 100644 --- a/cpp/src/tests/.valgrind.supp-default +++ b/cpp/src/tests/.valgrind.supp-default @@ -7,4 +7,12 @@ obj:*/libcpg.so.2.0.0 } +{ + Uninitialised value problem in dlopen + Memcheck:Cond + fun:_dl_relocate_object + fun:*dl_* + obj:/lib64/ld-2.6.so + obj:* +} |