diff options
author | Andrew Stitcher <astitcher@apache.org> | 2008-07-09 20:36:17 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2008-07-09 20:36:17 +0000 |
commit | 2212c5a5b56466491986220ddd6a3aa4e81ff4e4 (patch) | |
tree | b1104abc063d7cf0d869ce7be74e5e84474d2c88 /cpp/src | |
parent | 9575428feb2a81323f0426361830bc543eba29db (diff) | |
download | qpid-python-2212c5a5b56466491986220ddd6a3aa4e81ff4e4.tar.gz |
Some small changes which clean up header file inclusions
and generally start to tidy up the network layer so that it's
a bit easier to implement new network transports
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@675338 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/client/Connection.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 21 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.h | 13 | ||||
-rw-r--r-- | cpp/src/qpid/sys/AsynchIO.h | 37 | ||||
-rw-r--r-- | cpp/src/qpid/sys/AsynchIOHandler.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/sys/AsynchIOHandler.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ProtocolFactory.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/sys/TCPIOPlugin.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 1 |
10 files changed, 57 insertions, 38 deletions
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index a476f2d880..ee543e20d2 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -23,7 +23,6 @@ */ #include <map> #include <string> -#include "ConnectionImpl.h" #include "qpid/client/Session.h" namespace qpid { diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index 6dca4dcf21..f32e21c389 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -19,6 +19,7 @@ * */ #include "ConnectionImpl.h" +#include "Connector.h" #include "ConnectionSettings.h" #include "SessionImpl.h" @@ -38,7 +39,7 @@ using namespace qpid::framing::connection;//for connection error codes ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings) : Bounds(settings.maxFrameSize * settings.bounds), handler(settings, v), - connector(v, settings, this), + connector(new Connector(v, settings, this)), version(v), isClosed(true),//closed until successfully opened isClosing(false) @@ -48,9 +49,9 @@ ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSetti handler.out = boost::bind(&Connector::send, boost::ref(connector), _1); handler.onClose = boost::bind(&ConnectionImpl::closed, this, NORMAL, std::string()); - connector.setInputHandler(&handler); - connector.setTimeoutHandler(this); - connector.setShutdownHandler(this); + connector->setInputHandler(&handler); + connector->setTimeoutHandler(this); + connector->setShutdownHandler(this); //only set error handler once open handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2); @@ -60,7 +61,7 @@ ConnectionImpl::~ConnectionImpl() { // Important to close the connector first, to ensure the // connector thread does not call on us while the destructor // is running. - connector.close(); + connector->close(); } void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session) @@ -97,8 +98,8 @@ bool ConnectionImpl::isOpen() const void ConnectionImpl::open(const std::string& host, int port) { QPID_LOG(info, "Connecting to " << host << ":" << port); - connector.connect(host, port); - connector.init(); + connector->connect(host, port); + connector->init(); handler.waitForOpen(); Mutex::ScopedLock l(lock); isClosed = false; @@ -112,7 +113,7 @@ void ConnectionImpl::idleIn() void ConnectionImpl::idleOut() { AMQFrame frame(in_place<AMQHeartbeatBody>()); - connector.send(frame); + connector->send(frame); } void ConnectionImpl::close() @@ -130,8 +131,8 @@ void ConnectionImpl::close() template <class F> void ConnectionImpl::closeInternal(const F& f) { isClosed = true; - connector.close(); - for (SessionMap::iterator i=sessions.begin(); i != sessions.end(); ++i) { + connector->close(); + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { boost::shared_ptr<SessionImpl> s = i->second.lock(); if (s) f(s); } diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index b02dda5af7..98fb212c3e 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -24,7 +24,6 @@ #include "Bounds.h" #include "ConnectionHandler.h" -#include "Connector.h" #include "qpid/framing/FrameHandler.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/ShutdownHandler.h" @@ -33,11 +32,13 @@ #include <map> #include <boost/shared_ptr.hpp> #include <boost/weak_ptr.hpp> +#include <boost/scoped_ptr.hpp> #include <boost/enable_shared_from_this.hpp> namespace qpid { namespace client { +class Connector; class ConnectionSettings; class SessionImpl; @@ -52,7 +53,7 @@ class ConnectionImpl : public Bounds, SessionMap sessions; ConnectionHandler handler; - Connector connector; + boost::scoped_ptr<Connector> connector; framing::ProtocolVersion version; sys::Mutex lock; bool isClosed; diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index b35e77c726..c7f5be0936 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -34,13 +34,18 @@ #include "qpid/sys/Mutex.h" #include "qpid/sys/Socket.h" #include "qpid/sys/Time.h" -#include "qpid/sys/AsynchIO.h" #include <queue> #include <boost/weak_ptr.hpp> #include <boost/shared_ptr.hpp> namespace qpid { + +namespace sys { +class Poller; +class AsynchIO; +class AsynchIOBufferBase; +} namespace client { @@ -56,7 +61,7 @@ class Connector : public framing::OutputHandler, /** Batch up frames for writing to aio. */ class Writer : public framing::FrameHandler { - typedef sys::AsynchIO::BufferBase BufferBase; + typedef sys::AsynchIOBufferBase BufferBase; typedef std::vector<framing::AMQFrame> Frames; const uint16_t maxFrameSize; @@ -109,7 +114,7 @@ class Connector : public framing::OutputHandler, sys::Socket socket; sys::AsynchIO* aio; - sys::Poller::shared_ptr poller; + boost::shared_ptr<sys::Poller> poller; void checkIdle(ssize_t status); void setSocketTimeout(); @@ -118,7 +123,7 @@ class Connector : public framing::OutputHandler, void handleClosed(); bool closeInternal(); - void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIO::BufferBase*); + void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); void writebuff(qpid::sys::AsynchIO&); void writeDataBlock(const framing::AMQDataBlock& data); void eof(qpid::sys::AsynchIO&); diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h index 847bdc50e4..ff7823e00d 100644 --- a/cpp/src/qpid/sys/AsynchIO.h +++ b/cpp/src/qpid/sys/AsynchIO.h @@ -22,13 +22,14 @@ */ #include "Dispatcher.h" -#include "Socket.h" #include <boost/function.hpp> #include <deque> namespace qpid { namespace sys { + +class Socket; /* * Asynchronous acceptor: accepts connections then does a callback with the @@ -78,6 +79,23 @@ private: void failure(int, std::string); }; +struct AsynchIOBufferBase { + char* const bytes; + const int32_t byteCount; + int32_t dataStart; + int32_t dataCount; + + AsynchIOBufferBase(char* const b, const int32_t s) : + bytes(b), + byteCount(s), + dataStart(0), + dataCount(0) + {} + + virtual ~AsynchIOBufferBase() + {} +}; + /* * Asychronous reader/writer: * Reader accepts buffers to read into; reads into the provided buffers @@ -92,22 +110,7 @@ private: */ class AsynchIO : private DispatchHandle { public: - struct BufferBase { - char* const bytes; - const int32_t byteCount; - int32_t dataStart; - int32_t dataCount; - - BufferBase(char* const b, const int32_t s) : - bytes(b), - byteCount(s), - dataStart(0), - dataCount(0) - {} - - virtual ~BufferBase() - {} - }; + typedef AsynchIOBufferBase BufferBase; typedef boost::function2<void, AsynchIO&, BufferBase*> ReadCallback; typedef boost::function1<void, AsynchIO&> EofCallback; diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp index ca2bd7c93c..886dbc8f43 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -20,6 +20,8 @@ */ #include "AsynchIOHandler.h" +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/Socket.h" #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/log/Statement.h" diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h index 7448094a94..26e2cf4c5c 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.h +++ b/cpp/src/qpid/sys/AsynchIOHandler.h @@ -23,7 +23,6 @@ #include "OutputControl.h" #include "ConnectionCodec.h" -#include "AsynchIO.h" namespace qpid { @@ -32,6 +31,11 @@ namespace framing { } namespace sys { + +class AsynchIO; +class AsynchIOBufferBase; +class Socket; + class AsynchIOHandler : public OutputControl { std::string identifier; AsynchIO* aio; @@ -54,7 +58,7 @@ class AsynchIOHandler : public OutputControl { void activateOutput(); // Input side - void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff); + void readbuff(AsynchIO& aio, AsynchIOBufferBase* buff); void eof(AsynchIO& aio); void disconnect(AsynchIO& aio); diff --git a/cpp/src/qpid/sys/ProtocolFactory.h b/cpp/src/qpid/sys/ProtocolFactory.h index e8eaefe1f6..4aa14d2cf6 100644 --- a/cpp/src/qpid/sys/ProtocolFactory.h +++ b/cpp/src/qpid/sys/ProtocolFactory.h @@ -35,6 +35,8 @@ class Poller; class ProtocolFactory : public qpid::SharedObject<ProtocolFactory> { public: + typedef boost::function2<void, int, std::string> ConnectFailedCallback; + virtual ~ProtocolFactory() = 0; virtual uint16_t getPort() const = 0; virtual std::string getHost() const = 0; @@ -43,7 +45,7 @@ class ProtocolFactory : public qpid::SharedObject<ProtocolFactory> boost::shared_ptr<Poller>, const std::string& host, int16_t port, ConnectionCodec::Factory* codec, - boost::function2<void, int, std::string> failed) = 0; + ConnectFailedCallback failed) = 0; }; inline ProtocolFactory::~ProtocolFactory() {} diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp index e82a6a9102..4b661f1713 100644 --- a/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -24,6 +24,7 @@ #include "AsynchIO.h" #include "qpid/Plugin.h" +#include "qpid/sys/Socket.h" #include "qpid/broker/Broker.h" #include "qpid/log/Statement.h" @@ -112,7 +113,7 @@ void AsynchIOProtocolFactory::connect( Poller::shared_ptr poller, const std::string& host, int16_t port, ConnectionCodec::Factory* fact, - boost::function2<void, int, std::string> failed) + ConnectFailedCallback failed) { // Note that the following logic does not cause a memory leak. // The allocated Socket is freed either by the AsynchConnector diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index 470db4c614..58c7800514 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -20,6 +20,7 @@ */ #include "qpid/sys/AsynchIO.h" +#include "qpid/sys/Socket.h" #include "qpid/sys/Time.h" #include "check.h" |