summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2009-12-14 21:16:30 +0000
committerStephen D. Huston <shuston@apache.org>2009-12-14 21:16:30 +0000
commit9bae1f85fbdbb860e62f320fc87f43571ef22af5 (patch)
tree81f4d22fb2cb4d99186b43f505d74223e2755109 /cpp
parent5ebc4bc86dc767bba4e8679db6fd7fc9778490c5 (diff)
downloadqpid-python-9bae1f85fbdbb860e62f320fc87f43571ef22af5.tar.gz
Move the TCPConnector class to its own file to allow deriving from it; resolves QPID-2270.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@890481 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/CMakeLists.txt17
-rw-r--r--cpp/src/Makefile.am2
-rw-r--r--cpp/src/qpid/client/Connector.cpp353
-rw-r--r--cpp/src/qpid/client/TCPConnector.cpp327
-rw-r--r--cpp/src/qpid/client/TCPConnector.h117
5 files changed, 460 insertions, 356 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index b6da2e758d..f09d4a8ad9 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -426,17 +426,25 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows)
qpid/sys/windows/Thread.cpp
qpid/sys/windows/Time.cpp
qpid/sys/windows/uuid.cpp
+ ${sslcommon_windows_SOURCES}
)
set (qpidcommon_platform_LIBS
- rpcrt4 ws2_32
+ ${windows_ssl_libs} rpcrt4 ws2_32
)
set (qpidbroker_platform_SOURCES
qpid/broker/windows/BrokerDefaults.cpp
qpid/broker/windows/SaslAuthenticator.cpp
+ ${sslbroker_windows_SOURCES}
+ )
+ set (qpidbroker_platform_LIBS
+ ${windows_ssl_libs}
)
-
set (qpidclient_platform_SOURCES
qpid/client/windows/SaslFactory.cpp
+ ${sslclient_windows_SOURCES}
+ )
+ set (qpidclient_platform_LIBS
+ ${windows_ssl_libs}
)
set (qpidd_platform_SOURCES
@@ -625,6 +633,7 @@ set (qpidclient_SOURCES
qpid/client/SubscriptionImpl.cpp
qpid/client/SubscriptionManager.cpp
qpid/client/SubscriptionManagerImpl.cpp
+ qpid/client/TCPConnector.cpp
qpid/messaging/Address.cpp
qpid/messaging/Connection.cpp
qpid/messaging/ConnectionImpl.h
@@ -665,7 +674,7 @@ set (qpidclient_SOURCES
)
add_library (qpidclient SHARED ${qpidclient_SOURCES})
-target_link_libraries (qpidclient qpidcommon)
+target_link_libraries (qpidclient qpidcommon ${qpidclient_platform_LIBS})
set_target_properties (qpidclient PROPERTIES VERSION ${qpidc_version})
install (TARGETS qpidclient
DESTINATION ${QPID_INSTALL_LIBDIR}
@@ -751,7 +760,7 @@ set (qpidbroker_SOURCES
qpid/sys/TCPIOPlugin.cpp
)
add_library (qpidbroker SHARED ${qpidbroker_SOURCES})
-target_link_libraries (qpidbroker qpidcommon)
+target_link_libraries (qpidbroker qpidcommon ${qpidbroker_platform_LIBS})
set_target_properties (qpidbroker PROPERTIES VERSION ${qpidc_version})
if (MSVC)
set_target_properties (qpidbroker PROPERTIES COMPILE_FLAGS /wd4290)
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 968bd7ca7a..ee2acb673b 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -691,6 +691,8 @@ libqpidclient_la_SOURCES = \
qpid/client/SubscriptionManager.cpp \
qpid/client/SubscriptionManagerImpl.cpp \
qpid/client/SubscriptionManagerImpl.h \
+ qpid/client/TCPConnector.cpp \
+ qpid/client/TCPConnector.h \
qpid/messaging/Address.cpp \
qpid/messaging/Connection.cpp \
qpid/messaging/ListContent.cpp \
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index ad60c9d7e1..2c4feffdcf 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -18,9 +18,9 @@
* under the License.
*
*/
+
#include "qpid/client/Connector.h"
-#include "qpid/client/Bounds.h"
#include "qpid/client/ConnectionImpl.h"
#include "qpid/client/ConnectionSettings.h"
#include "qpid/log/Statement.h"
@@ -35,10 +35,8 @@
#include <iostream>
#include <map>
-#include <deque>
#include <boost/bind.hpp>
#include <boost/format.hpp>
-#include <boost/weak_ptr.hpp>
namespace qpid {
namespace client {
@@ -81,353 +79,4 @@ void Connector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>)
{
}
-class TCPConnector : public Connector, public sys::Codec, private sys::Runnable
-{
- typedef std::deque<framing::AMQFrame> Frames;
- struct Buff;
-
- const uint16_t maxFrameSize;
-
- sys::Mutex lock;
- Frames frames; // Outgoing frame queue
- size_t lastEof; // Position after last EOF in frames
- uint64_t currentSize;
- Bounds* bounds;
-
- framing::ProtocolVersion version;
- bool initiated;
- bool closed;
- bool joined;
-
- sys::ShutdownHandler* shutdownHandler;
- framing::InputHandler* input;
- framing::InitiationHandler* initialiser;
- framing::OutputHandler* output;
-
- sys::Thread receiver;
-
- sys::Socket socket;
-
- sys::AsynchIO* aio;
- std::string identifier;
- boost::shared_ptr<sys::Poller> poller;
- std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
-
- ~TCPConnector();
-
- void run();
- void handleClosed();
- bool closeInternal();
-
- void connected(const Socket&);
- void connectFailed(const std::string& msg);
- bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
- void writebuff(qpid::sys::AsynchIO&);
- void writeDataBlock(const framing::AMQDataBlock& data);
- void eof(qpid::sys::AsynchIO&);
-
- boost::weak_ptr<ConnectionImpl> impl;
-
- void connect(const std::string& host, int port);
- void close();
- void send(framing::AMQFrame& frame);
- void abort();
-
- void setInputHandler(framing::InputHandler* handler);
- void setShutdownHandler(sys::ShutdownHandler* handler);
- sys::ShutdownHandler* getShutdownHandler() const;
- framing::OutputHandler* getOutputHandler();
- const std::string& getIdentifier() const;
- void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
-
- size_t decode(const char* buffer, size_t size);
- size_t encode(const char* buffer, size_t size);
- bool canEncode();
-
-public:
- TCPConnector(framing::ProtocolVersion pVersion,
- const ConnectionSettings&,
- ConnectionImpl*);
- unsigned int getSSF() { return 0; }
-};
-
-// Static constructor which registers connector here
-namespace {
- Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
- return new TCPConnector(v, s, c);
- }
-
- struct StaticInit {
- StaticInit() {
- Connector::registerFactory("tcp", &create);
- };
- } init;
-}
-
-struct TCPConnector::Buff : public AsynchIO::BufferBase {
- Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
- ~Buff() { delete [] bytes;}
-};
-
-TCPConnector::TCPConnector(ProtocolVersion ver,
- const ConnectionSettings& settings,
- ConnectionImpl* cimpl)
- : maxFrameSize(settings.maxFrameSize),
- lastEof(0),
- currentSize(0),
- bounds(cimpl),
- version(ver),
- initiated(false),
- closed(true),
- joined(true),
- shutdownHandler(0),
- aio(0),
- impl(cimpl->shared_from_this())
-{
- QPID_LOG(debug, "TCPConnector created for " << version.toString());
- settings.configureSocket(socket);
-}
-
-TCPConnector::~TCPConnector() {
- close();
-}
-
-void TCPConnector::connect(const std::string& host, int port){
- Mutex::ScopedLock l(lock);
- assert(closed);
- assert(joined);
- poller = Poller::shared_ptr(new Poller);
- AsynchConnector::create(socket,
- poller,
- host, port,
- boost::bind(&TCPConnector::connected, this, _1),
- boost::bind(&TCPConnector::connectFailed, this, _3));
- closed = false;
- joined = false;
- receiver = Thread(this);
-}
-
-void TCPConnector::connected(const Socket&) {
- aio = AsynchIO::create(socket,
- boost::bind(&TCPConnector::readbuff, this, _1, _2),
- boost::bind(&TCPConnector::eof, this, _1),
- boost::bind(&TCPConnector::eof, this, _1),
- 0, // closed
- 0, // nobuffs
- boost::bind(&TCPConnector::writebuff, this, _1));
- for (int i = 0; i < 32; i++) {
- aio->queueReadBuffer(new Buff(maxFrameSize));
- }
- aio->start(poller);
-
- identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
- ProtocolInitiation init(version);
- writeDataBlock(init);
-}
-
-void TCPConnector::connectFailed(const std::string& msg) {
- QPID_LOG(warning, "Connecting failed: " << msg);
- closed = true;
- poller->shutdown();
- closeInternal();
- if (shutdownHandler)
- shutdownHandler->shutdown();
-}
-
-bool TCPConnector::closeInternal() {
- bool ret;
- {
- Mutex::ScopedLock l(lock);
- ret = !closed;
- if (!closed) {
- closed = true;
- aio->queueForDeletion();
- poller->shutdown();
- }
- if (joined || receiver.id() == Thread::current().id()) {
- return ret;
- }
- joined = true;
- }
- receiver.join();
- return ret;
-}
-
-void TCPConnector::close() {
- closeInternal();
-}
-
-void TCPConnector::abort() {
- // Can't abort a closed connection
- if (!closed) {
- if (aio) {
- // Established connection
- aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
- } else {
- // We're still connecting
- connectFailed("Connection timedout");
- }
- }
-}
-
-void TCPConnector::setInputHandler(InputHandler* handler){
- input = handler;
-}
-
-void TCPConnector::setShutdownHandler(ShutdownHandler* handler){
- shutdownHandler = handler;
-}
-
-OutputHandler* TCPConnector::getOutputHandler() {
- return this;
-}
-
-sys::ShutdownHandler* TCPConnector::getShutdownHandler() const {
- return shutdownHandler;
-}
-
-const std::string& TCPConnector::getIdentifier() const {
- return identifier;
-}
-
-void TCPConnector::send(AMQFrame& frame) {
- Mutex::ScopedLock l(lock);
- frames.push_back(frame);
- //only ask to write if this is the end of a frameset or if we
- //already have a buffers worth of data
- currentSize += frame.encodedSize();
- bool notifyWrite = false;
- if (frame.getEof()) {
- lastEof = frames.size();
- notifyWrite = true;
- } else {
- notifyWrite = (currentSize >= maxFrameSize);
- }
- if (notifyWrite && !closed) aio->notifyPendingWrite();
-}
-
-void TCPConnector::handleClosed() {
- if (closeInternal() && shutdownHandler)
- shutdownHandler->shutdown();
-}
-
-void TCPConnector::writebuff(AsynchIO& /*aio*/)
-{
- Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
- if (codec->canEncode()) {
- std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer());
- if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize));
-
- size_t encoded = codec->encode(buffer->bytes, buffer->byteCount);
-
- buffer->dataStart = 0;
- buffer->dataCount = encoded;
- aio->queueWrite(buffer.release());
- }
-}
-
-// Called in IO thread.
-bool TCPConnector::canEncode()
-{
- Mutex::ScopedLock l(lock);
- //have at least one full frameset or a whole buffers worth of data
- return lastEof || currentSize >= maxFrameSize;
-}
-
-// Called in IO thread.
-size_t TCPConnector::encode(const char* buffer, size_t size)
-{
- framing::Buffer out(const_cast<char*>(buffer), size);
- size_t bytesWritten(0);
- {
- Mutex::ScopedLock l(lock);
- while (!frames.empty() && out.available() >= frames.front().encodedSize() ) {
- frames.front().encode(out);
- QPID_LOG(trace, "SENT " << identifier << ": " << frames.front());
- frames.pop_front();
- if (lastEof) --lastEof;
- }
- bytesWritten = size - out.available();
- currentSize -= bytesWritten;
- }
- if (bounds) bounds->reduce(bytesWritten);
- return bytesWritten;
-}
-
-bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff)
-{
- Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
- int32_t decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
- // TODO: unreading needs to go away, and when we can cope
- // with multiple sub-buffers in the general buffer scheme, it will
- if (decoded < buff->dataCount) {
- // Adjust buffer for used bytes and then "unread them"
- buff->dataStart += decoded;
- buff->dataCount -= decoded;
- aio.unread(buff);
- } else {
- // Give whole buffer back to aio subsystem
- aio.queueReadBuffer(buff);
- }
- return true;
-}
-
-size_t TCPConnector::decode(const char* buffer, size_t size)
-{
- framing::Buffer in(const_cast<char*>(buffer), size);
- if (!initiated) {
- framing::ProtocolInitiation protocolInit;
- if (protocolInit.decode(in)) {
- QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")");
- if(!(protocolInit==version)){
- throw Exception(QPID_MSG("Unsupported version: " << protocolInit
- << " supported version " << version));
- }
- }
- initiated = true;
- }
- AMQFrame frame;
- while(frame.decode(in)){
- QPID_LOG(trace, "RECV " << identifier << ": " << frame);
- input->received(frame);
- }
- return size - in.available();
-}
-
-void TCPConnector::writeDataBlock(const AMQDataBlock& data) {
- AsynchIO::BufferBase* buff = new Buff(maxFrameSize);
- framing::Buffer out(buff->bytes, buff->byteCount);
- data.encode(out);
- buff->dataCount = data.encodedSize();
- aio->queueWrite(buff);
-}
-
-void TCPConnector::eof(AsynchIO&) {
- handleClosed();
-}
-
-void TCPConnector::run() {
- // Keep the connection impl in memory until run() completes.
- boost::shared_ptr<ConnectionImpl> protect = impl.lock();
- assert(protect);
- try {
- Dispatcher d(poller);
-
- d.run();
- } catch (const std::exception& e) {
- QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what()));
- handleClosed();
- }
- try {
- socket.close();
- } catch (const std::exception&) {}
-}
-
-void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
-{
- securityLayer = sl;
- securityLayer->init(this);
-}
-
-
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/TCPConnector.cpp b/cpp/src/qpid/client/TCPConnector.cpp
new file mode 100644
index 0000000000..1a6e51d54d
--- /dev/null
+++ b/cpp/src/qpid/client/TCPConnector.cpp
@@ -0,0 +1,327 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/TCPConnector.h"
+
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Codec.h"
+#include "qpid/sys/Time.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/SecurityLayer.h"
+#include "qpid/Msg.h"
+
+#include <iostream>
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+
+namespace qpid {
+namespace client {
+
+using namespace qpid::sys;
+using namespace qpid::framing;
+using boost::format;
+using boost::str;
+
+// Static constructor which registers connector here
+namespace {
+ Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
+ return new TCPConnector(v, s, c);
+ }
+
+ struct StaticInit {
+ StaticInit() {
+ Connector::registerFactory("tcp", &create);
+ };
+ } init;
+}
+
+struct TCPConnector::Buff : public AsynchIO::BufferBase {
+ Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
+ ~Buff() { delete [] bytes;}
+};
+
+TCPConnector::TCPConnector(ProtocolVersion ver,
+ const ConnectionSettings& settings,
+ ConnectionImpl* cimpl)
+ : maxFrameSize(settings.maxFrameSize),
+ lastEof(0),
+ currentSize(0),
+ bounds(cimpl),
+ version(ver),
+ initiated(false),
+ closed(true),
+ joined(true),
+ shutdownHandler(0),
+ aio(0),
+ impl(cimpl->shared_from_this())
+{
+ QPID_LOG(debug, "TCPConnector created for " << version.toString());
+ settings.configureSocket(socket);
+}
+
+TCPConnector::~TCPConnector() {
+ close();
+}
+
+void TCPConnector::connect(const std::string& host, int port) {
+ Mutex::ScopedLock l(lock);
+ assert(closed);
+ assert(joined);
+ poller = Poller::shared_ptr(new Poller);
+ AsynchConnector::create(socket,
+ poller,
+ host, port,
+ boost::bind(&TCPConnector::connected, this, _1),
+ boost::bind(&TCPConnector::connectFailed, this, _3));
+ closed = false;
+ joined = false;
+ receiver = Thread(this);
+}
+
+void TCPConnector::connected(const Socket&) {
+ aio = AsynchIO::create(socket,
+ boost::bind(&TCPConnector::readbuff, this, _1, _2),
+ boost::bind(&TCPConnector::eof, this, _1),
+ boost::bind(&TCPConnector::eof, this, _1),
+ 0, // closed
+ 0, // nobuffs
+ boost::bind(&TCPConnector::writebuff, this, _1));
+ for (int i = 0; i < 32; i++) {
+ aio->queueReadBuffer(new Buff(maxFrameSize));
+ }
+ aio->start(poller);
+
+ identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
+ ProtocolInitiation init(version);
+ writeDataBlock(init);
+}
+
+void TCPConnector::connectFailed(const std::string& msg) {
+ QPID_LOG(warning, "Connecting failed: " << msg);
+ closed = true;
+ poller->shutdown();
+ closeInternal();
+ if (shutdownHandler)
+ shutdownHandler->shutdown();
+}
+
+bool TCPConnector::closeInternal() {
+ bool ret;
+ {
+ Mutex::ScopedLock l(lock);
+ ret = !closed;
+ if (!closed) {
+ closed = true;
+ aio->queueForDeletion();
+ poller->shutdown();
+ }
+ if (joined || receiver.id() == Thread::current().id()) {
+ return ret;
+ }
+ joined = true;
+ }
+ receiver.join();
+ return ret;
+}
+
+void TCPConnector::close() {
+ closeInternal();
+}
+
+void TCPConnector::abort() {
+ // Can't abort a closed connection
+ if (!closed) {
+ if (aio) {
+ // Established connection
+ aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
+ } else {
+ // We're still connecting
+ connectFailed("Connection timedout");
+ }
+ }
+}
+
+void TCPConnector::setInputHandler(InputHandler* handler){
+ input = handler;
+}
+
+void TCPConnector::setShutdownHandler(ShutdownHandler* handler){
+ shutdownHandler = handler;
+}
+
+OutputHandler* TCPConnector::getOutputHandler() {
+ return this;
+}
+
+sys::ShutdownHandler* TCPConnector::getShutdownHandler() const {
+ return shutdownHandler;
+}
+
+const std::string& TCPConnector::getIdentifier() const {
+ return identifier;
+}
+
+void TCPConnector::send(AMQFrame& frame) {
+ Mutex::ScopedLock l(lock);
+ frames.push_back(frame);
+ //only ask to write if this is the end of a frameset or if we
+ //already have a buffers worth of data
+ currentSize += frame.encodedSize();
+ bool notifyWrite = false;
+ if (frame.getEof()) {
+ lastEof = frames.size();
+ notifyWrite = true;
+ } else {
+ notifyWrite = (currentSize >= maxFrameSize);
+ }
+ if (notifyWrite && !closed) aio->notifyPendingWrite();
+}
+
+void TCPConnector::handleClosed() {
+ if (closeInternal() && shutdownHandler)
+ shutdownHandler->shutdown();
+}
+
+void TCPConnector::writebuff(AsynchIO& /*aio*/)
+{
+ Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
+ if (codec->canEncode()) {
+ std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer());
+ if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize));
+
+ size_t encoded = codec->encode(buffer->bytes, buffer->byteCount);
+
+ buffer->dataStart = 0;
+ buffer->dataCount = encoded;
+ aio->queueWrite(buffer.release());
+ }
+}
+
+// Called in IO thread.
+bool TCPConnector::canEncode()
+{
+ Mutex::ScopedLock l(lock);
+ //have at least one full frameset or a whole buffers worth of data
+ return lastEof || currentSize >= maxFrameSize;
+}
+
+// Called in IO thread.
+size_t TCPConnector::encode(const char* buffer, size_t size)
+{
+ framing::Buffer out(const_cast<char*>(buffer), size);
+ size_t bytesWritten(0);
+ {
+ Mutex::ScopedLock l(lock);
+ while (!frames.empty() && out.available() >= frames.front().encodedSize() ) {
+ frames.front().encode(out);
+ QPID_LOG(trace, "SENT " << identifier << ": " << frames.front());
+ frames.pop_front();
+ if (lastEof) --lastEof;
+ }
+ bytesWritten = size - out.available();
+ currentSize -= bytesWritten;
+ }
+ if (bounds) bounds->reduce(bytesWritten);
+ return bytesWritten;
+}
+
+bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff)
+{
+ Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
+ int32_t decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
+ // TODO: unreading needs to go away, and when we can cope
+ // with multiple sub-buffers in the general buffer scheme, it will
+ if (decoded < buff->dataCount) {
+ // Adjust buffer for used bytes and then "unread them"
+ buff->dataStart += decoded;
+ buff->dataCount -= decoded;
+ aio.unread(buff);
+ } else {
+ // Give whole buffer back to aio subsystem
+ aio.queueReadBuffer(buff);
+ }
+ return true;
+}
+
+size_t TCPConnector::decode(const char* buffer, size_t size)
+{
+ framing::Buffer in(const_cast<char*>(buffer), size);
+ if (!initiated) {
+ framing::ProtocolInitiation protocolInit;
+ if (protocolInit.decode(in)) {
+ QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")");
+ if(!(protocolInit==version)){
+ throw Exception(QPID_MSG("Unsupported version: " << protocolInit
+ << " supported version " << version));
+ }
+ }
+ initiated = true;
+ }
+ AMQFrame frame;
+ while(frame.decode(in)){
+ QPID_LOG(trace, "RECV " << identifier << ": " << frame);
+ input->received(frame);
+ }
+ return size - in.available();
+}
+
+void TCPConnector::writeDataBlock(const AMQDataBlock& data) {
+ AsynchIO::BufferBase* buff = new Buff(maxFrameSize);
+ framing::Buffer out(buff->bytes, buff->byteCount);
+ data.encode(out);
+ buff->dataCount = data.encodedSize();
+ aio->queueWrite(buff);
+}
+
+void TCPConnector::eof(AsynchIO&) {
+ handleClosed();
+}
+
+void TCPConnector::run() {
+ // Keep the connection impl in memory until run() completes.
+ boost::shared_ptr<ConnectionImpl> protect = impl.lock();
+ assert(protect);
+ try {
+ Dispatcher d(poller);
+
+ d.run();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what()));
+ handleClosed();
+ }
+ try {
+ socket.close();
+ } catch (const std::exception&) {}
+}
+
+void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
+{
+ securityLayer = sl;
+ securityLayer->init(this);
+}
+
+
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/TCPConnector.h b/cpp/src/qpid/client/TCPConnector.h
new file mode 100644
index 0000000000..6dc07d1f5d
--- /dev/null
+++ b/cpp/src/qpid/client/TCPConnector.h
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _TCPConnector_
+#define _TCPConnector_
+
+#include "Connector.h"
+#include "qpid/client/Bounds.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Codec.h"
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/SecurityLayer.h"
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/Thread.h"
+
+#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
+#include <deque>
+#include <string>
+
+namespace qpid {
+namespace client {
+
+class TCPConnector : public Connector, public sys::Codec, private sys::Runnable
+{
+ typedef std::deque<framing::AMQFrame> Frames;
+ struct Buff;
+
+ const uint16_t maxFrameSize;
+
+ sys::Mutex lock;
+ Frames frames; // Outgoing frame queue
+ size_t lastEof; // Position after last EOF in frames
+ uint64_t currentSize;
+ Bounds* bounds;
+
+ framing::ProtocolVersion version;
+ bool initiated;
+ bool closed;
+ bool joined;
+
+ sys::ShutdownHandler* shutdownHandler;
+ framing::InputHandler* input;
+ framing::InitiationHandler* initialiser;
+ framing::OutputHandler* output;
+
+ sys::Thread receiver;
+
+ sys::Socket socket;
+
+ sys::AsynchIO* aio;
+ std::string identifier;
+ boost::shared_ptr<sys::Poller> poller;
+ std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
+
+ ~TCPConnector();
+
+ void run();
+ void handleClosed();
+ bool closeInternal();
+
+ virtual void connected(const qpid::sys::Socket&);
+ void connectFailed(const std::string& msg);
+ bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
+ void writebuff(qpid::sys::AsynchIO&);
+ void writeDataBlock(const framing::AMQDataBlock& data);
+ void eof(qpid::sys::AsynchIO&);
+
+ boost::weak_ptr<ConnectionImpl> impl;
+
+ void connect(const std::string& host, int port);
+ void close();
+ void send(framing::AMQFrame& frame);
+ void abort();
+
+ void setInputHandler(framing::InputHandler* handler);
+ void setShutdownHandler(sys::ShutdownHandler* handler);
+ sys::ShutdownHandler* getShutdownHandler() const;
+ framing::OutputHandler* getOutputHandler();
+ const std::string& getIdentifier() const;
+ void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
+
+ size_t decode(const char* buffer, size_t size);
+ size_t encode(const char* buffer, size_t size);
+ bool canEncode();
+
+public:
+ TCPConnector(framing::ProtocolVersion pVersion,
+ const ConnectionSettings&,
+ ConnectionImpl*);
+ unsigned int getSSF() { return 0; }
+};
+
+}} // namespace qpid::client
+
+#endif /* _TCPConnector_ */