summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/TCPConnector.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/TCPConnector.cpp')
-rw-r--r--cpp/src/qpid/client/TCPConnector.cpp331
1 files changed, 0 insertions, 331 deletions
diff --git a/cpp/src/qpid/client/TCPConnector.cpp b/cpp/src/qpid/client/TCPConnector.cpp
deleted file mode 100644
index d90781b365..0000000000
--- a/cpp/src/qpid/client/TCPConnector.cpp
+++ /dev/null
@@ -1,331 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/client/TCPConnector.h"
-
-#include "qpid/client/ConnectionImpl.h"
-#include "qpid/client/ConnectionSettings.h"
-#include "qpid/log/Statement.h"
-#include "qpid/sys/Codec.h"
-#include "qpid/sys/Time.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/InitiationHandler.h"
-#include "qpid/sys/AsynchIO.h"
-#include "qpid/sys/Dispatcher.h"
-#include "qpid/sys/Poller.h"
-#include "qpid/sys/SecurityLayer.h"
-#include "qpid/Msg.h"
-
-#include <iostream>
-#include <boost/bind.hpp>
-#include <boost/format.hpp>
-
-namespace qpid {
-namespace client {
-
-using namespace qpid::sys;
-using namespace qpid::framing;
-using boost::format;
-using boost::str;
-
-struct TCPConnector::Buff : public AsynchIO::BufferBase {
- Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
- ~Buff() { delete [] bytes;}
-};
-
-// Static constructor which registers connector here
-namespace {
- Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
- return new TCPConnector(p, v, s, c);
- }
-
- struct StaticInit {
- StaticInit() {
- Connector::registerFactory("tcp", &create);
- };
- } init;
-}
-
-TCPConnector::TCPConnector(Poller::shared_ptr p,
- ProtocolVersion ver,
- const ConnectionSettings& settings,
- ConnectionImpl* cimpl)
- : maxFrameSize(settings.maxFrameSize),
- lastEof(0),
- currentSize(0),
- bounds(cimpl),
- version(ver),
- initiated(false),
- closed(true),
- shutdownHandler(0),
- connector(0),
- aio(0),
- poller(p)
-{
- QPID_LOG(debug, "TCPConnector created for " << version);
- settings.configureSocket(socket);
-}
-
-TCPConnector::~TCPConnector() {
- close();
-}
-
-void TCPConnector::connect(const std::string& host, int port) {
- Mutex::ScopedLock l(lock);
- assert(closed);
- connector = AsynchConnector::create(
- socket,
- host, port,
- boost::bind(&TCPConnector::connected, this, _1),
- boost::bind(&TCPConnector::connectFailed, this, _3));
- closed = false;
-
- connector->start(poller);
-}
-
-void TCPConnector::connected(const Socket&) {
- connector = 0;
- aio = AsynchIO::create(socket,
- boost::bind(&TCPConnector::readbuff, this, _1, _2),
- boost::bind(&TCPConnector::eof, this, _1),
- boost::bind(&TCPConnector::disconnected, this, _1),
- boost::bind(&TCPConnector::socketClosed, this, _1, _2),
- 0, // nobuffs
- boost::bind(&TCPConnector::writebuff, this, _1));
- start(aio);
- initAmqp();
- aio->start(poller);
-}
-
-void TCPConnector::start(sys::AsynchIO* aio_) {
- aio = aio_;
- for (int i = 0; i < 4; i++) {
- aio->queueReadBuffer(new Buff(maxFrameSize));
- }
-
- identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
-}
-
-void TCPConnector::initAmqp() {
- ProtocolInitiation init(version);
- writeDataBlock(init);
-}
-
-void TCPConnector::connectFailed(const std::string& msg) {
- connector = 0;
- QPID_LOG(warning, "Connect failed: " << msg);
- socket.close();
- if (!closed)
- closed = true;
- if (shutdownHandler)
- shutdownHandler->shutdown();
-}
-
-void TCPConnector::close() {
- Mutex::ScopedLock l(lock);
- if (!closed) {
- closed = true;
- if (aio)
- aio->queueWriteClose();
- }
-}
-
-void TCPConnector::socketClosed(AsynchIO&, const Socket&) {
- if (aio)
- aio->queueForDeletion();
- if (shutdownHandler)
- shutdownHandler->shutdown();
-}
-
-void TCPConnector::abort() {
- // Can't abort a closed connection
- if (!closed) {
- if (aio) {
- // Established connection
- aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
- } else if (connector) {
- // We're still connecting
- connector->stop();
- connectFailed("Connection timedout");
- }
- }
-}
-
-void TCPConnector::setInputHandler(InputHandler* handler){
- input = handler;
-}
-
-void TCPConnector::setShutdownHandler(ShutdownHandler* handler){
- shutdownHandler = handler;
-}
-
-OutputHandler* TCPConnector::getOutputHandler() {
- return this;
-}
-
-sys::ShutdownHandler* TCPConnector::getShutdownHandler() const {
- return shutdownHandler;
-}
-
-const std::string& TCPConnector::getIdentifier() const {
- return identifier;
-}
-
-void TCPConnector::send(AMQFrame& frame) {
- bool notifyWrite = false;
- {
- Mutex::ScopedLock l(lock);
- frames.push_back(frame);
- //only ask to write if this is the end of a frameset or if we
- //already have a buffers worth of data
- currentSize += frame.encodedSize();
- if (frame.getEof()) {
- lastEof = frames.size();
- notifyWrite = true;
- } else {
- notifyWrite = (currentSize >= maxFrameSize);
- }
- /*
- NOTE: Moving the following line into this mutex block
- is a workaround for BZ 570168, in which the test
- testConcurrentSenders causes a hang about 1.5%
- of the time. ( To see the hang much more frequently
- leave this line out of the mutex block, and put a
- small usleep just before it.)
-
- TODO mgoulish - fix the underlying cause and then
- move this call back outside the mutex.
- */
- if (notifyWrite && !closed) aio->notifyPendingWrite();
- }
-}
-
-void TCPConnector::writebuff(AsynchIO& /*aio*/)
-{
- // It's possible to be disconnected and be writable
- if (closed)
- return;
-
- Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
- if (codec->canEncode()) {
- std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer());
- if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize));
-
- size_t encoded = codec->encode(buffer->bytes, buffer->byteCount);
-
- buffer->dataStart = 0;
- buffer->dataCount = encoded;
- aio->queueWrite(buffer.release());
- }
-}
-
-// Called in IO thread.
-bool TCPConnector::canEncode()
-{
- Mutex::ScopedLock l(lock);
- //have at least one full frameset or a whole buffers worth of data
- return lastEof || currentSize >= maxFrameSize;
-}
-
-// Called in IO thread.
-size_t TCPConnector::encode(const char* buffer, size_t size)
-{
- framing::Buffer out(const_cast<char*>(buffer), size);
- size_t bytesWritten(0);
- {
- Mutex::ScopedLock l(lock);
- while (!frames.empty() && out.available() >= frames.front().encodedSize() ) {
- frames.front().encode(out);
- QPID_LOG(trace, "SENT " << identifier << ": " << frames.front());
- frames.pop_front();
- if (lastEof) --lastEof;
- }
- bytesWritten = size - out.available();
- currentSize -= bytesWritten;
- }
- if (bounds) bounds->reduce(bytesWritten);
- return bytesWritten;
-}
-
-bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff)
-{
- Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
- int32_t decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
- // TODO: unreading needs to go away, and when we can cope
- // with multiple sub-buffers in the general buffer scheme, it will
- if (decoded < buff->dataCount) {
- // Adjust buffer for used bytes and then "unread them"
- buff->dataStart += decoded;
- buff->dataCount -= decoded;
- aio.unread(buff);
- } else {
- // Give whole buffer back to aio subsystem
- aio.queueReadBuffer(buff);
- }
- return true;
-}
-
-size_t TCPConnector::decode(const char* buffer, size_t size)
-{
- framing::Buffer in(const_cast<char*>(buffer), size);
- if (!initiated) {
- framing::ProtocolInitiation protocolInit;
- if (protocolInit.decode(in)) {
- QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")");
- if(!(protocolInit==version)){
- throw Exception(QPID_MSG("Unsupported version: " << protocolInit
- << " supported version " << version));
- }
- }
- initiated = true;
- }
- AMQFrame frame;
- while(frame.decode(in)){
- QPID_LOG(trace, "RECV " << identifier << ": " << frame);
- input->received(frame);
- }
- return size - in.available();
-}
-
-void TCPConnector::writeDataBlock(const AMQDataBlock& data) {
- AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
- framing::Buffer out(buff->bytes, buff->byteCount);
- data.encode(out);
- buff->dataCount = data.encodedSize();
- aio->queueWrite(buff);
-}
-
-void TCPConnector::eof(AsynchIO&) {
- close();
-}
-
-void TCPConnector::disconnected(AsynchIO&) {
- close();
- socketClosed(*aio, socket);
-}
-
-void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
-{
- securityLayer = sl;
- securityLayer->init(this);
-}
-
-}} // namespace qpid::client