summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2008-07-09 20:36:17 +0000
committerAndrew Stitcher <astitcher@apache.org>2008-07-09 20:36:17 +0000
commit2212c5a5b56466491986220ddd6a3aa4e81ff4e4 (patch)
treeb1104abc063d7cf0d869ce7be74e5e84474d2c88 /cpp/src
parent9575428feb2a81323f0426361830bc543eba29db (diff)
downloadqpid-python-2212c5a5b56466491986220ddd6a3aa4e81ff4e4.tar.gz
Some small changes which clean up header file inclusions
and generally start to tidy up the network layer so that it's a bit easier to implement new network transports git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@675338 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/Connection.h1
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp21
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.h5
-rw-r--r--cpp/src/qpid/client/Connector.h13
-rw-r--r--cpp/src/qpid/sys/AsynchIO.h37
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.cpp2
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.h8
-rw-r--r--cpp/src/qpid/sys/ProtocolFactory.h4
-rw-r--r--cpp/src/qpid/sys/TCPIOPlugin.cpp3
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp1
10 files changed, 57 insertions, 38 deletions
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index a476f2d880..ee543e20d2 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -23,7 +23,6 @@
*/
#include <map>
#include <string>
-#include "ConnectionImpl.h"
#include "qpid/client/Session.h"
namespace qpid {
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index 6dca4dcf21..f32e21c389 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -19,6 +19,7 @@
*
*/
#include "ConnectionImpl.h"
+#include "Connector.h"
#include "ConnectionSettings.h"
#include "SessionImpl.h"
@@ -38,7 +39,7 @@ using namespace qpid::framing::connection;//for connection error codes
ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings)
: Bounds(settings.maxFrameSize * settings.bounds),
handler(settings, v),
- connector(v, settings, this),
+ connector(new Connector(v, settings, this)),
version(v),
isClosed(true),//closed until successfully opened
isClosing(false)
@@ -48,9 +49,9 @@ ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSetti
handler.out = boost::bind(&Connector::send, boost::ref(connector), _1);
handler.onClose = boost::bind(&ConnectionImpl::closed, this,
NORMAL, std::string());
- connector.setInputHandler(&handler);
- connector.setTimeoutHandler(this);
- connector.setShutdownHandler(this);
+ connector->setInputHandler(&handler);
+ connector->setTimeoutHandler(this);
+ connector->setShutdownHandler(this);
//only set error handler once open
handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2);
@@ -60,7 +61,7 @@ ConnectionImpl::~ConnectionImpl() {
// Important to close the connector first, to ensure the
// connector thread does not call on us while the destructor
// is running.
- connector.close();
+ connector->close();
}
void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session)
@@ -97,8 +98,8 @@ bool ConnectionImpl::isOpen() const
void ConnectionImpl::open(const std::string& host, int port)
{
QPID_LOG(info, "Connecting to " << host << ":" << port);
- connector.connect(host, port);
- connector.init();
+ connector->connect(host, port);
+ connector->init();
handler.waitForOpen();
Mutex::ScopedLock l(lock);
isClosed = false;
@@ -112,7 +113,7 @@ void ConnectionImpl::idleIn()
void ConnectionImpl::idleOut()
{
AMQFrame frame(in_place<AMQHeartbeatBody>());
- connector.send(frame);
+ connector->send(frame);
}
void ConnectionImpl::close()
@@ -130,8 +131,8 @@ void ConnectionImpl::close()
template <class F> void ConnectionImpl::closeInternal(const F& f) {
isClosed = true;
- connector.close();
- for (SessionMap::iterator i=sessions.begin(); i != sessions.end(); ++i) {
+ connector->close();
+ for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
boost::shared_ptr<SessionImpl> s = i->second.lock();
if (s) f(s);
}
diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h
index b02dda5af7..98fb212c3e 100644
--- a/cpp/src/qpid/client/ConnectionImpl.h
+++ b/cpp/src/qpid/client/ConnectionImpl.h
@@ -24,7 +24,6 @@
#include "Bounds.h"
#include "ConnectionHandler.h"
-#include "Connector.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/ShutdownHandler.h"
@@ -33,11 +32,13 @@
#include <map>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
+#include <boost/scoped_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
namespace qpid {
namespace client {
+class Connector;
class ConnectionSettings;
class SessionImpl;
@@ -52,7 +53,7 @@ class ConnectionImpl : public Bounds,
SessionMap sessions;
ConnectionHandler handler;
- Connector connector;
+ boost::scoped_ptr<Connector> connector;
framing::ProtocolVersion version;
sys::Mutex lock;
bool isClosed;
diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h
index b35e77c726..c7f5be0936 100644
--- a/cpp/src/qpid/client/Connector.h
+++ b/cpp/src/qpid/client/Connector.h
@@ -34,13 +34,18 @@
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Socket.h"
#include "qpid/sys/Time.h"
-#include "qpid/sys/AsynchIO.h"
#include <queue>
#include <boost/weak_ptr.hpp>
#include <boost/shared_ptr.hpp>
namespace qpid {
+
+namespace sys {
+class Poller;
+class AsynchIO;
+class AsynchIOBufferBase;
+}
namespace client {
@@ -56,7 +61,7 @@ class Connector : public framing::OutputHandler,
/** Batch up frames for writing to aio. */
class Writer : public framing::FrameHandler {
- typedef sys::AsynchIO::BufferBase BufferBase;
+ typedef sys::AsynchIOBufferBase BufferBase;
typedef std::vector<framing::AMQFrame> Frames;
const uint16_t maxFrameSize;
@@ -109,7 +114,7 @@ class Connector : public framing::OutputHandler,
sys::Socket socket;
sys::AsynchIO* aio;
- sys::Poller::shared_ptr poller;
+ boost::shared_ptr<sys::Poller> poller;
void checkIdle(ssize_t status);
void setSocketTimeout();
@@ -118,7 +123,7 @@ class Connector : public framing::OutputHandler,
void handleClosed();
bool closeInternal();
- void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIO::BufferBase*);
+ void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
void writebuff(qpid::sys::AsynchIO&);
void writeDataBlock(const framing::AMQDataBlock& data);
void eof(qpid::sys::AsynchIO&);
diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h
index 847bdc50e4..ff7823e00d 100644
--- a/cpp/src/qpid/sys/AsynchIO.h
+++ b/cpp/src/qpid/sys/AsynchIO.h
@@ -22,13 +22,14 @@
*/
#include "Dispatcher.h"
-#include "Socket.h"
#include <boost/function.hpp>
#include <deque>
namespace qpid {
namespace sys {
+
+class Socket;
/*
* Asynchronous acceptor: accepts connections then does a callback with the
@@ -78,6 +79,23 @@ private:
void failure(int, std::string);
};
+struct AsynchIOBufferBase {
+ char* const bytes;
+ const int32_t byteCount;
+ int32_t dataStart;
+ int32_t dataCount;
+
+ AsynchIOBufferBase(char* const b, const int32_t s) :
+ bytes(b),
+ byteCount(s),
+ dataStart(0),
+ dataCount(0)
+ {}
+
+ virtual ~AsynchIOBufferBase()
+ {}
+};
+
/*
* Asychronous reader/writer:
* Reader accepts buffers to read into; reads into the provided buffers
@@ -92,22 +110,7 @@ private:
*/
class AsynchIO : private DispatchHandle {
public:
- struct BufferBase {
- char* const bytes;
- const int32_t byteCount;
- int32_t dataStart;
- int32_t dataCount;
-
- BufferBase(char* const b, const int32_t s) :
- bytes(b),
- byteCount(s),
- dataStart(0),
- dataCount(0)
- {}
-
- virtual ~BufferBase()
- {}
- };
+ typedef AsynchIOBufferBase BufferBase;
typedef boost::function2<void, AsynchIO&, BufferBase*> ReadCallback;
typedef boost::function1<void, AsynchIO&> EofCallback;
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp
index ca2bd7c93c..886dbc8f43 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.cpp
+++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp
@@ -20,6 +20,8 @@
*/
#include "AsynchIOHandler.h"
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Socket.h"
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/log/Statement.h"
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h
index 7448094a94..26e2cf4c5c 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.h
+++ b/cpp/src/qpid/sys/AsynchIOHandler.h
@@ -23,7 +23,6 @@
#include "OutputControl.h"
#include "ConnectionCodec.h"
-#include "AsynchIO.h"
namespace qpid {
@@ -32,6 +31,11 @@ namespace framing {
}
namespace sys {
+
+class AsynchIO;
+class AsynchIOBufferBase;
+class Socket;
+
class AsynchIOHandler : public OutputControl {
std::string identifier;
AsynchIO* aio;
@@ -54,7 +58,7 @@ class AsynchIOHandler : public OutputControl {
void activateOutput();
// Input side
- void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff);
+ void readbuff(AsynchIO& aio, AsynchIOBufferBase* buff);
void eof(AsynchIO& aio);
void disconnect(AsynchIO& aio);
diff --git a/cpp/src/qpid/sys/ProtocolFactory.h b/cpp/src/qpid/sys/ProtocolFactory.h
index e8eaefe1f6..4aa14d2cf6 100644
--- a/cpp/src/qpid/sys/ProtocolFactory.h
+++ b/cpp/src/qpid/sys/ProtocolFactory.h
@@ -35,6 +35,8 @@ class Poller;
class ProtocolFactory : public qpid::SharedObject<ProtocolFactory>
{
public:
+ typedef boost::function2<void, int, std::string> ConnectFailedCallback;
+
virtual ~ProtocolFactory() = 0;
virtual uint16_t getPort() const = 0;
virtual std::string getHost() const = 0;
@@ -43,7 +45,7 @@ class ProtocolFactory : public qpid::SharedObject<ProtocolFactory>
boost::shared_ptr<Poller>,
const std::string& host, int16_t port,
ConnectionCodec::Factory* codec,
- boost::function2<void, int, std::string> failed) = 0;
+ ConnectFailedCallback failed) = 0;
};
inline ProtocolFactory::~ProtocolFactory() {}
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp
index e82a6a9102..4b661f1713 100644
--- a/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -24,6 +24,7 @@
#include "AsynchIO.h"
#include "qpid/Plugin.h"
+#include "qpid/sys/Socket.h"
#include "qpid/broker/Broker.h"
#include "qpid/log/Statement.h"
@@ -112,7 +113,7 @@ void AsynchIOProtocolFactory::connect(
Poller::shared_ptr poller,
const std::string& host, int16_t port,
ConnectionCodec::Factory* fact,
- boost::function2<void, int, std::string> failed)
+ ConnectFailedCallback failed)
{
// Note that the following logic does not cause a memory leak.
// The allocated Socket is freed either by the AsynchConnector
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 470db4c614..58c7800514 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Socket.h"
#include "qpid/sys/Time.h"
#include "check.h"