summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/client/RdmaConnector.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/client/RdmaConnector.cpp')
-rw-r--r--qpid/cpp/src/qpid/client/RdmaConnector.cpp431
1 files changed, 431 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/client/RdmaConnector.cpp b/qpid/cpp/src/qpid/client/RdmaConnector.cpp
new file mode 100644
index 0000000000..664640f5e7
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/RdmaConnector.cpp
@@ -0,0 +1,431 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/Connector.h"
+
+#include "qpid/client/Bounds.h"
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Time.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/InitiationHandler.h"
+#include "qpid/sys/rdma/RdmaIO.h"
+#include "qpid/sys/rdma/rdma_exception.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/SecurityLayer.h"
+#include "qpid/Msg.h"
+
+#include <iostream>
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+#include <boost/lexical_cast.hpp>
+
+// This stuff needs to abstracted out of here to a platform specific file
+#include <netdb.h>
+
+namespace qpid {
+namespace client {
+
+using namespace qpid::sys;
+using namespace qpid::framing;
+using boost::format;
+using boost::str;
+
+class RdmaConnector : public Connector, public sys::Codec
+{
+ typedef std::deque<framing::AMQFrame> Frames;
+
+ const uint16_t maxFrameSize;
+ sys::Mutex lock;
+ Frames frames;
+ size_t lastEof; // Position after last EOF in frames
+ uint64_t currentSize;
+ Bounds* bounds;
+
+ framing::ProtocolVersion version;
+ bool initiated;
+
+ sys::Mutex dataConnectedLock;
+ bool dataConnected;
+
+ sys::ShutdownHandler* shutdownHandler;
+ framing::InputHandler* input;
+ framing::InitiationHandler* initialiser;
+ framing::OutputHandler* output;
+
+ Rdma::AsynchIO* aio;
+ Rdma::Connector* acon;
+ sys::Poller::shared_ptr poller;
+ std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
+
+ ~RdmaConnector();
+
+ // Callbacks
+ void connected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&);
+ void connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, Rdma::ErrorType);
+ void disconnected();
+ void rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&);
+
+ void readbuff(Rdma::AsynchIO&, Rdma::Buffer*);
+ void writebuff(Rdma::AsynchIO&);
+ void writeDataBlock(const framing::AMQDataBlock& data);
+ void dataError(Rdma::AsynchIO&);
+ void drained();
+ void connectionStopped(Rdma::Connector* acon, Rdma::AsynchIO* aio);
+ void dataStopped(Rdma::AsynchIO* aio);
+
+ std::string identifier;
+
+ void connect(const std::string& host, const std::string& port);
+ void close();
+ void send(framing::AMQFrame& frame);
+ void abort() {} // TODO: need to fix this for heartbeat timeouts to work
+
+ void setInputHandler(framing::InputHandler* handler);
+ void setShutdownHandler(sys::ShutdownHandler* handler);
+ sys::ShutdownHandler* getShutdownHandler() const;
+ framing::OutputHandler* getOutputHandler();
+ const std::string& getIdentifier() const;
+ void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
+ const qpid::sys::SecuritySettings* getSecuritySettings() { return 0; }
+
+ size_t decode(const char* buffer, size_t size);
+ size_t encode(const char* buffer, size_t size);
+ bool canEncode();
+
+public:
+ RdmaConnector(Poller::shared_ptr,
+ framing::ProtocolVersion pVersion,
+ const ConnectionSettings&,
+ ConnectionImpl*);
+};
+
+// Static constructor which registers connector here
+namespace {
+ Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
+ return new RdmaConnector(p, v, s, c);
+ }
+
+ struct StaticInit {
+ StaticInit() {
+ Connector::registerFactory("rdma", &create);
+ Connector::registerFactory("ib", &create);
+ };
+ } init;
+}
+
+
+RdmaConnector::RdmaConnector(Poller::shared_ptr p,
+ ProtocolVersion ver,
+ const ConnectionSettings& settings,
+ ConnectionImpl* cimpl)
+ : maxFrameSize(settings.maxFrameSize),
+ lastEof(0),
+ currentSize(0),
+ bounds(cimpl),
+ version(ver),
+ initiated(false),
+ dataConnected(false),
+ shutdownHandler(0),
+ aio(0),
+ acon(0),
+ poller(p)
+{
+ QPID_LOG(debug, "RdmaConnector created for " << version);
+}
+
+namespace {
+ void deleteAsynchIO(Rdma::AsynchIO& aio) {
+ delete &aio;
+ }
+
+ void deleteConnector(Rdma::ConnectionManager& con) {
+ delete &con;
+ }
+}
+
+RdmaConnector::~RdmaConnector() {
+ QPID_LOG(debug, "~RdmaConnector " << identifier);
+ if (aio) {
+ aio->stop(deleteAsynchIO);
+ }
+ if (acon) {
+ acon->stop(deleteConnector);
+ }
+}
+
+void RdmaConnector::connect(const std::string& host, const std::string& port){
+ Mutex::ScopedLock l(dataConnectedLock);
+ assert(!dataConnected);
+
+ acon = new Rdma::Connector(
+ Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES),
+ boost::bind(&RdmaConnector::connected, this, poller, _1, _2),
+ boost::bind(&RdmaConnector::connectionError, this, poller, _1, _2),
+ boost::bind(&RdmaConnector::disconnected, this),
+ boost::bind(&RdmaConnector::rejected, this, poller, _1, _2));
+
+ SocketAddress sa(host, port);
+ acon->start(poller, sa);
+}
+
+// The following only gets run when connected
+void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr ci, const Rdma::ConnectionParams& cp) {
+ try {
+ Mutex::ScopedLock l(dataConnectedLock);
+ assert(!dataConnected);
+ Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair();
+
+ aio = new Rdma::AsynchIO(ci->getQueuePair(),
+ cp.rdmaProtocolVersion,
+ cp.maxRecvBufferSize, cp.initialXmitCredit , Rdma::DEFAULT_WR_ENTRIES,
+ boost::bind(&RdmaConnector::readbuff, this, _1, _2),
+ boost::bind(&RdmaConnector::writebuff, this, _1),
+ 0, // write buffers full
+ boost::bind(&RdmaConnector::dataError, this, _1));
+
+ identifier = str(format("[%1% %2%]") % ci->getLocalName() % ci->getPeerName());
+ ProtocolInitiation init(version);
+ writeDataBlock(init);
+
+ aio->start(poller);
+
+ dataConnected = true;
+
+ return;
+ } catch (const Rdma::Exception& e) {
+ QPID_LOG(error, "Rdma: Cannot create new connection (Rdma exception): " << e.what());
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Rdma: Cannot create new connection (unknown exception): " << e.what());
+ }
+ dataConnected = false;
+ connectionStopped(acon, aio);
+}
+
+void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, Rdma::ErrorType) {
+ QPID_LOG(debug, "Connection Error " << identifier);
+ connectionStopped(acon, aio);
+}
+
+// Bizarrely we seem to get rejected events *after* we've already got a connected event for some peer disconnects
+// so we need to check whether the data connection is started or not in here
+void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams& cp) {
+ QPID_LOG(debug, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize);
+ if (dataConnected) {
+ disconnected();
+ } else {
+ connectionStopped(acon, aio);
+ }
+}
+
+void RdmaConnector::disconnected() {
+ QPID_LOG(debug, "Connection disconnected " << identifier);
+ {
+ Mutex::ScopedLock l(dataConnectedLock);
+ // If we're closed already then we'll get to drained() anyway
+ if (!dataConnected) return;
+ dataConnected = false;
+ }
+ // Make sure that all the disconnected actions take place on the data "thread"
+ aio->requestCallback(boost::bind(&RdmaConnector::drained, this));
+}
+
+void RdmaConnector::dataError(Rdma::AsynchIO&) {
+ QPID_LOG(debug, "Data Error " << identifier);
+ {
+ Mutex::ScopedLock l(dataConnectedLock);
+ // If we're closed already then we'll get to drained() anyway
+ if (!dataConnected) return;
+ dataConnected = false;
+ }
+ drained();
+}
+
+void RdmaConnector::close() {
+ QPID_LOG(debug, "RdmaConnector::close " << identifier);
+ {
+ Mutex::ScopedLock l(dataConnectedLock);
+ if (!dataConnected) return;
+ dataConnected = false;
+ }
+ aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this));
+}
+
+void RdmaConnector::drained() {
+ QPID_LOG(debug, "RdmaConnector::drained " << identifier);
+ assert(!dataConnected);
+ assert(aio);
+ Rdma::AsynchIO* a = aio;
+ aio = 0;
+ a->stop(boost::bind(&RdmaConnector::dataStopped, this, a));
+}
+
+void RdmaConnector::dataStopped(Rdma::AsynchIO* a) {
+ QPID_LOG(debug, "RdmaConnector::dataStopped " << identifier);
+ assert(!dataConnected);
+ assert(acon);
+ Rdma::Connector* c = acon;
+ acon = 0;
+ c->stop(boost::bind(&RdmaConnector::connectionStopped, this, c, a));
+}
+
+void RdmaConnector::connectionStopped(Rdma::Connector* c, Rdma::AsynchIO* a) {
+ QPID_LOG(debug, "RdmaConnector::connectionStopped " << identifier);
+ assert(!dataConnected);
+ aio = 0;
+ acon = 0;
+ delete a;
+ delete c;
+ if (shutdownHandler) {
+ ShutdownHandler* s = shutdownHandler;
+ shutdownHandler = 0;
+ s->shutdown();
+ }
+}
+
+void RdmaConnector::setInputHandler(InputHandler* handler){
+ input = handler;
+}
+
+void RdmaConnector::setShutdownHandler(ShutdownHandler* handler){
+ shutdownHandler = handler;
+}
+
+OutputHandler* RdmaConnector::getOutputHandler(){
+ return this;
+}
+
+sys::ShutdownHandler* RdmaConnector::getShutdownHandler() const {
+ return shutdownHandler;
+}
+
+const std::string& RdmaConnector::getIdentifier() const {
+ return identifier;
+}
+
+void RdmaConnector::send(AMQFrame& frame) {
+ // It is possible that we are called to write after we are already shutting down
+ Mutex::ScopedLock l(dataConnectedLock);
+ if (!dataConnected) return;
+
+ bool notifyWrite = false;
+ {
+ Mutex::ScopedLock l(lock);
+ frames.push_back(frame);
+ //only ask to write if this is the end of a frameset or if we
+ //already have a buffers worth of data
+ currentSize += frame.encodedSize();
+ if (frame.getEof()) {
+ lastEof = frames.size();
+ notifyWrite = true;
+ } else {
+ notifyWrite = (currentSize >= maxFrameSize);
+ }
+ }
+ if (notifyWrite) aio->notifyPendingWrite();
+}
+
+// Called in IO thread. (write idle routine)
+// This is NOT only called in response to previously calling notifyPendingWrite
+void RdmaConnector::writebuff(Rdma::AsynchIO&) {
+ // It's possible to be disconnected and be writable
+ Mutex::ScopedLock l(dataConnectedLock);
+ if (!dataConnected) {
+ return;
+ }
+ Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
+ if (!codec->canEncode()) {
+ return;
+ }
+ Rdma::Buffer* buffer = aio->getSendBuffer();
+ if (buffer) {
+ size_t encoded = codec->encode(buffer->bytes(), buffer->byteCount());
+ buffer->dataCount(encoded);
+ aio->queueWrite(buffer);
+ }
+}
+
+bool RdmaConnector::canEncode()
+{
+ Mutex::ScopedLock l(lock);
+ //have at least one full frameset or a whole buffers worth of data
+ return aio->writable() && (lastEof || currentSize >= maxFrameSize);
+}
+
+size_t RdmaConnector::encode(const char* buffer, size_t size)
+{
+ framing::Buffer out(const_cast<char*>(buffer), size);
+ size_t bytesWritten(0);
+ {
+ Mutex::ScopedLock l(lock);
+ while (!frames.empty() && out.available() >= frames.front().encodedSize() ) {
+ frames.front().encode(out);
+ QPID_LOG(trace, "SENT " << identifier << ": " << frames.front());
+ frames.pop_front();
+ if (lastEof) --lastEof;
+ }
+ bytesWritten = size - out.available();
+ currentSize -= bytesWritten;
+ }
+ if (bounds) bounds->reduce(bytesWritten);
+ return bytesWritten;
+}
+
+void RdmaConnector::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
+ Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
+ codec->decode(buff->bytes(), buff->dataCount());
+}
+
+size_t RdmaConnector::decode(const char* buffer, size_t size)
+{
+ framing::Buffer in(const_cast<char*>(buffer), size);
+ if (!initiated) {
+ framing::ProtocolInitiation protocolInit;
+ if (protocolInit.decode(in)) {
+ //TODO: check the version is correct
+ QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")");
+ }
+ initiated = true;
+ }
+ AMQFrame frame;
+ while(frame.decode(in)){
+ QPID_LOG(trace, "RECV " << identifier << ": " << frame);
+ input->received(frame);
+ }
+ return size - in.available();
+}
+
+void RdmaConnector::writeDataBlock(const AMQDataBlock& data) {
+ Rdma::Buffer* buff = aio->getSendBuffer();
+ framing::Buffer out(buff->bytes(), buff->byteCount());
+ data.encode(out);
+ buff->dataCount(data.encodedSize());
+ aio->queueWrite(buff);
+}
+
+void RdmaConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
+{
+ securityLayer = sl;
+ securityLayer->init(this);
+}
+
+}} // namespace qpid::client