diff options
Diffstat (limited to 'cpp/src/qpid/client/TCPConnector.cpp')
-rw-r--r-- | cpp/src/qpid/client/TCPConnector.cpp | 331 |
1 files changed, 0 insertions, 331 deletions
diff --git a/cpp/src/qpid/client/TCPConnector.cpp b/cpp/src/qpid/client/TCPConnector.cpp deleted file mode 100644 index d90781b365..0000000000 --- a/cpp/src/qpid/client/TCPConnector.cpp +++ /dev/null @@ -1,331 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/client/TCPConnector.h" - -#include "qpid/client/ConnectionImpl.h" -#include "qpid/client/ConnectionSettings.h" -#include "qpid/log/Statement.h" -#include "qpid/sys/Codec.h" -#include "qpid/sys/Time.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/InitiationHandler.h" -#include "qpid/sys/AsynchIO.h" -#include "qpid/sys/Dispatcher.h" -#include "qpid/sys/Poller.h" -#include "qpid/sys/SecurityLayer.h" -#include "qpid/Msg.h" - -#include <iostream> -#include <boost/bind.hpp> -#include <boost/format.hpp> - -namespace qpid { -namespace client { - -using namespace qpid::sys; -using namespace qpid::framing; -using boost::format; -using boost::str; - -struct TCPConnector::Buff : public AsynchIO::BufferBase { - Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} - ~Buff() { delete [] bytes;} -}; - -// Static constructor which registers connector here -namespace { - Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { - return new TCPConnector(p, v, s, c); - } - - struct StaticInit { - StaticInit() { - Connector::registerFactory("tcp", &create); - }; - } init; -} - -TCPConnector::TCPConnector(Poller::shared_ptr p, - ProtocolVersion ver, - const ConnectionSettings& settings, - ConnectionImpl* cimpl) - : maxFrameSize(settings.maxFrameSize), - lastEof(0), - currentSize(0), - bounds(cimpl), - version(ver), - initiated(false), - closed(true), - shutdownHandler(0), - connector(0), - aio(0), - poller(p) -{ - QPID_LOG(debug, "TCPConnector created for " << version); - settings.configureSocket(socket); -} - -TCPConnector::~TCPConnector() { - close(); -} - -void TCPConnector::connect(const std::string& host, int port) { - Mutex::ScopedLock l(lock); - assert(closed); - connector = AsynchConnector::create( - socket, - host, port, - boost::bind(&TCPConnector::connected, this, _1), - boost::bind(&TCPConnector::connectFailed, this, _3)); - closed = false; - - connector->start(poller); -} - -void TCPConnector::connected(const Socket&) { - connector = 0; - aio = AsynchIO::create(socket, - boost::bind(&TCPConnector::readbuff, this, _1, _2), - boost::bind(&TCPConnector::eof, this, _1), - boost::bind(&TCPConnector::disconnected, this, _1), - boost::bind(&TCPConnector::socketClosed, this, _1, _2), - 0, // nobuffs - boost::bind(&TCPConnector::writebuff, this, _1)); - start(aio); - initAmqp(); - aio->start(poller); -} - -void TCPConnector::start(sys::AsynchIO* aio_) { - aio = aio_; - for (int i = 0; i < 4; i++) { - aio->queueReadBuffer(new Buff(maxFrameSize)); - } - - identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); -} - -void TCPConnector::initAmqp() { - ProtocolInitiation init(version); - writeDataBlock(init); -} - -void TCPConnector::connectFailed(const std::string& msg) { - connector = 0; - QPID_LOG(warning, "Connect failed: " << msg); - socket.close(); - if (!closed) - closed = true; - if (shutdownHandler) - shutdownHandler->shutdown(); -} - -void TCPConnector::close() { - Mutex::ScopedLock l(lock); - if (!closed) { - closed = true; - if (aio) - aio->queueWriteClose(); - } -} - -void TCPConnector::socketClosed(AsynchIO&, const Socket&) { - if (aio) - aio->queueForDeletion(); - if (shutdownHandler) - shutdownHandler->shutdown(); -} - -void TCPConnector::abort() { - // Can't abort a closed connection - if (!closed) { - if (aio) { - // Established connection - aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1)); - } else if (connector) { - // We're still connecting - connector->stop(); - connectFailed("Connection timedout"); - } - } -} - -void TCPConnector::setInputHandler(InputHandler* handler){ - input = handler; -} - -void TCPConnector::setShutdownHandler(ShutdownHandler* handler){ - shutdownHandler = handler; -} - -OutputHandler* TCPConnector::getOutputHandler() { - return this; -} - -sys::ShutdownHandler* TCPConnector::getShutdownHandler() const { - return shutdownHandler; -} - -const std::string& TCPConnector::getIdentifier() const { - return identifier; -} - -void TCPConnector::send(AMQFrame& frame) { - bool notifyWrite = false; - { - Mutex::ScopedLock l(lock); - frames.push_back(frame); - //only ask to write if this is the end of a frameset or if we - //already have a buffers worth of data - currentSize += frame.encodedSize(); - if (frame.getEof()) { - lastEof = frames.size(); - notifyWrite = true; - } else { - notifyWrite = (currentSize >= maxFrameSize); - } - /* - NOTE: Moving the following line into this mutex block - is a workaround for BZ 570168, in which the test - testConcurrentSenders causes a hang about 1.5% - of the time. ( To see the hang much more frequently - leave this line out of the mutex block, and put a - small usleep just before it.) - - TODO mgoulish - fix the underlying cause and then - move this call back outside the mutex. - */ - if (notifyWrite && !closed) aio->notifyPendingWrite(); - } -} - -void TCPConnector::writebuff(AsynchIO& /*aio*/) -{ - // It's possible to be disconnected and be writable - if (closed) - return; - - Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; - if (codec->canEncode()) { - std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer()); - if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize)); - - size_t encoded = codec->encode(buffer->bytes, buffer->byteCount); - - buffer->dataStart = 0; - buffer->dataCount = encoded; - aio->queueWrite(buffer.release()); - } -} - -// Called in IO thread. -bool TCPConnector::canEncode() -{ - Mutex::ScopedLock l(lock); - //have at least one full frameset or a whole buffers worth of data - return lastEof || currentSize >= maxFrameSize; -} - -// Called in IO thread. -size_t TCPConnector::encode(const char* buffer, size_t size) -{ - framing::Buffer out(const_cast<char*>(buffer), size); - size_t bytesWritten(0); - { - Mutex::ScopedLock l(lock); - while (!frames.empty() && out.available() >= frames.front().encodedSize() ) { - frames.front().encode(out); - QPID_LOG(trace, "SENT " << identifier << ": " << frames.front()); - frames.pop_front(); - if (lastEof) --lastEof; - } - bytesWritten = size - out.available(); - currentSize -= bytesWritten; - } - if (bounds) bounds->reduce(bytesWritten); - return bytesWritten; -} - -bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) -{ - Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; - int32_t decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount); - // TODO: unreading needs to go away, and when we can cope - // with multiple sub-buffers in the general buffer scheme, it will - if (decoded < buff->dataCount) { - // Adjust buffer for used bytes and then "unread them" - buff->dataStart += decoded; - buff->dataCount -= decoded; - aio.unread(buff); - } else { - // Give whole buffer back to aio subsystem - aio.queueReadBuffer(buff); - } - return true; -} - -size_t TCPConnector::decode(const char* buffer, size_t size) -{ - framing::Buffer in(const_cast<char*>(buffer), size); - if (!initiated) { - framing::ProtocolInitiation protocolInit; - if (protocolInit.decode(in)) { - QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")"); - if(!(protocolInit==version)){ - throw Exception(QPID_MSG("Unsupported version: " << protocolInit - << " supported version " << version)); - } - } - initiated = true; - } - AMQFrame frame; - while(frame.decode(in)){ - QPID_LOG(trace, "RECV " << identifier << ": " << frame); - input->received(frame); - } - return size - in.available(); -} - -void TCPConnector::writeDataBlock(const AMQDataBlock& data) { - AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); - framing::Buffer out(buff->bytes, buff->byteCount); - data.encode(out); - buff->dataCount = data.encodedSize(); - aio->queueWrite(buff); -} - -void TCPConnector::eof(AsynchIO&) { - close(); -} - -void TCPConnector::disconnected(AsynchIO&) { - close(); - socketClosed(*aio, socket); -} - -void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) -{ - securityLayer = sl; - securityLayer->init(this); -} - -}} // namespace qpid::client |