diff options
Diffstat (limited to 'cpp/src/qpid/client/Connector.cpp')
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 96 |
1 files changed, 60 insertions, 36 deletions
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 946bf0138d..1558f292aa 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -27,11 +27,9 @@ #include "qpid/sys/Codec.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/framing/InitiationHandler.h" #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Poller.h" -#include "qpid/sys/Socket.h" #include "qpid/sys/SecurityLayer.h" #include "qpid/Msg.h" @@ -53,23 +51,21 @@ using boost::str; // Stuff for the registry of protocol connectors (maybe should be moved to its own file) namespace { typedef std::map<std::string, Connector::Factory*> ProtocolRegistry; - + ProtocolRegistry& theProtocolRegistry() { static ProtocolRegistry protocolRegistry; - + return protocolRegistry; } } -Connector* Connector::create(const std::string& proto, - Poller::shared_ptr p, - framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) +Connector* Connector::create(const std::string& proto, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { ProtocolRegistry::const_iterator i = theProtocolRegistry().find(proto); if (i==theProtocolRegistry().end()) { throw Exception(QPID_MSG("Unknown protocol: " << proto)); } - return (i->second)(p, v, s, c); + return (i->second)(v, s, c); } void Connector::registerFactory(const std::string& proto, Factory* connectorFactory) @@ -85,7 +81,7 @@ void Connector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>) { } -class TCPConnector : public Connector, public sys::Codec +class TCPConnector : public Connector, public sys::Codec, private sys::Runnable { typedef std::deque<framing::AMQFrame> Frames; struct Buff; @@ -97,7 +93,7 @@ class TCPConnector : public Connector, public sys::Codec size_t lastEof; // Position after last EOF in frames uint64_t currentSize; Bounds* bounds; - + framing::ProtocolVersion version; bool initiated; bool closed; @@ -108,25 +104,28 @@ class TCPConnector : public Connector, public sys::Codec framing::InitiationHandler* initialiser; framing::OutputHandler* output; + sys::Thread receiver; + sys::Socket socket; sys::AsynchIO* aio; std::string identifier; - Poller::shared_ptr poller; + boost::shared_ptr<sys::Poller> poller; std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; ~TCPConnector(); + void run(); void handleClosed(); bool closeInternal(); - + bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); void writebuff(qpid::sys::AsynchIO&); void writeDataBlock(const framing::AMQDataBlock& data); void eof(qpid::sys::AsynchIO&); boost::weak_ptr<ConnectionImpl> impl; - + void connect(const std::string& host, int port); void init(); void close(); @@ -143,23 +142,18 @@ class TCPConnector : public Connector, public sys::Codec size_t decode(const char* buffer, size_t size); size_t encode(const char* buffer, size_t size); bool canEncode(); + public: - TCPConnector(Poller::shared_ptr, - framing::ProtocolVersion pVersion, - const ConnectionSettings&, + TCPConnector(framing::ProtocolVersion pVersion, + const ConnectionSettings&, ConnectionImpl*); }; -struct TCPConnector::Buff : public AsynchIO::BufferBase { - Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} - ~Buff() { delete [] bytes;} -}; - // Static constructor which registers connector here namespace { - Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { - return new TCPConnector(p, v, s, c); + Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { + return new TCPConnector(v, s, c); } struct StaticInit { @@ -169,21 +163,19 @@ namespace { } init; } -TCPConnector::TCPConnector(Poller::shared_ptr p, - ProtocolVersion ver, +TCPConnector::TCPConnector(ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), lastEof(0), currentSize(0), bounds(cimpl), - version(ver), + version(ver), initiated(false), closed(true), joined(true), shutdownHandler(0), aio(0), - poller(p), impl(cimpl->shared_from_this()) { QPID_LOG(debug, "TCPConnector created for " << version.toString()); @@ -205,6 +197,7 @@ void TCPConnector::connect(const std::string& host, int port){ } identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); + poller = Poller::shared_ptr(new Poller); aio = AsynchIO::create(socket, boost::bind(&TCPConnector::readbuff, this, _1, _2), boost::bind(&TCPConnector::eof, this, _1), @@ -221,24 +214,28 @@ void TCPConnector::init(){ ProtocolInitiation init(version); writeDataBlock(init); joined = false; - for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff(maxFrameSize)); - } - - aio->start(poller); + receiver = Thread(this); } bool TCPConnector::closeInternal() { + bool ret; + { Mutex::ScopedLock l(lock); - bool ret = !closed; + ret = !closed; if (!closed) { closed = true; aio->queueForDeletion(); - socket.close(); + poller->shutdown(); + } + if (joined || receiver.id() == Thread::current().id()) { + return ret; + } + joined = true; } + receiver.join(); return ret; } - + void TCPConnector::close() { closeInternal(); } @@ -288,13 +285,18 @@ void TCPConnector::handleClosed() { shutdownHandler->shutdown(); } +struct TCPConnector::Buff : public AsynchIO::BufferBase { + Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} + ~Buff() { delete [] bytes;} +}; + void TCPConnector::writebuff(AsynchIO& /*aio*/) { Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; if (codec->canEncode()) { std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer()); if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize)); - + size_t encoded = codec->encode(buffer->bytes, buffer->byteCount); buffer->dataStart = 0; @@ -380,6 +382,28 @@ void TCPConnector::eof(AsynchIO&) { handleClosed(); } +void TCPConnector::run() { + // Keep the connection impl in memory until run() completes. + boost::shared_ptr<ConnectionImpl> protect = impl.lock(); + assert(protect); + try { + Dispatcher d(poller); + + for (int i = 0; i < 32; i++) { + aio->queueReadBuffer(new Buff(maxFrameSize)); + } + + aio->start(poller); + d.run(); + } catch (const std::exception& e) { + QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what())); + handleClosed(); + } + try { + socket.close(); + } catch (const std::exception&) {} +} + void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) { securityLayer = sl; |