diff options
Diffstat (limited to 'cpp/src/qpid/client/Connector.cpp')
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 206 |
1 files changed, 170 insertions, 36 deletions
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index f4f414bc63..6449088f92 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -32,6 +32,7 @@ #include "qpid/Msg.h" #include <iostream> +#include <map> #include <boost/bind.hpp> #include <boost/format.hpp> @@ -43,7 +44,135 @@ using namespace qpid::framing; using boost::format; using boost::str; -Connector::Connector(ProtocolVersion ver, +// Stuff for the registry of protocol connectors (maybe should be moved to its own file) +namespace { + typedef std::map<std::string, Connector::Factory*> ProtocolRegistry; + + ProtocolRegistry& theProtocolRegistry() { + static ProtocolRegistry protocolRegistry; + + return protocolRegistry; + } +} + +Connector* Connector::create(const std::string& proto, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) +{ + ProtocolRegistry::const_iterator i = theProtocolRegistry().find(proto); + if (i==theProtocolRegistry().end()) { + throw Exception(QPID_MSG("Unknown protocol: " << proto)); + } + return (i->second)(v, s, c); +} + +void Connector::registerFactory(const std::string& proto, Factory* connectorFactory) +{ + ProtocolRegistry::const_iterator i = theProtocolRegistry().find(proto); + if (i!=theProtocolRegistry().end()) { + QPID_LOG(error, "Tried to register protocol: " << proto << " more than once"); + } + theProtocolRegistry()[proto] = connectorFactory; +} + +class TCPConnector : public Connector, private sys::Runnable +{ + struct Buff; + + /** Batch up frames for writing to aio. */ + class Writer : public framing::FrameHandler { + typedef sys::AsynchIOBufferBase BufferBase; + typedef std::vector<framing::AMQFrame> Frames; + + const uint16_t maxFrameSize; + sys::Mutex lock; + sys::AsynchIO* aio; + BufferBase* buffer; + Frames frames; + size_t lastEof; // Position after last EOF in frames + framing::Buffer encode; + size_t framesEncoded; + std::string identifier; + Bounds* bounds; + + void writeOne(); + void newBuffer(); + + public: + + Writer(uint16_t maxFrameSize, Bounds*); + ~Writer(); + void init(std::string id, sys::AsynchIO*); + void handle(framing::AMQFrame&); + void write(sys::AsynchIO&); + }; + + const uint16_t maxFrameSize; + framing::ProtocolVersion version; + bool initiated; + + sys::Mutex closedLock; + bool closed; + bool joined; + + sys::ShutdownHandler* shutdownHandler; + framing::InputHandler* input; + framing::InitiationHandler* initialiser; + framing::OutputHandler* output; + + Writer writer; + + sys::Thread receiver; + + sys::Socket socket; + + sys::AsynchIO* aio; + boost::shared_ptr<sys::Poller> poller; + + ~TCPConnector(); + + void run(); + void handleClosed(); + bool closeInternal(); + + void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); + void writebuff(qpid::sys::AsynchIO&); + void writeDataBlock(const framing::AMQDataBlock& data); + void eof(qpid::sys::AsynchIO&); + + std::string identifier; + + ConnectionImpl* impl; + + void connect(const std::string& host, int port); + void init(); + void close(); + void send(framing::AMQFrame& frame); + + void setInputHandler(framing::InputHandler* handler); + void setShutdownHandler(sys::ShutdownHandler* handler); + sys::ShutdownHandler* getShutdownHandler() const; + framing::OutputHandler* getOutputHandler(); + const std::string& getIdentifier() const; + +public: + TCPConnector(framing::ProtocolVersion pVersion, + const ConnectionSettings&, + ConnectionImpl*); +}; + +// Static constructor which registers connector here +namespace { + Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { + return new TCPConnector(v, s, c); + } + + struct StaticInit { + StaticInit() { + Connector::registerFactory("tcp", &create); + }; + } init; +} + +TCPConnector::TCPConnector(ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), @@ -51,23 +180,20 @@ Connector::Connector(ProtocolVersion ver, initiated(false), closed(true), joined(true), - timeout(0), - idleIn(0), idleOut(0), - timeoutHandler(0), shutdownHandler(0), writer(maxFrameSize, cimpl), aio(0), impl(cimpl) { - QPID_LOG(debug, "Connector created for " << version); + QPID_LOG(debug, "TCPConnector created for " << version); settings.configureSocket(socket); } -Connector::~Connector() { +TCPConnector::~TCPConnector() { close(); } -void Connector::connect(const std::string& host, int port){ +void TCPConnector::connect(const std::string& host, int port){ Mutex::ScopedLock l(closedLock); assert(closed); socket.connect(host, port); @@ -75,16 +201,16 @@ void Connector::connect(const std::string& host, int port){ closed = false; poller = Poller::shared_ptr(new Poller); aio = new AsynchIO(socket, - boost::bind(&Connector::readbuff, this, _1, _2), - boost::bind(&Connector::eof, this, _1), - boost::bind(&Connector::eof, this, _1), + boost::bind(&TCPConnector::readbuff, this, _1, _2), + boost::bind(&TCPConnector::eof, this, _1), + boost::bind(&TCPConnector::eof, this, _1), 0, // closed 0, // nobuffs - boost::bind(&Connector::writebuff, this, _1)); + boost::bind(&TCPConnector::writebuff, this, _1)); writer.init(identifier, aio); } -void Connector::init(){ +void TCPConnector::init(){ Mutex::ScopedLock l(closedLock); assert(joined); ProtocolInitiation init(version); @@ -93,7 +219,7 @@ void Connector::init(){ receiver = Thread(this); } -bool Connector::closeInternal() { +bool TCPConnector::closeInternal() { Mutex::ScopedLock l(closedLock); bool ret = !closed; if (!closed) { @@ -108,49 +234,57 @@ bool Connector::closeInternal() { return ret; } -void Connector::close() { +void TCPConnector::close() { closeInternal(); } -void Connector::setInputHandler(InputHandler* handler){ +void TCPConnector::setInputHandler(InputHandler* handler){ input = handler; } -void Connector::setShutdownHandler(ShutdownHandler* handler){ +void TCPConnector::setShutdownHandler(ShutdownHandler* handler){ shutdownHandler = handler; } -OutputHandler* Connector::getOutputHandler(){ +OutputHandler* TCPConnector::getOutputHandler() { return this; } -void Connector::send(AMQFrame& frame) { +sys::ShutdownHandler* TCPConnector::getShutdownHandler() const { + return shutdownHandler; +} + +const std::string& TCPConnector::getIdentifier() const { + return identifier; +} + +void TCPConnector::send(AMQFrame& frame) { writer.handle(frame); } -void Connector::handleClosed() { +void TCPConnector::handleClosed() { if (closeInternal() && shutdownHandler) shutdownHandler->shutdown(); } -struct Connector::Buff : public AsynchIO::BufferBase { +struct TCPConnector::Buff : public AsynchIO::BufferBase { Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} ~Buff() { delete [] bytes;} }; -Connector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b) +TCPConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b) { } -Connector::Writer::~Writer() { delete buffer; } +TCPConnector::Writer::~Writer() { delete buffer; } -void Connector::Writer::init(std::string id, sys::AsynchIO* a) { +void TCPConnector::Writer::init(std::string id, sys::AsynchIO* a) { Mutex::ScopedLock l(lock); identifier = id; aio = a; - newBuffer(l); + newBuffer(); } -void Connector::Writer::handle(framing::AMQFrame& frame) { +void TCPConnector::Writer::handle(framing::AMQFrame& frame) { Mutex::ScopedLock l(lock); frames.push_back(frame); if (frame.getEof()) {//or if we already have a buffers worth @@ -160,17 +294,17 @@ void Connector::Writer::handle(framing::AMQFrame& frame) { QPID_LOG(trace, "SENT " << identifier << ": " << frame); } -void Connector::Writer::writeOne(const Mutex::ScopedLock& l) { +void TCPConnector::Writer::writeOne() { assert(buffer); framesEncoded = 0; buffer->dataStart = 0; buffer->dataCount = encode.getPosition(); aio->queueWrite(buffer); - newBuffer(l); + newBuffer(); } -void Connector::Writer::newBuffer(const Mutex::ScopedLock&) { +void TCPConnector::Writer::newBuffer() { buffer = aio->getQueuedBuffer(); if (!buffer) buffer = new Buff(maxFrameSize); encode = framing::Buffer(buffer->bytes, buffer->byteCount); @@ -178,14 +312,14 @@ void Connector::Writer::newBuffer(const Mutex::ScopedLock&) { } // Called in IO thread. -void Connector::Writer::write(sys::AsynchIO&) { +void TCPConnector::Writer::write(sys::AsynchIO&) { Mutex::ScopedLock l(lock); assert(buffer); size_t bytesWritten(0); for (size_t i = 0; i < lastEof; ++i) { AMQFrame& frame = frames[i]; uint32_t size = frame.size(); - if (size > encode.available()) writeOne(l); + if (size > encode.available()) writeOne(); assert(size <= encode.available()); frame.encode(encode); ++framesEncoded; @@ -194,10 +328,10 @@ void Connector::Writer::write(sys::AsynchIO&) { frames.erase(frames.begin(), frames.begin()+lastEof); lastEof = 0; if (bounds) bounds->reduce(bytesWritten); - if (encode.getPosition() > 0) writeOne(l); + if (encode.getPosition() > 0) writeOne(); } -void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) { +void TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) { framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); if (!initiated) { @@ -226,11 +360,11 @@ void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) { } } -void Connector::writebuff(AsynchIO& aio_) { +void TCPConnector::writebuff(AsynchIO& aio_) { writer.write(aio_); } -void Connector::writeDataBlock(const AMQDataBlock& data) { +void TCPConnector::writeDataBlock(const AMQDataBlock& data) { AsynchIO::BufferBase* buff = new Buff(maxFrameSize); framing::Buffer out(buff->bytes, buff->byteCount); data.encode(out); @@ -238,13 +372,13 @@ void Connector::writeDataBlock(const AMQDataBlock& data) { aio->queueWrite(buff); } -void Connector::eof(AsynchIO&) { +void TCPConnector::eof(AsynchIO&) { handleClosed(); } // TODO: astitcher 20070908 This version of the code can never time out, so the idle processing // will never be called -void Connector::run(){ +void TCPConnector::run(){ // Keep the connection impl in memory until run() completes. boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this(); assert(protect); |