summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/Connector.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/Connector.cpp')
-rw-r--r--cpp/src/qpid/client/Connector.cpp206
1 files changed, 170 insertions, 36 deletions
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index f4f414bc63..6449088f92 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -32,6 +32,7 @@
#include "qpid/Msg.h"
#include <iostream>
+#include <map>
#include <boost/bind.hpp>
#include <boost/format.hpp>
@@ -43,7 +44,135 @@ using namespace qpid::framing;
using boost::format;
using boost::str;
-Connector::Connector(ProtocolVersion ver,
+// Stuff for the registry of protocol connectors (maybe should be moved to its own file)
+namespace {
+ typedef std::map<std::string, Connector::Factory*> ProtocolRegistry;
+
+ ProtocolRegistry& theProtocolRegistry() {
+ static ProtocolRegistry protocolRegistry;
+
+ return protocolRegistry;
+ }
+}
+
+Connector* Connector::create(const std::string& proto, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c)
+{
+ ProtocolRegistry::const_iterator i = theProtocolRegistry().find(proto);
+ if (i==theProtocolRegistry().end()) {
+ throw Exception(QPID_MSG("Unknown protocol: " << proto));
+ }
+ return (i->second)(v, s, c);
+}
+
+void Connector::registerFactory(const std::string& proto, Factory* connectorFactory)
+{
+ ProtocolRegistry::const_iterator i = theProtocolRegistry().find(proto);
+ if (i!=theProtocolRegistry().end()) {
+ QPID_LOG(error, "Tried to register protocol: " << proto << " more than once");
+ }
+ theProtocolRegistry()[proto] = connectorFactory;
+}
+
+class TCPConnector : public Connector, private sys::Runnable
+{
+ struct Buff;
+
+ /** Batch up frames for writing to aio. */
+ class Writer : public framing::FrameHandler {
+ typedef sys::AsynchIOBufferBase BufferBase;
+ typedef std::vector<framing::AMQFrame> Frames;
+
+ const uint16_t maxFrameSize;
+ sys::Mutex lock;
+ sys::AsynchIO* aio;
+ BufferBase* buffer;
+ Frames frames;
+ size_t lastEof; // Position after last EOF in frames
+ framing::Buffer encode;
+ size_t framesEncoded;
+ std::string identifier;
+ Bounds* bounds;
+
+ void writeOne();
+ void newBuffer();
+
+ public:
+
+ Writer(uint16_t maxFrameSize, Bounds*);
+ ~Writer();
+ void init(std::string id, sys::AsynchIO*);
+ void handle(framing::AMQFrame&);
+ void write(sys::AsynchIO&);
+ };
+
+ const uint16_t maxFrameSize;
+ framing::ProtocolVersion version;
+ bool initiated;
+
+ sys::Mutex closedLock;
+ bool closed;
+ bool joined;
+
+ sys::ShutdownHandler* shutdownHandler;
+ framing::InputHandler* input;
+ framing::InitiationHandler* initialiser;
+ framing::OutputHandler* output;
+
+ Writer writer;
+
+ sys::Thread receiver;
+
+ sys::Socket socket;
+
+ sys::AsynchIO* aio;
+ boost::shared_ptr<sys::Poller> poller;
+
+ ~TCPConnector();
+
+ void run();
+ void handleClosed();
+ bool closeInternal();
+
+ void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
+ void writebuff(qpid::sys::AsynchIO&);
+ void writeDataBlock(const framing::AMQDataBlock& data);
+ void eof(qpid::sys::AsynchIO&);
+
+ std::string identifier;
+
+ ConnectionImpl* impl;
+
+ void connect(const std::string& host, int port);
+ void init();
+ void close();
+ void send(framing::AMQFrame& frame);
+
+ void setInputHandler(framing::InputHandler* handler);
+ void setShutdownHandler(sys::ShutdownHandler* handler);
+ sys::ShutdownHandler* getShutdownHandler() const;
+ framing::OutputHandler* getOutputHandler();
+ const std::string& getIdentifier() const;
+
+public:
+ TCPConnector(framing::ProtocolVersion pVersion,
+ const ConnectionSettings&,
+ ConnectionImpl*);
+};
+
+// Static constructor which registers connector here
+namespace {
+ Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
+ return new TCPConnector(v, s, c);
+ }
+
+ struct StaticInit {
+ StaticInit() {
+ Connector::registerFactory("tcp", &create);
+ };
+ } init;
+}
+
+TCPConnector::TCPConnector(ProtocolVersion ver,
const ConnectionSettings& settings,
ConnectionImpl* cimpl)
: maxFrameSize(settings.maxFrameSize),
@@ -51,23 +180,20 @@ Connector::Connector(ProtocolVersion ver,
initiated(false),
closed(true),
joined(true),
- timeout(0),
- idleIn(0), idleOut(0),
- timeoutHandler(0),
shutdownHandler(0),
writer(maxFrameSize, cimpl),
aio(0),
impl(cimpl)
{
- QPID_LOG(debug, "Connector created for " << version);
+ QPID_LOG(debug, "TCPConnector created for " << version);
settings.configureSocket(socket);
}
-Connector::~Connector() {
+TCPConnector::~TCPConnector() {
close();
}
-void Connector::connect(const std::string& host, int port){
+void TCPConnector::connect(const std::string& host, int port){
Mutex::ScopedLock l(closedLock);
assert(closed);
socket.connect(host, port);
@@ -75,16 +201,16 @@ void Connector::connect(const std::string& host, int port){
closed = false;
poller = Poller::shared_ptr(new Poller);
aio = new AsynchIO(socket,
- boost::bind(&Connector::readbuff, this, _1, _2),
- boost::bind(&Connector::eof, this, _1),
- boost::bind(&Connector::eof, this, _1),
+ boost::bind(&TCPConnector::readbuff, this, _1, _2),
+ boost::bind(&TCPConnector::eof, this, _1),
+ boost::bind(&TCPConnector::eof, this, _1),
0, // closed
0, // nobuffs
- boost::bind(&Connector::writebuff, this, _1));
+ boost::bind(&TCPConnector::writebuff, this, _1));
writer.init(identifier, aio);
}
-void Connector::init(){
+void TCPConnector::init(){
Mutex::ScopedLock l(closedLock);
assert(joined);
ProtocolInitiation init(version);
@@ -93,7 +219,7 @@ void Connector::init(){
receiver = Thread(this);
}
-bool Connector::closeInternal() {
+bool TCPConnector::closeInternal() {
Mutex::ScopedLock l(closedLock);
bool ret = !closed;
if (!closed) {
@@ -108,49 +234,57 @@ bool Connector::closeInternal() {
return ret;
}
-void Connector::close() {
+void TCPConnector::close() {
closeInternal();
}
-void Connector::setInputHandler(InputHandler* handler){
+void TCPConnector::setInputHandler(InputHandler* handler){
input = handler;
}
-void Connector::setShutdownHandler(ShutdownHandler* handler){
+void TCPConnector::setShutdownHandler(ShutdownHandler* handler){
shutdownHandler = handler;
}
-OutputHandler* Connector::getOutputHandler(){
+OutputHandler* TCPConnector::getOutputHandler() {
return this;
}
-void Connector::send(AMQFrame& frame) {
+sys::ShutdownHandler* TCPConnector::getShutdownHandler() const {
+ return shutdownHandler;
+}
+
+const std::string& TCPConnector::getIdentifier() const {
+ return identifier;
+}
+
+void TCPConnector::send(AMQFrame& frame) {
writer.handle(frame);
}
-void Connector::handleClosed() {
+void TCPConnector::handleClosed() {
if (closeInternal() && shutdownHandler)
shutdownHandler->shutdown();
}
-struct Connector::Buff : public AsynchIO::BufferBase {
+struct TCPConnector::Buff : public AsynchIO::BufferBase {
Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
~Buff() { delete [] bytes;}
};
-Connector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b)
+TCPConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b)
{
}
-Connector::Writer::~Writer() { delete buffer; }
+TCPConnector::Writer::~Writer() { delete buffer; }
-void Connector::Writer::init(std::string id, sys::AsynchIO* a) {
+void TCPConnector::Writer::init(std::string id, sys::AsynchIO* a) {
Mutex::ScopedLock l(lock);
identifier = id;
aio = a;
- newBuffer(l);
+ newBuffer();
}
-void Connector::Writer::handle(framing::AMQFrame& frame) {
+void TCPConnector::Writer::handle(framing::AMQFrame& frame) {
Mutex::ScopedLock l(lock);
frames.push_back(frame);
if (frame.getEof()) {//or if we already have a buffers worth
@@ -160,17 +294,17 @@ void Connector::Writer::handle(framing::AMQFrame& frame) {
QPID_LOG(trace, "SENT " << identifier << ": " << frame);
}
-void Connector::Writer::writeOne(const Mutex::ScopedLock& l) {
+void TCPConnector::Writer::writeOne() {
assert(buffer);
framesEncoded = 0;
buffer->dataStart = 0;
buffer->dataCount = encode.getPosition();
aio->queueWrite(buffer);
- newBuffer(l);
+ newBuffer();
}
-void Connector::Writer::newBuffer(const Mutex::ScopedLock&) {
+void TCPConnector::Writer::newBuffer() {
buffer = aio->getQueuedBuffer();
if (!buffer) buffer = new Buff(maxFrameSize);
encode = framing::Buffer(buffer->bytes, buffer->byteCount);
@@ -178,14 +312,14 @@ void Connector::Writer::newBuffer(const Mutex::ScopedLock&) {
}
// Called in IO thread.
-void Connector::Writer::write(sys::AsynchIO&) {
+void TCPConnector::Writer::write(sys::AsynchIO&) {
Mutex::ScopedLock l(lock);
assert(buffer);
size_t bytesWritten(0);
for (size_t i = 0; i < lastEof; ++i) {
AMQFrame& frame = frames[i];
uint32_t size = frame.size();
- if (size > encode.available()) writeOne(l);
+ if (size > encode.available()) writeOne();
assert(size <= encode.available());
frame.encode(encode);
++framesEncoded;
@@ -194,10 +328,10 @@ void Connector::Writer::write(sys::AsynchIO&) {
frames.erase(frames.begin(), frames.begin()+lastEof);
lastEof = 0;
if (bounds) bounds->reduce(bytesWritten);
- if (encode.getPosition() > 0) writeOne(l);
+ if (encode.getPosition() > 0) writeOne();
}
-void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
+void TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
if (!initiated) {
@@ -226,11 +360,11 @@ void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
}
}
-void Connector::writebuff(AsynchIO& aio_) {
+void TCPConnector::writebuff(AsynchIO& aio_) {
writer.write(aio_);
}
-void Connector::writeDataBlock(const AMQDataBlock& data) {
+void TCPConnector::writeDataBlock(const AMQDataBlock& data) {
AsynchIO::BufferBase* buff = new Buff(maxFrameSize);
framing::Buffer out(buff->bytes, buff->byteCount);
data.encode(out);
@@ -238,13 +372,13 @@ void Connector::writeDataBlock(const AMQDataBlock& data) {
aio->queueWrite(buff);
}
-void Connector::eof(AsynchIO&) {
+void TCPConnector::eof(AsynchIO&) {
handleClosed();
}
// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing
// will never be called
-void Connector::run(){
+void TCPConnector::run(){
// Keep the connection impl in memory until run() completes.
boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this();
assert(protect);