summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/Connector.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/Connector.cpp')
-rw-r--r--cpp/src/qpid/client/Connector.cpp353
1 files changed, 1 insertions, 352 deletions
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index ad60c9d7e1..2c4feffdcf 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -18,9 +18,9 @@
* under the License.
*
*/
+
#include "qpid/client/Connector.h"
-#include "qpid/client/Bounds.h"
#include "qpid/client/ConnectionImpl.h"
#include "qpid/client/ConnectionSettings.h"
#include "qpid/log/Statement.h"
@@ -35,10 +35,8 @@
#include <iostream>
#include <map>
-#include <deque>
#include <boost/bind.hpp>
#include <boost/format.hpp>
-#include <boost/weak_ptr.hpp>
namespace qpid {
namespace client {
@@ -81,353 +79,4 @@ void Connector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>)
{
}
-class TCPConnector : public Connector, public sys::Codec, private sys::Runnable
-{
- typedef std::deque<framing::AMQFrame> Frames;
- struct Buff;
-
- const uint16_t maxFrameSize;
-
- sys::Mutex lock;
- Frames frames; // Outgoing frame queue
- size_t lastEof; // Position after last EOF in frames
- uint64_t currentSize;
- Bounds* bounds;
-
- framing::ProtocolVersion version;
- bool initiated;
- bool closed;
- bool joined;
-
- sys::ShutdownHandler* shutdownHandler;
- framing::InputHandler* input;
- framing::InitiationHandler* initialiser;
- framing::OutputHandler* output;
-
- sys::Thread receiver;
-
- sys::Socket socket;
-
- sys::AsynchIO* aio;
- std::string identifier;
- boost::shared_ptr<sys::Poller> poller;
- std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
-
- ~TCPConnector();
-
- void run();
- void handleClosed();
- bool closeInternal();
-
- void connected(const Socket&);
- void connectFailed(const std::string& msg);
- bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
- void writebuff(qpid::sys::AsynchIO&);
- void writeDataBlock(const framing::AMQDataBlock& data);
- void eof(qpid::sys::AsynchIO&);
-
- boost::weak_ptr<ConnectionImpl> impl;
-
- void connect(const std::string& host, int port);
- void close();
- void send(framing::AMQFrame& frame);
- void abort();
-
- void setInputHandler(framing::InputHandler* handler);
- void setShutdownHandler(sys::ShutdownHandler* handler);
- sys::ShutdownHandler* getShutdownHandler() const;
- framing::OutputHandler* getOutputHandler();
- const std::string& getIdentifier() const;
- void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
-
- size_t decode(const char* buffer, size_t size);
- size_t encode(const char* buffer, size_t size);
- bool canEncode();
-
-public:
- TCPConnector(framing::ProtocolVersion pVersion,
- const ConnectionSettings&,
- ConnectionImpl*);
- unsigned int getSSF() { return 0; }
-};
-
-// Static constructor which registers connector here
-namespace {
- Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
- return new TCPConnector(v, s, c);
- }
-
- struct StaticInit {
- StaticInit() {
- Connector::registerFactory("tcp", &create);
- };
- } init;
-}
-
-struct TCPConnector::Buff : public AsynchIO::BufferBase {
- Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
- ~Buff() { delete [] bytes;}
-};
-
-TCPConnector::TCPConnector(ProtocolVersion ver,
- const ConnectionSettings& settings,
- ConnectionImpl* cimpl)
- : maxFrameSize(settings.maxFrameSize),
- lastEof(0),
- currentSize(0),
- bounds(cimpl),
- version(ver),
- initiated(false),
- closed(true),
- joined(true),
- shutdownHandler(0),
- aio(0),
- impl(cimpl->shared_from_this())
-{
- QPID_LOG(debug, "TCPConnector created for " << version.toString());
- settings.configureSocket(socket);
-}
-
-TCPConnector::~TCPConnector() {
- close();
-}
-
-void TCPConnector::connect(const std::string& host, int port){
- Mutex::ScopedLock l(lock);
- assert(closed);
- assert(joined);
- poller = Poller::shared_ptr(new Poller);
- AsynchConnector::create(socket,
- poller,
- host, port,
- boost::bind(&TCPConnector::connected, this, _1),
- boost::bind(&TCPConnector::connectFailed, this, _3));
- closed = false;
- joined = false;
- receiver = Thread(this);
-}
-
-void TCPConnector::connected(const Socket&) {
- aio = AsynchIO::create(socket,
- boost::bind(&TCPConnector::readbuff, this, _1, _2),
- boost::bind(&TCPConnector::eof, this, _1),
- boost::bind(&TCPConnector::eof, this, _1),
- 0, // closed
- 0, // nobuffs
- boost::bind(&TCPConnector::writebuff, this, _1));
- for (int i = 0; i < 32; i++) {
- aio->queueReadBuffer(new Buff(maxFrameSize));
- }
- aio->start(poller);
-
- identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
- ProtocolInitiation init(version);
- writeDataBlock(init);
-}
-
-void TCPConnector::connectFailed(const std::string& msg) {
- QPID_LOG(warning, "Connecting failed: " << msg);
- closed = true;
- poller->shutdown();
- closeInternal();
- if (shutdownHandler)
- shutdownHandler->shutdown();
-}
-
-bool TCPConnector::closeInternal() {
- bool ret;
- {
- Mutex::ScopedLock l(lock);
- ret = !closed;
- if (!closed) {
- closed = true;
- aio->queueForDeletion();
- poller->shutdown();
- }
- if (joined || receiver.id() == Thread::current().id()) {
- return ret;
- }
- joined = true;
- }
- receiver.join();
- return ret;
-}
-
-void TCPConnector::close() {
- closeInternal();
-}
-
-void TCPConnector::abort() {
- // Can't abort a closed connection
- if (!closed) {
- if (aio) {
- // Established connection
- aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
- } else {
- // We're still connecting
- connectFailed("Connection timedout");
- }
- }
-}
-
-void TCPConnector::setInputHandler(InputHandler* handler){
- input = handler;
-}
-
-void TCPConnector::setShutdownHandler(ShutdownHandler* handler){
- shutdownHandler = handler;
-}
-
-OutputHandler* TCPConnector::getOutputHandler() {
- return this;
-}
-
-sys::ShutdownHandler* TCPConnector::getShutdownHandler() const {
- return shutdownHandler;
-}
-
-const std::string& TCPConnector::getIdentifier() const {
- return identifier;
-}
-
-void TCPConnector::send(AMQFrame& frame) {
- Mutex::ScopedLock l(lock);
- frames.push_back(frame);
- //only ask to write if this is the end of a frameset or if we
- //already have a buffers worth of data
- currentSize += frame.encodedSize();
- bool notifyWrite = false;
- if (frame.getEof()) {
- lastEof = frames.size();
- notifyWrite = true;
- } else {
- notifyWrite = (currentSize >= maxFrameSize);
- }
- if (notifyWrite && !closed) aio->notifyPendingWrite();
-}
-
-void TCPConnector::handleClosed() {
- if (closeInternal() && shutdownHandler)
- shutdownHandler->shutdown();
-}
-
-void TCPConnector::writebuff(AsynchIO& /*aio*/)
-{
- Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
- if (codec->canEncode()) {
- std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer());
- if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize));
-
- size_t encoded = codec->encode(buffer->bytes, buffer->byteCount);
-
- buffer->dataStart = 0;
- buffer->dataCount = encoded;
- aio->queueWrite(buffer.release());
- }
-}
-
-// Called in IO thread.
-bool TCPConnector::canEncode()
-{
- Mutex::ScopedLock l(lock);
- //have at least one full frameset or a whole buffers worth of data
- return lastEof || currentSize >= maxFrameSize;
-}
-
-// Called in IO thread.
-size_t TCPConnector::encode(const char* buffer, size_t size)
-{
- framing::Buffer out(const_cast<char*>(buffer), size);
- size_t bytesWritten(0);
- {
- Mutex::ScopedLock l(lock);
- while (!frames.empty() && out.available() >= frames.front().encodedSize() ) {
- frames.front().encode(out);
- QPID_LOG(trace, "SENT " << identifier << ": " << frames.front());
- frames.pop_front();
- if (lastEof) --lastEof;
- }
- bytesWritten = size - out.available();
- currentSize -= bytesWritten;
- }
- if (bounds) bounds->reduce(bytesWritten);
- return bytesWritten;
-}
-
-bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff)
-{
- Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
- int32_t decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
- // TODO: unreading needs to go away, and when we can cope
- // with multiple sub-buffers in the general buffer scheme, it will
- if (decoded < buff->dataCount) {
- // Adjust buffer for used bytes and then "unread them"
- buff->dataStart += decoded;
- buff->dataCount -= decoded;
- aio.unread(buff);
- } else {
- // Give whole buffer back to aio subsystem
- aio.queueReadBuffer(buff);
- }
- return true;
-}
-
-size_t TCPConnector::decode(const char* buffer, size_t size)
-{
- framing::Buffer in(const_cast<char*>(buffer), size);
- if (!initiated) {
- framing::ProtocolInitiation protocolInit;
- if (protocolInit.decode(in)) {
- QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")");
- if(!(protocolInit==version)){
- throw Exception(QPID_MSG("Unsupported version: " << protocolInit
- << " supported version " << version));
- }
- }
- initiated = true;
- }
- AMQFrame frame;
- while(frame.decode(in)){
- QPID_LOG(trace, "RECV " << identifier << ": " << frame);
- input->received(frame);
- }
- return size - in.available();
-}
-
-void TCPConnector::writeDataBlock(const AMQDataBlock& data) {
- AsynchIO::BufferBase* buff = new Buff(maxFrameSize);
- framing::Buffer out(buff->bytes, buff->byteCount);
- data.encode(out);
- buff->dataCount = data.encodedSize();
- aio->queueWrite(buff);
-}
-
-void TCPConnector::eof(AsynchIO&) {
- handleClosed();
-}
-
-void TCPConnector::run() {
- // Keep the connection impl in memory until run() completes.
- boost::shared_ptr<ConnectionImpl> protect = impl.lock();
- assert(protect);
- try {
- Dispatcher d(poller);
-
- d.run();
- } catch (const std::exception& e) {
- QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what()));
- handleClosed();
- }
- try {
- socket.close();
- } catch (const std::exception&) {}
-}
-
-void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
-{
- securityLayer = sl;
- securityLayer->init(this);
-}
-
-
}} // namespace qpid::client