diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/CMakeLists.txt | 17 | ||||
-rw-r--r-- | cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 353 | ||||
-rw-r--r-- | cpp/src/qpid/client/TCPConnector.cpp | 327 | ||||
-rw-r--r-- | cpp/src/qpid/client/TCPConnector.h | 117 |
5 files changed, 460 insertions, 356 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index b6da2e758d..f09d4a8ad9 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -426,17 +426,25 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows) qpid/sys/windows/Thread.cpp qpid/sys/windows/Time.cpp qpid/sys/windows/uuid.cpp + ${sslcommon_windows_SOURCES} ) set (qpidcommon_platform_LIBS - rpcrt4 ws2_32 + ${windows_ssl_libs} rpcrt4 ws2_32 ) set (qpidbroker_platform_SOURCES qpid/broker/windows/BrokerDefaults.cpp qpid/broker/windows/SaslAuthenticator.cpp + ${sslbroker_windows_SOURCES} + ) + set (qpidbroker_platform_LIBS + ${windows_ssl_libs} ) - set (qpidclient_platform_SOURCES qpid/client/windows/SaslFactory.cpp + ${sslclient_windows_SOURCES} + ) + set (qpidclient_platform_LIBS + ${windows_ssl_libs} ) set (qpidd_platform_SOURCES @@ -625,6 +633,7 @@ set (qpidclient_SOURCES qpid/client/SubscriptionImpl.cpp qpid/client/SubscriptionManager.cpp qpid/client/SubscriptionManagerImpl.cpp + qpid/client/TCPConnector.cpp qpid/messaging/Address.cpp qpid/messaging/Connection.cpp qpid/messaging/ConnectionImpl.h @@ -665,7 +674,7 @@ set (qpidclient_SOURCES ) add_library (qpidclient SHARED ${qpidclient_SOURCES}) -target_link_libraries (qpidclient qpidcommon) +target_link_libraries (qpidclient qpidcommon ${qpidclient_platform_LIBS}) set_target_properties (qpidclient PROPERTIES VERSION ${qpidc_version}) install (TARGETS qpidclient DESTINATION ${QPID_INSTALL_LIBDIR} @@ -751,7 +760,7 @@ set (qpidbroker_SOURCES qpid/sys/TCPIOPlugin.cpp ) add_library (qpidbroker SHARED ${qpidbroker_SOURCES}) -target_link_libraries (qpidbroker qpidcommon) +target_link_libraries (qpidbroker qpidcommon ${qpidbroker_platform_LIBS}) set_target_properties (qpidbroker PROPERTIES VERSION ${qpidc_version}) if (MSVC) set_target_properties (qpidbroker PROPERTIES COMPILE_FLAGS /wd4290) diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 968bd7ca7a..ee2acb673b 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -691,6 +691,8 @@ libqpidclient_la_SOURCES = \ qpid/client/SubscriptionManager.cpp \ qpid/client/SubscriptionManagerImpl.cpp \ qpid/client/SubscriptionManagerImpl.h \ + qpid/client/TCPConnector.cpp \ + qpid/client/TCPConnector.h \ qpid/messaging/Address.cpp \ qpid/messaging/Connection.cpp \ qpid/messaging/ListContent.cpp \ diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index ad60c9d7e1..2c4feffdcf 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -18,9 +18,9 @@ * under the License. * */ + #include "qpid/client/Connector.h" -#include "qpid/client/Bounds.h" #include "qpid/client/ConnectionImpl.h" #include "qpid/client/ConnectionSettings.h" #include "qpid/log/Statement.h" @@ -35,10 +35,8 @@ #include <iostream> #include <map> -#include <deque> #include <boost/bind.hpp> #include <boost/format.hpp> -#include <boost/weak_ptr.hpp> namespace qpid { namespace client { @@ -81,353 +79,4 @@ void Connector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>) { } -class TCPConnector : public Connector, public sys::Codec, private sys::Runnable -{ - typedef std::deque<framing::AMQFrame> Frames; - struct Buff; - - const uint16_t maxFrameSize; - - sys::Mutex lock; - Frames frames; // Outgoing frame queue - size_t lastEof; // Position after last EOF in frames - uint64_t currentSize; - Bounds* bounds; - - framing::ProtocolVersion version; - bool initiated; - bool closed; - bool joined; - - sys::ShutdownHandler* shutdownHandler; - framing::InputHandler* input; - framing::InitiationHandler* initialiser; - framing::OutputHandler* output; - - sys::Thread receiver; - - sys::Socket socket; - - sys::AsynchIO* aio; - std::string identifier; - boost::shared_ptr<sys::Poller> poller; - std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; - - ~TCPConnector(); - - void run(); - void handleClosed(); - bool closeInternal(); - - void connected(const Socket&); - void connectFailed(const std::string& msg); - bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); - void writebuff(qpid::sys::AsynchIO&); - void writeDataBlock(const framing::AMQDataBlock& data); - void eof(qpid::sys::AsynchIO&); - - boost::weak_ptr<ConnectionImpl> impl; - - void connect(const std::string& host, int port); - void close(); - void send(framing::AMQFrame& frame); - void abort(); - - void setInputHandler(framing::InputHandler* handler); - void setShutdownHandler(sys::ShutdownHandler* handler); - sys::ShutdownHandler* getShutdownHandler() const; - framing::OutputHandler* getOutputHandler(); - const std::string& getIdentifier() const; - void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>); - - size_t decode(const char* buffer, size_t size); - size_t encode(const char* buffer, size_t size); - bool canEncode(); - -public: - TCPConnector(framing::ProtocolVersion pVersion, - const ConnectionSettings&, - ConnectionImpl*); - unsigned int getSSF() { return 0; } -}; - -// Static constructor which registers connector here -namespace { - Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { - return new TCPConnector(v, s, c); - } - - struct StaticInit { - StaticInit() { - Connector::registerFactory("tcp", &create); - }; - } init; -} - -struct TCPConnector::Buff : public AsynchIO::BufferBase { - Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} - ~Buff() { delete [] bytes;} -}; - -TCPConnector::TCPConnector(ProtocolVersion ver, - const ConnectionSettings& settings, - ConnectionImpl* cimpl) - : maxFrameSize(settings.maxFrameSize), - lastEof(0), - currentSize(0), - bounds(cimpl), - version(ver), - initiated(false), - closed(true), - joined(true), - shutdownHandler(0), - aio(0), - impl(cimpl->shared_from_this()) -{ - QPID_LOG(debug, "TCPConnector created for " << version.toString()); - settings.configureSocket(socket); -} - -TCPConnector::~TCPConnector() { - close(); -} - -void TCPConnector::connect(const std::string& host, int port){ - Mutex::ScopedLock l(lock); - assert(closed); - assert(joined); - poller = Poller::shared_ptr(new Poller); - AsynchConnector::create(socket, - poller, - host, port, - boost::bind(&TCPConnector::connected, this, _1), - boost::bind(&TCPConnector::connectFailed, this, _3)); - closed = false; - joined = false; - receiver = Thread(this); -} - -void TCPConnector::connected(const Socket&) { - aio = AsynchIO::create(socket, - boost::bind(&TCPConnector::readbuff, this, _1, _2), - boost::bind(&TCPConnector::eof, this, _1), - boost::bind(&TCPConnector::eof, this, _1), - 0, // closed - 0, // nobuffs - boost::bind(&TCPConnector::writebuff, this, _1)); - for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff(maxFrameSize)); - } - aio->start(poller); - - identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); - ProtocolInitiation init(version); - writeDataBlock(init); -} - -void TCPConnector::connectFailed(const std::string& msg) { - QPID_LOG(warning, "Connecting failed: " << msg); - closed = true; - poller->shutdown(); - closeInternal(); - if (shutdownHandler) - shutdownHandler->shutdown(); -} - -bool TCPConnector::closeInternal() { - bool ret; - { - Mutex::ScopedLock l(lock); - ret = !closed; - if (!closed) { - closed = true; - aio->queueForDeletion(); - poller->shutdown(); - } - if (joined || receiver.id() == Thread::current().id()) { - return ret; - } - joined = true; - } - receiver.join(); - return ret; -} - -void TCPConnector::close() { - closeInternal(); -} - -void TCPConnector::abort() { - // Can't abort a closed connection - if (!closed) { - if (aio) { - // Established connection - aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1)); - } else { - // We're still connecting - connectFailed("Connection timedout"); - } - } -} - -void TCPConnector::setInputHandler(InputHandler* handler){ - input = handler; -} - -void TCPConnector::setShutdownHandler(ShutdownHandler* handler){ - shutdownHandler = handler; -} - -OutputHandler* TCPConnector::getOutputHandler() { - return this; -} - -sys::ShutdownHandler* TCPConnector::getShutdownHandler() const { - return shutdownHandler; -} - -const std::string& TCPConnector::getIdentifier() const { - return identifier; -} - -void TCPConnector::send(AMQFrame& frame) { - Mutex::ScopedLock l(lock); - frames.push_back(frame); - //only ask to write if this is the end of a frameset or if we - //already have a buffers worth of data - currentSize += frame.encodedSize(); - bool notifyWrite = false; - if (frame.getEof()) { - lastEof = frames.size(); - notifyWrite = true; - } else { - notifyWrite = (currentSize >= maxFrameSize); - } - if (notifyWrite && !closed) aio->notifyPendingWrite(); -} - -void TCPConnector::handleClosed() { - if (closeInternal() && shutdownHandler) - shutdownHandler->shutdown(); -} - -void TCPConnector::writebuff(AsynchIO& /*aio*/) -{ - Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; - if (codec->canEncode()) { - std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer()); - if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize)); - - size_t encoded = codec->encode(buffer->bytes, buffer->byteCount); - - buffer->dataStart = 0; - buffer->dataCount = encoded; - aio->queueWrite(buffer.release()); - } -} - -// Called in IO thread. -bool TCPConnector::canEncode() -{ - Mutex::ScopedLock l(lock); - //have at least one full frameset or a whole buffers worth of data - return lastEof || currentSize >= maxFrameSize; -} - -// Called in IO thread. -size_t TCPConnector::encode(const char* buffer, size_t size) -{ - framing::Buffer out(const_cast<char*>(buffer), size); - size_t bytesWritten(0); - { - Mutex::ScopedLock l(lock); - while (!frames.empty() && out.available() >= frames.front().encodedSize() ) { - frames.front().encode(out); - QPID_LOG(trace, "SENT " << identifier << ": " << frames.front()); - frames.pop_front(); - if (lastEof) --lastEof; - } - bytesWritten = size - out.available(); - currentSize -= bytesWritten; - } - if (bounds) bounds->reduce(bytesWritten); - return bytesWritten; -} - -bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) -{ - Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; - int32_t decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount); - // TODO: unreading needs to go away, and when we can cope - // with multiple sub-buffers in the general buffer scheme, it will - if (decoded < buff->dataCount) { - // Adjust buffer for used bytes and then "unread them" - buff->dataStart += decoded; - buff->dataCount -= decoded; - aio.unread(buff); - } else { - // Give whole buffer back to aio subsystem - aio.queueReadBuffer(buff); - } - return true; -} - -size_t TCPConnector::decode(const char* buffer, size_t size) -{ - framing::Buffer in(const_cast<char*>(buffer), size); - if (!initiated) { - framing::ProtocolInitiation protocolInit; - if (protocolInit.decode(in)) { - QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")"); - if(!(protocolInit==version)){ - throw Exception(QPID_MSG("Unsupported version: " << protocolInit - << " supported version " << version)); - } - } - initiated = true; - } - AMQFrame frame; - while(frame.decode(in)){ - QPID_LOG(trace, "RECV " << identifier << ": " << frame); - input->received(frame); - } - return size - in.available(); -} - -void TCPConnector::writeDataBlock(const AMQDataBlock& data) { - AsynchIO::BufferBase* buff = new Buff(maxFrameSize); - framing::Buffer out(buff->bytes, buff->byteCount); - data.encode(out); - buff->dataCount = data.encodedSize(); - aio->queueWrite(buff); -} - -void TCPConnector::eof(AsynchIO&) { - handleClosed(); -} - -void TCPConnector::run() { - // Keep the connection impl in memory until run() completes. - boost::shared_ptr<ConnectionImpl> protect = impl.lock(); - assert(protect); - try { - Dispatcher d(poller); - - d.run(); - } catch (const std::exception& e) { - QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what())); - handleClosed(); - } - try { - socket.close(); - } catch (const std::exception&) {} -} - -void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) -{ - securityLayer = sl; - securityLayer->init(this); -} - - }} // namespace qpid::client diff --git a/cpp/src/qpid/client/TCPConnector.cpp b/cpp/src/qpid/client/TCPConnector.cpp new file mode 100644 index 0000000000..1a6e51d54d --- /dev/null +++ b/cpp/src/qpid/client/TCPConnector.cpp @@ -0,0 +1,327 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/client/TCPConnector.h" + +#include "qpid/client/ConnectionImpl.h" +#include "qpid/client/ConnectionSettings.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/Codec.h" +#include "qpid/sys/Time.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/Dispatcher.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/SecurityLayer.h" +#include "qpid/Msg.h" + +#include <iostream> +#include <boost/bind.hpp> +#include <boost/format.hpp> + +namespace qpid { +namespace client { + +using namespace qpid::sys; +using namespace qpid::framing; +using boost::format; +using boost::str; + +// Static constructor which registers connector here +namespace { + Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { + return new TCPConnector(v, s, c); + } + + struct StaticInit { + StaticInit() { + Connector::registerFactory("tcp", &create); + }; + } init; +} + +struct TCPConnector::Buff : public AsynchIO::BufferBase { + Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} + ~Buff() { delete [] bytes;} +}; + +TCPConnector::TCPConnector(ProtocolVersion ver, + const ConnectionSettings& settings, + ConnectionImpl* cimpl) + : maxFrameSize(settings.maxFrameSize), + lastEof(0), + currentSize(0), + bounds(cimpl), + version(ver), + initiated(false), + closed(true), + joined(true), + shutdownHandler(0), + aio(0), + impl(cimpl->shared_from_this()) +{ + QPID_LOG(debug, "TCPConnector created for " << version.toString()); + settings.configureSocket(socket); +} + +TCPConnector::~TCPConnector() { + close(); +} + +void TCPConnector::connect(const std::string& host, int port) { + Mutex::ScopedLock l(lock); + assert(closed); + assert(joined); + poller = Poller::shared_ptr(new Poller); + AsynchConnector::create(socket, + poller, + host, port, + boost::bind(&TCPConnector::connected, this, _1), + boost::bind(&TCPConnector::connectFailed, this, _3)); + closed = false; + joined = false; + receiver = Thread(this); +} + +void TCPConnector::connected(const Socket&) { + aio = AsynchIO::create(socket, + boost::bind(&TCPConnector::readbuff, this, _1, _2), + boost::bind(&TCPConnector::eof, this, _1), + boost::bind(&TCPConnector::eof, this, _1), + 0, // closed + 0, // nobuffs + boost::bind(&TCPConnector::writebuff, this, _1)); + for (int i = 0; i < 32; i++) { + aio->queueReadBuffer(new Buff(maxFrameSize)); + } + aio->start(poller); + + identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); + ProtocolInitiation init(version); + writeDataBlock(init); +} + +void TCPConnector::connectFailed(const std::string& msg) { + QPID_LOG(warning, "Connecting failed: " << msg); + closed = true; + poller->shutdown(); + closeInternal(); + if (shutdownHandler) + shutdownHandler->shutdown(); +} + +bool TCPConnector::closeInternal() { + bool ret; + { + Mutex::ScopedLock l(lock); + ret = !closed; + if (!closed) { + closed = true; + aio->queueForDeletion(); + poller->shutdown(); + } + if (joined || receiver.id() == Thread::current().id()) { + return ret; + } + joined = true; + } + receiver.join(); + return ret; +} + +void TCPConnector::close() { + closeInternal(); +} + +void TCPConnector::abort() { + // Can't abort a closed connection + if (!closed) { + if (aio) { + // Established connection + aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1)); + } else { + // We're still connecting + connectFailed("Connection timedout"); + } + } +} + +void TCPConnector::setInputHandler(InputHandler* handler){ + input = handler; +} + +void TCPConnector::setShutdownHandler(ShutdownHandler* handler){ + shutdownHandler = handler; +} + +OutputHandler* TCPConnector::getOutputHandler() { + return this; +} + +sys::ShutdownHandler* TCPConnector::getShutdownHandler() const { + return shutdownHandler; +} + +const std::string& TCPConnector::getIdentifier() const { + return identifier; +} + +void TCPConnector::send(AMQFrame& frame) { + Mutex::ScopedLock l(lock); + frames.push_back(frame); + //only ask to write if this is the end of a frameset or if we + //already have a buffers worth of data + currentSize += frame.encodedSize(); + bool notifyWrite = false; + if (frame.getEof()) { + lastEof = frames.size(); + notifyWrite = true; + } else { + notifyWrite = (currentSize >= maxFrameSize); + } + if (notifyWrite && !closed) aio->notifyPendingWrite(); +} + +void TCPConnector::handleClosed() { + if (closeInternal() && shutdownHandler) + shutdownHandler->shutdown(); +} + +void TCPConnector::writebuff(AsynchIO& /*aio*/) +{ + Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; + if (codec->canEncode()) { + std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer()); + if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize)); + + size_t encoded = codec->encode(buffer->bytes, buffer->byteCount); + + buffer->dataStart = 0; + buffer->dataCount = encoded; + aio->queueWrite(buffer.release()); + } +} + +// Called in IO thread. +bool TCPConnector::canEncode() +{ + Mutex::ScopedLock l(lock); + //have at least one full frameset or a whole buffers worth of data + return lastEof || currentSize >= maxFrameSize; +} + +// Called in IO thread. +size_t TCPConnector::encode(const char* buffer, size_t size) +{ + framing::Buffer out(const_cast<char*>(buffer), size); + size_t bytesWritten(0); + { + Mutex::ScopedLock l(lock); + while (!frames.empty() && out.available() >= frames.front().encodedSize() ) { + frames.front().encode(out); + QPID_LOG(trace, "SENT " << identifier << ": " << frames.front()); + frames.pop_front(); + if (lastEof) --lastEof; + } + bytesWritten = size - out.available(); + currentSize -= bytesWritten; + } + if (bounds) bounds->reduce(bytesWritten); + return bytesWritten; +} + +bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) +{ + Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; + int32_t decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount); + // TODO: unreading needs to go away, and when we can cope + // with multiple sub-buffers in the general buffer scheme, it will + if (decoded < buff->dataCount) { + // Adjust buffer for used bytes and then "unread them" + buff->dataStart += decoded; + buff->dataCount -= decoded; + aio.unread(buff); + } else { + // Give whole buffer back to aio subsystem + aio.queueReadBuffer(buff); + } + return true; +} + +size_t TCPConnector::decode(const char* buffer, size_t size) +{ + framing::Buffer in(const_cast<char*>(buffer), size); + if (!initiated) { + framing::ProtocolInitiation protocolInit; + if (protocolInit.decode(in)) { + QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")"); + if(!(protocolInit==version)){ + throw Exception(QPID_MSG("Unsupported version: " << protocolInit + << " supported version " << version)); + } + } + initiated = true; + } + AMQFrame frame; + while(frame.decode(in)){ + QPID_LOG(trace, "RECV " << identifier << ": " << frame); + input->received(frame); + } + return size - in.available(); +} + +void TCPConnector::writeDataBlock(const AMQDataBlock& data) { + AsynchIO::BufferBase* buff = new Buff(maxFrameSize); + framing::Buffer out(buff->bytes, buff->byteCount); + data.encode(out); + buff->dataCount = data.encodedSize(); + aio->queueWrite(buff); +} + +void TCPConnector::eof(AsynchIO&) { + handleClosed(); +} + +void TCPConnector::run() { + // Keep the connection impl in memory until run() completes. + boost::shared_ptr<ConnectionImpl> protect = impl.lock(); + assert(protect); + try { + Dispatcher d(poller); + + d.run(); + } catch (const std::exception& e) { + QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what())); + handleClosed(); + } + try { + socket.close(); + } catch (const std::exception&) {} +} + +void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) +{ + securityLayer = sl; + securityLayer->init(this); +} + + +}} // namespace qpid::client diff --git a/cpp/src/qpid/client/TCPConnector.h b/cpp/src/qpid/client/TCPConnector.h new file mode 100644 index 0000000000..6dc07d1f5d --- /dev/null +++ b/cpp/src/qpid/client/TCPConnector.h @@ -0,0 +1,117 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _TCPConnector_ +#define _TCPConnector_ + +#include "Connector.h" +#include "qpid/client/Bounds.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/Codec.h" +#include "qpid/sys/IntegerTypes.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/SecurityLayer.h" +#include "qpid/sys/Socket.h" +#include "qpid/sys/Thread.h" + +#include <boost/shared_ptr.hpp> +#include <boost/weak_ptr.hpp> +#include <deque> +#include <string> + +namespace qpid { +namespace client { + +class TCPConnector : public Connector, public sys::Codec, private sys::Runnable +{ + typedef std::deque<framing::AMQFrame> Frames; + struct Buff; + + const uint16_t maxFrameSize; + + sys::Mutex lock; + Frames frames; // Outgoing frame queue + size_t lastEof; // Position after last EOF in frames + uint64_t currentSize; + Bounds* bounds; + + framing::ProtocolVersion version; + bool initiated; + bool closed; + bool joined; + + sys::ShutdownHandler* shutdownHandler; + framing::InputHandler* input; + framing::InitiationHandler* initialiser; + framing::OutputHandler* output; + + sys::Thread receiver; + + sys::Socket socket; + + sys::AsynchIO* aio; + std::string identifier; + boost::shared_ptr<sys::Poller> poller; + std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; + + ~TCPConnector(); + + void run(); + void handleClosed(); + bool closeInternal(); + + virtual void connected(const qpid::sys::Socket&); + void connectFailed(const std::string& msg); + bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); + void writebuff(qpid::sys::AsynchIO&); + void writeDataBlock(const framing::AMQDataBlock& data); + void eof(qpid::sys::AsynchIO&); + + boost::weak_ptr<ConnectionImpl> impl; + + void connect(const std::string& host, int port); + void close(); + void send(framing::AMQFrame& frame); + void abort(); + + void setInputHandler(framing::InputHandler* handler); + void setShutdownHandler(sys::ShutdownHandler* handler); + sys::ShutdownHandler* getShutdownHandler() const; + framing::OutputHandler* getOutputHandler(); + const std::string& getIdentifier() const; + void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>); + + size_t decode(const char* buffer, size_t size); + size_t encode(const char* buffer, size_t size); + bool canEncode(); + +public: + TCPConnector(framing::ProtocolVersion pVersion, + const ConnectionSettings&, + ConnectionImpl*); + unsigned int getSSF() { return 0; } +}; + +}} // namespace qpid::client + +#endif /* _TCPConnector_ */ |