summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/client/TCPConnector.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/client/TCPConnector.cpp')
-rw-r--r--qpid/cpp/src/qpid/client/TCPConnector.cpp331
1 files changed, 331 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/client/TCPConnector.cpp b/qpid/cpp/src/qpid/client/TCPConnector.cpp
new file mode 100644
index 0000000000..0070b24ec0
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/TCPConnector.cpp
@@ -0,0 +1,331 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/TCPConnector.h"
+
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Codec.h"
+#include "qpid/sys/Time.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/InitiationHandler.h"
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/SecurityLayer.h"
+#include "qpid/Msg.h"
+
+#include <iostream>
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+
+namespace qpid {
+namespace client {
+
+using namespace qpid::sys;
+using namespace qpid::framing;
+using boost::format;
+using boost::str;
+
+struct TCPConnector::Buff : public AsynchIO::BufferBase {
+ Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
+ ~Buff() { delete [] bytes;}
+};
+
+// Static constructor which registers connector here
+namespace {
+ Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
+ return new TCPConnector(p, v, s, c);
+ }
+
+ struct StaticInit {
+ StaticInit() {
+ Connector::registerFactory("tcp", &create);
+ };
+ } init;
+}
+
+TCPConnector::TCPConnector(Poller::shared_ptr p,
+ ProtocolVersion ver,
+ const ConnectionSettings& settings,
+ ConnectionImpl* cimpl)
+ : maxFrameSize(settings.maxFrameSize),
+ lastEof(0),
+ currentSize(0),
+ bounds(cimpl),
+ version(ver),
+ initiated(false),
+ closed(true),
+ shutdownHandler(0),
+ connector(0),
+ aio(0),
+ poller(p)
+{
+ QPID_LOG(debug, "TCPConnector created for " << version);
+ settings.configureSocket(socket);
+}
+
+TCPConnector::~TCPConnector() {
+ close();
+}
+
+void TCPConnector::connect(const std::string& host, const std::string& port) {
+ Mutex::ScopedLock l(lock);
+ assert(closed);
+ connector = AsynchConnector::create(
+ socket,
+ host, port,
+ boost::bind(&TCPConnector::connected, this, _1),
+ boost::bind(&TCPConnector::connectFailed, this, _3));
+ closed = false;
+
+ connector->start(poller);
+}
+
+void TCPConnector::connected(const Socket&) {
+ connector = 0;
+ aio = AsynchIO::create(socket,
+ boost::bind(&TCPConnector::readbuff, this, _1, _2),
+ boost::bind(&TCPConnector::eof, this, _1),
+ boost::bind(&TCPConnector::disconnected, this, _1),
+ boost::bind(&TCPConnector::socketClosed, this, _1, _2),
+ 0, // nobuffs
+ boost::bind(&TCPConnector::writebuff, this, _1));
+ start(aio);
+ initAmqp();
+ aio->start(poller);
+}
+
+void TCPConnector::start(sys::AsynchIO* aio_) {
+ aio = aio_;
+ for (int i = 0; i < 4; i++) {
+ aio->queueReadBuffer(new Buff(maxFrameSize));
+ }
+
+ identifier = str(format("[%1%]") % socket.getFullAddress());
+}
+
+void TCPConnector::initAmqp() {
+ ProtocolInitiation init(version);
+ writeDataBlock(init);
+}
+
+void TCPConnector::connectFailed(const std::string& msg) {
+ connector = 0;
+ QPID_LOG(warning, "Connect failed: " << msg);
+ socket.close();
+ if (!closed)
+ closed = true;
+ if (shutdownHandler)
+ shutdownHandler->shutdown();
+}
+
+void TCPConnector::close() {
+ Mutex::ScopedLock l(lock);
+ if (!closed) {
+ closed = true;
+ if (aio)
+ aio->queueWriteClose();
+ }
+}
+
+void TCPConnector::socketClosed(AsynchIO&, const Socket&) {
+ if (aio)
+ aio->queueForDeletion();
+ if (shutdownHandler)
+ shutdownHandler->shutdown();
+}
+
+void TCPConnector::abort() {
+ // Can't abort a closed connection
+ if (!closed) {
+ if (aio) {
+ // Established connection
+ aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
+ } else if (connector) {
+ // We're still connecting
+ connector->stop();
+ connectFailed("Connection timedout");
+ }
+ }
+}
+
+void TCPConnector::setInputHandler(InputHandler* handler){
+ input = handler;
+}
+
+void TCPConnector::setShutdownHandler(ShutdownHandler* handler){
+ shutdownHandler = handler;
+}
+
+OutputHandler* TCPConnector::getOutputHandler() {
+ return this;
+}
+
+sys::ShutdownHandler* TCPConnector::getShutdownHandler() const {
+ return shutdownHandler;
+}
+
+const std::string& TCPConnector::getIdentifier() const {
+ return identifier;
+}
+
+void TCPConnector::send(AMQFrame& frame) {
+ bool notifyWrite = false;
+ {
+ Mutex::ScopedLock l(lock);
+ frames.push_back(frame);
+ //only ask to write if this is the end of a frameset or if we
+ //already have a buffers worth of data
+ currentSize += frame.encodedSize();
+ if (frame.getEof()) {
+ lastEof = frames.size();
+ notifyWrite = true;
+ } else {
+ notifyWrite = (currentSize >= maxFrameSize);
+ }
+ /*
+ NOTE: Moving the following line into this mutex block
+ is a workaround for BZ 570168, in which the test
+ testConcurrentSenders causes a hang about 1.5%
+ of the time. ( To see the hang much more frequently
+ leave this line out of the mutex block, and put a
+ small usleep just before it.)
+
+ TODO mgoulish - fix the underlying cause and then
+ move this call back outside the mutex.
+ */
+ if (notifyWrite && !closed) aio->notifyPendingWrite();
+ }
+}
+
+void TCPConnector::writebuff(AsynchIO& /*aio*/)
+{
+ // It's possible to be disconnected and be writable
+ if (closed)
+ return;
+
+ Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
+ if (codec->canEncode()) {
+ std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer());
+ if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize));
+
+ size_t encoded = codec->encode(buffer->bytes, buffer->byteCount);
+
+ buffer->dataStart = 0;
+ buffer->dataCount = encoded;
+ aio->queueWrite(buffer.release());
+ }
+}
+
+// Called in IO thread.
+bool TCPConnector::canEncode()
+{
+ Mutex::ScopedLock l(lock);
+ //have at least one full frameset or a whole buffers worth of data
+ return lastEof || currentSize >= maxFrameSize;
+}
+
+// Called in IO thread.
+size_t TCPConnector::encode(const char* buffer, size_t size)
+{
+ framing::Buffer out(const_cast<char*>(buffer), size);
+ size_t bytesWritten(0);
+ {
+ Mutex::ScopedLock l(lock);
+ while (!frames.empty() && out.available() >= frames.front().encodedSize() ) {
+ frames.front().encode(out);
+ QPID_LOG(trace, "SENT " << identifier << ": " << frames.front());
+ frames.pop_front();
+ if (lastEof) --lastEof;
+ }
+ bytesWritten = size - out.available();
+ currentSize -= bytesWritten;
+ }
+ if (bounds) bounds->reduce(bytesWritten);
+ return bytesWritten;
+}
+
+bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff)
+{
+ Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
+ int32_t decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
+ // TODO: unreading needs to go away, and when we can cope
+ // with multiple sub-buffers in the general buffer scheme, it will
+ if (decoded < buff->dataCount) {
+ // Adjust buffer for used bytes and then "unread them"
+ buff->dataStart += decoded;
+ buff->dataCount -= decoded;
+ aio.unread(buff);
+ } else {
+ // Give whole buffer back to aio subsystem
+ aio.queueReadBuffer(buff);
+ }
+ return true;
+}
+
+size_t TCPConnector::decode(const char* buffer, size_t size)
+{
+ framing::Buffer in(const_cast<char*>(buffer), size);
+ if (!initiated) {
+ framing::ProtocolInitiation protocolInit;
+ if (protocolInit.decode(in)) {
+ QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")");
+ if(!(protocolInit==version)){
+ throw Exception(QPID_MSG("Unsupported version: " << protocolInit
+ << " supported version " << version));
+ }
+ }
+ initiated = true;
+ }
+ AMQFrame frame;
+ while(frame.decode(in)){
+ QPID_LOG(trace, "RECV " << identifier << ": " << frame);
+ input->received(frame);
+ }
+ return size - in.available();
+}
+
+void TCPConnector::writeDataBlock(const AMQDataBlock& data) {
+ AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
+ framing::Buffer out(buff->bytes, buff->byteCount);
+ data.encode(out);
+ buff->dataCount = data.encodedSize();
+ aio->queueWrite(buff);
+}
+
+void TCPConnector::eof(AsynchIO&) {
+ close();
+}
+
+void TCPConnector::disconnected(AsynchIO&) {
+ close();
+ socketClosed(*aio, socket);
+}
+
+void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
+{
+ securityLayer = sl;
+ securityLayer->init(this);
+}
+
+}} // namespace qpid::client