diff options
Diffstat (limited to 'qpid/cpp/src/qpid/client/Connector.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/client/Connector.cpp | 82 |
1 files changed, 47 insertions, 35 deletions
diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp index f69032b26d..fbb571d40a 100644 --- a/qpid/cpp/src/qpid/client/Connector.cpp +++ b/qpid/cpp/src/qpid/client/Connector.cpp @@ -51,10 +51,10 @@ using boost::str; // Stuff for the registry of protocol connectors (maybe should be moved to its own file) namespace { typedef std::map<std::string, Connector::Factory*> ProtocolRegistry; - + ProtocolRegistry& theProtocolRegistry() { static ProtocolRegistry protocolRegistry; - + return protocolRegistry; } } @@ -93,7 +93,7 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable size_t lastEof; // Position after last EOF in frames uint64_t currentSize; Bounds* bounds; - + framing::ProtocolVersion version; bool initiated; bool closed; @@ -118,16 +118,17 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable void run(); void handleClosed(); bool closeInternal(); - + + void connected(const Socket&); + void connectFailed(const std::string& msg); bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); void writebuff(qpid::sys::AsynchIO&); void writeDataBlock(const framing::AMQDataBlock& data); void eof(qpid::sys::AsynchIO&); boost::weak_ptr<ConnectionImpl> impl; - + void connect(const std::string& host, int port); - void init(); void close(); void send(framing::AMQFrame& frame); void abort(); @@ -142,7 +143,6 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable size_t decode(const char* buffer, size_t size); size_t encode(const char* buffer, size_t size); bool canEncode(); - public: TCPConnector(framing::ProtocolVersion pVersion, @@ -163,6 +163,11 @@ namespace { } init; } +struct TCPConnector::Buff : public AsynchIO::BufferBase { + Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} + ~Buff() { delete [] bytes;} +}; + TCPConnector::TCPConnector(ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) @@ -189,15 +194,19 @@ TCPConnector::~TCPConnector() { void TCPConnector::connect(const std::string& host, int port){ Mutex::ScopedLock l(lock); assert(closed); - try { - socket.connect(host, port); - } catch (const std::exception& /*e*/) { - socket.close(); - throw; - } - - identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); + assert(joined); poller = Poller::shared_ptr(new Poller); + AsynchConnector::create(socket, + poller, + host, port, + boost::bind(&TCPConnector::connected, this, _1), + boost::bind(&TCPConnector::connectFailed, this, _3)); + closed = false; + joined = false; + receiver = Thread(this); +} + +void TCPConnector::connected(const Socket&) { aio = AsynchIO::create(socket, boost::bind(&TCPConnector::readbuff, this, _1, _2), boost::bind(&TCPConnector::eof, this, _1), @@ -205,16 +214,23 @@ void TCPConnector::connect(const std::string& host, int port){ 0, // closed 0, // nobuffs boost::bind(&TCPConnector::writebuff, this, _1)); - closed = false; -} + for (int i = 0; i < 32; i++) { + aio->queueReadBuffer(new Buff(maxFrameSize)); + } + aio->start(poller); -void TCPConnector::init(){ - Mutex::ScopedLock l(lock); - assert(joined); + identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); ProtocolInitiation init(version); writeDataBlock(init); - joined = false; - receiver = Thread(this); +} + +void TCPConnector::connectFailed(const std::string& msg) { + QPID_LOG(warning, "Connecting failed: " << msg); + closed = true; + poller->shutdown(); + closeInternal(); + if (shutdownHandler) + shutdownHandler->shutdown(); } bool TCPConnector::closeInternal() { @@ -235,7 +251,7 @@ bool TCPConnector::closeInternal() { receiver.join(); return ret; } - + void TCPConnector::close() { closeInternal(); } @@ -243,7 +259,13 @@ void TCPConnector::close() { void TCPConnector::abort() { // Can't abort a closed connection if (!closed) { - aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1)); + if (aio) { + // Established connection + aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1)); + } else { + // We're still connecting + connectFailed("Connection timedout"); + } } } @@ -288,18 +310,13 @@ void TCPConnector::handleClosed() { shutdownHandler->shutdown(); } -struct TCPConnector::Buff : public AsynchIO::BufferBase { - Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} - ~Buff() { delete [] bytes;} -}; - void TCPConnector::writebuff(AsynchIO& /*aio*/) { Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; if (codec->canEncode()) { std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer()); if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize)); - + size_t encoded = codec->encode(buffer->bytes, buffer->byteCount); buffer->dataStart = 0; @@ -395,11 +412,6 @@ void TCPConnector::run() { try { Dispatcher d(poller); - for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff(maxFrameSize)); - } - - aio->start(poller); d.run(); } catch (const std::exception& e) { QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what())); |