summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp')
-rw-r--r--qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp399
1 files changed, 399 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
new file mode 100644
index 0000000000..631d116b41
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
@@ -0,0 +1,399 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/ProtocolFactory.h"
+
+#include "qpid/Plugin.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/framing/AMQP_HighestVersion.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/rdma/RdmaIO.h"
+#include "qpid/sys/rdma/rdma_exception.h"
+#include "qpid/sys/OutputControl.h"
+#include "qpid/sys/SecuritySettings.h"
+
+#include <boost/bind.hpp>
+#include <memory>
+
+#include <netdb.h>
+
+using std::auto_ptr;
+using std::string;
+using std::stringstream;
+
+namespace qpid {
+namespace sys {
+
+// Per-connection glue between an Rdma::AsynchIO and a ConnectionCodec.
+// The handler passes itself to the codec factory as the codec's OutputControl
+// and supplies the read/idle/error callbacks that are bound to the AsynchIO.
+class RdmaIOHandler : public OutputControl {
+ // Connection name taken from getFullName(); used to tag log messages
+ std::string identifier;
+ // Creates the codec once the protocol version to use is known
+ ConnectionCodec::Factory* factory;
+ // Null until protocol initiation has happened; deleted in the destructor
+ ConnectionCodec* codec;
+ // Once set, readbuff() ignores all further input
+ bool readError;
+
+ // Guards 'polling'
+ sys::Mutex pollingLock;
+ // Set by start(), cleared by the first disconnectAction()
+ bool polling;
+
+ // Supplied through init(); deleted in the destructor
+ Rdma::AsynchIO* aio;
+ // Copy of the connection pointer handed to the constructor
+ Rdma::Connection::intrusive_ptr connection;
+
+ // Encode a protocol initiation header into a send buffer and queue it
+ void write(const framing::ProtocolInitiation&);
+ // Runs as an AsynchIO callback requested by disconnected()
+ void disconnectAction();
+
+ public:
+ RdmaIOHandler(Rdma::Connection::intrusive_ptr c, ConnectionCodec::Factory* f);
+ ~RdmaIOHandler();
+ void init(Rdma::AsynchIO* a);
+ void start(Poller::shared_ptr poller);
+
+ // Output side
+ void close();
+ void abort();
+ void activateOutput();
+ void giveReadCredit(int32_t credit);
+ void initProtocolOut();
+
+ // Input side
+ void readbuff(Rdma::AsynchIO& aio, Rdma::Buffer* buff);
+ void initProtocolIn(Rdma::Buffer* buff);
+
+ // Notifications
+ void full(Rdma::AsynchIO& aio);
+ void idle(Rdma::AsynchIO& aio);
+ void error(Rdma::AsynchIO& aio);
+ void disconnected();
+ void drained();
+};
+
+/**
+ * Create a handler for connection c that will obtain its codec from f.
+ *
+ * The codec starts out null (no protocol negotiation has happened yet) and
+ * the handler is not polling until start() is called.
+ *
+ * aio is explicitly set to null: it is only assigned later by init(), but the
+ * destructor unconditionally deletes it, so leaving it uninitialised would
+ * make destroying a handler that never reached init() undefined behaviour.
+ */
+RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr c, qpid::sys::ConnectionCodec::Factory* f) :
+    identifier(c->getFullName()),
+    factory(f),
+    codec(0),
+    readError(false),
+    polling(false),
+    aio(0),
+    connection(c)
+{
+}
+
+/**
+ * Tear the handler down: tell the codec (if one was ever created) that the
+ * connection has closed, then destroy the codec and the AsynchIO.
+ */
+RdmaIOHandler::~RdmaIOHandler() {
+    if (codec != 0) {
+        codec->closed();
+    }
+    delete codec;
+    delete aio;
+}
+
+// Record the AsynchIO that every I/O call in this handler goes through.
+// The handler's destructor deletes it.
+void RdmaIOHandler::init(Rdma::AsynchIO* a) {
+ aio = a;
+}
+
+/**
+ * Mark the handler as polling and start the AsynchIO on the given poller.
+ * Both steps happen under pollingLock; starting twice is a programming
+ * error (asserted).
+ */
+void RdmaIOHandler::start(Poller::shared_ptr poller) {
+    Mutex::ScopedLock guard(pollingLock);
+    assert(!polling);
+    polling = true;
+    aio->start(poller);
+}
+
+/**
+ * Send a protocol initiation header: log it, encode it into a send buffer
+ * obtained from the AsynchIO, record how many bytes were produced and queue
+ * the buffer for writing. A send buffer must be available (asserted).
+ */
+void RdmaIOHandler::write(const framing::ProtocolInitiation& data)
+{
+    QPID_LOG(debug, "Rdma: SENT [" << identifier << "] INIT(" << data << ")");
+
+    Rdma::Buffer* buff = aio->getSendBuffer();
+    assert(buff);
+
+    // Encode straight into the buffer's storage
+    framing::Buffer encoder(buff->bytes(), buff->byteCount());
+    data.encode(encoder);
+
+    buff->dataCount(data.encodedSize());
+    aio->queueWrite(buff);
+}
+
+// Ask the AsynchIO to drain its write queue, registering drained() as the
+// callback for that request.
+void RdmaIOHandler::close() {
+ aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this));
+}
+
+// TODO: Dummy implementation, need to fill this in for heartbeat timeout to work
+// Currently a no-op: calling abort() has no effect on the connection.
+void RdmaIOHandler::abort() {
+}
+
+// Tell the AsynchIO that there is output waiting to be written.
+void RdmaIOHandler::activateOutput() {
+ aio->notifyPendingWrite();
+}
+
+/**
+ * Idle callback from the AsynchIO: give the codec a chance to produce output.
+ *
+ * Nothing is done unless the AsynchIO is writable, a codec exists and it has
+ * something to encode, and a send buffer is available. Otherwise one buffer's
+ * worth of data is encoded and queued; if the codec reports itself closed
+ * afterwards the handler starts closing too.
+ */
+void RdmaIOHandler::idle(Rdma::AsynchIO&) {
+    // TODO: Shouldn't need this test as idle() should only ever be called when
+    // the connection is writable anyway
+    if (!aio->writable()) return;
+
+    // No codec yet, or nothing for it to send
+    if (codec == 0 || !codec->canEncode()) return;
+
+    Rdma::Buffer* const sendBuffer = aio->getSendBuffer();
+    if (sendBuffer == 0) return;
+
+    const size_t encodedBytes = codec->encode(sendBuffer->bytes(), sendBuffer->byteCount());
+    sendBuffer->dataCount(encodedBytes);
+    aio->queueWrite(sendBuffer);
+
+    if (codec->isClosed()) close();
+}
+
+/**
+ * Open the conversation from our side: create the codec from the factory and
+ * send a protocol initiation header carrying the codec's version.
+ */
+void RdmaIOHandler::initProtocolOut() {
+    // The conversation must not have started yet, and we must be able to send
+    assert( codec == 0 );
+    assert( aio->writable() );
+
+    codec = factory->create(*this, identifier, SecuritySettings());
+
+    const framing::ProtocolInitiation header(codec->getVersion());
+    write(header);
+}
+
+// Error callback from the AsynchIO: handled exactly like a disconnect.
+void RdmaIOHandler::error(Rdma::AsynchIO&) {
+ disconnected();
+}
+
+namespace {
+ // Callback that disconnectAction() binds and passes to AsynchIO::stop();
+ // destroys the handler it is given.
+ void stopped(RdmaIOHandler* async) {
+ delete async;
+ }
+}
+
+// Stop the AsynchIO, at most once per handler. The polling flag is tested and
+// cleared under pollingLock; the lock is released (inner scope ends) before
+// stop() is called. stop() is given stopped(this) as its callback, which
+// deletes this handler.
+void RdmaIOHandler::disconnectAction() {
+ {
+ Mutex::ScopedLock l(pollingLock);
+ // If we're closed already then we'll get to drained() anyway
+ if (!polling) return;
+ polling = false;
+ }
+ aio->stop(boost::bind(&stopped, this));
+}
+
+// Handle a disconnect by asking the AsynchIO to call disconnectAction() back,
+// rather than running it directly from here.
+void RdmaIOHandler::disconnected() {
+ aio->requestCallback(boost::bind(&RdmaIOHandler::disconnectAction, this));
+}
+
+// Callback registered by close() for the write-queue drain. Intentionally empty.
+void RdmaIOHandler::drained() {
+ // We know we've drained the write queue now, but we don't have to do anything
+ // because we can rely on the client to disconnect to trigger the connection
+ // cleanup.
+}
+
+// "Buffer full" notification: only logs at debug level. Note that request()
+// in this file currently passes 0 instead of binding this as a callback.
+void RdmaIOHandler::full(Rdma::AsynchIO&) {
+ QPID_LOG(debug, "Rdma: buffer full [" << identifier << "]");
+}
+
+// TODO: Dummy implementation of read throttling
+// Currently a no-op: the credit argument is ignored.
+void RdmaIOHandler::giveReadCredit(int32_t) {
+}
+
+// The logic here is subtly different from TCP as RDMA is message oriented
+// so we define that an RDMA message is a frame - in this case there is no putting back
+// of any message remainder - there shouldn't be any. And what we read here can't be
+// smaller than a frame
+//
+// Read callback from the AsynchIO. Once a read error has been recorded every
+// further buffer is ignored. Otherwise the buffer is handed to the codec, or,
+// if no codec exists yet, treated as the peer's protocol initiation.
+// Any std::exception thrown while doing so is logged, marks the handler as
+// failed and starts closing the connection.
+//
+// The number of bytes the codec reports as decoded is deliberately not kept:
+// as explained above there is no remainder to put back.
+void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
+    if (readError) {
+        return;
+    }
+    try {
+        if (codec) {
+            codec->decode(buff->bytes(), buff->dataCount());
+        }else{
+            // Need to start protocol processing
+            initProtocolIn(buff);
+        }
+    }catch(const std::exception& e){
+        QPID_LOG(error, e.what());
+        readError = true;
+        close();
+    }
+}
+
+/**
+ * Treat the received buffer as the peer's protocol initiation header.
+ *
+ * If the header decodes, a codec is requested from the factory for the
+ * offered version. When the factory returns no codec, a header advertising
+ * the highest protocol version is sent back, further input is ignored
+ * (readError) and the connection starts closing.
+ *
+ * If the header does not decode, nothing happens and the codec stays null.
+ */
+void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) {
+    framing::Buffer in(buff->bytes(), buff->dataCount());
+    framing::ProtocolInitiation protocolInit;
+    if (protocolInit.decode(in)) {
+        QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")");
+
+        codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings());
+
+        // If we failed to create the codec then we don't understand the offered protocol version
+        if (!codec) {
+            // send valid version header & close connection.
+            write(framing::ProtocolInitiation(framing::highestProtocolVersion));
+            readError = true;
+            close();
+        }
+    }
+}
+
+// ProtocolFactory for the "rdma" transport: accept() starts an Rdma::Listener
+// for incoming connections and connect() starts an Rdma::Connector for
+// outgoing ones. The private members are the callbacks bound to those objects.
+class RdmaIOProtocolFactory : public ProtocolFactory {
+ // Created by accept(); empty until then
+ auto_ptr<Rdma::Listener> listener;
+ // Port given at construction; never changes afterwards
+ const uint16_t listeningPort;
+
+ public:
+ // NOTE(review): port is int16_t here but stored as uint16_t - confirm that
+ // port numbers above 32767 survive the conversion on all supported platforms
+ RdmaIOProtocolFactory(int16_t port, int backlog);
+ void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
+ void connect(Poller::shared_ptr, const string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback);
+
+ uint16_t getPort() const;
+
+ private:
+ // Creates the handler and AsynchIO for a connection; false rejects it
+ bool request(Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectionCodec::Factory*);
+ // Starts the handler stored on the connection
+ void established(Poller::shared_ptr, Rdma::Connection::intrusive_ptr);
+ // Outgoing connections only: request + established + send protocol header
+ void connected(Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectionCodec::Factory*);
+ void connectionError(Rdma::Connection::intrusive_ptr, Rdma::ErrorType);
+ void disconnected(Rdma::Connection::intrusive_ptr);
+ void rejected(Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectFailedCallback);
+};
+
+// Static instance to initialise plugin
+static class RdmaIOPlugin : public Plugin {
+    // Nothing to do in the early initialisation pass
+    void earlyInitialize(Target&) {
+    }
+
+    // Register an "rdma" protocol factory with the target, provided that at
+    // least one rdma device exists and the target is a Broker.
+    void initialize(Target& target) {
+        // Check whether we actually have any rdma devices
+        if ( Rdma::deviceCount() == 0 ) {
+            QPID_LOG(info, "Rdma: Disabled: no rdma devices found");
+            return;
+        }
+
+        // Only provide to a Broker
+        broker::Broker* const brokerTarget = dynamic_cast<broker::Broker*>(&target);
+        if (!brokerTarget) {
+            return;
+        }
+
+        const broker::Broker::Options& opts = brokerTarget->getOptions();
+        ProtocolFactory::shared_ptr protocol(new RdmaIOProtocolFactory(opts.port, opts.connectionBacklog));
+        QPID_LOG(notice, "Rdma: Listening on RDMA port " << protocol->getPort());
+        brokerTarget->registerProtocolFactory("rdma", protocol);
+    }
+} rdmaPlugin;
+
+// Remember the port to listen on. The backlog argument is accepted but unused.
+// The listener itself is not created until accept() is called.
+RdmaIOProtocolFactory::RdmaIOProtocolFactory(int16_t port, int /*backlog*/) :
+ listeningPort(port)
+{}
+
+/**
+ * Connection-established callback: start the handler that request() stored
+ * as the connection's context on the given poller.
+ */
+void RdmaIOProtocolFactory::established(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr ci) {
+    ci->getContext<RdmaIOHandler>()->start(poller);
+}
+
+/**
+ * Connection-request callback: build the RdmaIOHandler and Rdma::AsynchIO for
+ * a connection and store the handler as the connection's context.
+ *
+ * @param ci connection being requested
+ * @param cp parameters for the connection (protocol version, receive
+ *           buffer size, initial transmit credit)
+ * @param f  factory the handler will use to create its codec
+ * @return true if the connection was set up; false if an exception was
+ *         caught, in which case the exception has been logged and the
+ *         connection should be rejected
+ *
+ * The handler is owned by an auto_ptr until the AsynchIO has been created and
+ * attached, so it is no longer leaked when AsynchIO construction throws.
+ */
+bool RdmaIOProtocolFactory::request(Rdma::Connection::intrusive_ptr ci, const Rdma::ConnectionParams& cp,
+    ConnectionCodec::Factory* f) {
+    try {
+        if (cp.rdmaProtocolVersion == 0) {
+            QPID_LOG(warning, "Rdma: connection from protocol version 0 client");
+        }
+        std::auto_ptr<RdmaIOHandler> handler(new RdmaIOHandler(ci, f));
+        // The handler's destructor deletes its aio pointer, so make sure that
+        // pointer is null while no AsynchIO has been attached yet.
+        handler->init(0);
+
+        // The callbacks need the raw pointer; ownership stays with 'handler'
+        RdmaIOHandler* async = handler.get();
+        Rdma::AsynchIO* aio =
+            new Rdma::AsynchIO(ci->getQueuePair(),
+                cp.rdmaProtocolVersion,
+                cp.maxRecvBufferSize, cp.initialXmitCredit, Rdma::DEFAULT_WR_ENTRIES,
+                boost::bind(&RdmaIOHandler::readbuff, async, _1, _2),
+                boost::bind(&RdmaIOHandler::idle, async, _1),
+                0, // boost::bind(&RdmaIOHandler::full, async, _1),
+                boost::bind(&RdmaIOHandler::error, async, _1));
+        async->init(aio);
+
+        // Record aio so we can get it back from a connection.
+        // Ownership is given up before the call, exactly as in the raw-pointer
+        // version: from here on the handler is destroyed via stopped().
+        ci->addContext(handler.release());
+        return true;
+    } catch (const Rdma::Exception& e) {
+        QPID_LOG(error, "Rdma: Cannot accept new connection (Rdma exception): " << e.what());
+    } catch (const std::exception& e) {
+        QPID_LOG(error, "Rdma: Cannot accept new connection (unknown exception): " << e.what());
+    }
+
+    // If we get here we caught an exception so reject connection
+    return false;
+}
+
+// Connection-error callback for both the listener and the connector.
+// Currently a no-op: the error is neither logged nor acted upon.
+void RdmaIOProtocolFactory::connectionError(Rdma::Connection::intrusive_ptr, Rdma::ErrorType) {
+}
+
+/**
+ * Disconnect callback: if the connection still has a handler attached,
+ * detach it and tell it the connection has gone. Connections without a
+ * handler are ignored.
+ */
+void RdmaIOProtocolFactory::disconnected(Rdma::Connection::intrusive_ptr ci) {
+    RdmaIOHandler* const handler = ci->getContext<RdmaIOHandler>();
+    if (!handler) {
+        return;
+    }
+
+    // Detach first so that we don't disconnect more than once
+    ci->removeContext();
+    handler->disconnected();
+}
+
+// Return the port this factory was constructed with.
+uint16_t RdmaIOProtocolFactory::getPort() const {
+ return listeningPort; // Immutable no need for lock.
+}
+
+/**
+ * Start listening for incoming RDMA connections.
+ *
+ * Creates the Rdma::Listener (replacing any previous one), wiring its
+ * callbacks to this factory, and starts it on the given poller. The listen
+ * address is built from an empty host string and listeningPort.
+ *
+ * @param poller poller the listener and accepted connections run on
+ * @param fact   codec factory passed on to request() for each new connection
+ */
+void RdmaIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) {
+    listener.reset(
+        new Rdma::Listener(
+            Rdma::ConnectionParams(65536, Rdma::DEFAULT_WR_ENTRIES),
+            boost::bind(&RdmaIOProtocolFactory::established, this, poller, _1),
+            boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2),
+            boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1),
+            boost::bind(&RdmaIOProtocolFactory::request, this, _1, _2, fact)));
+
+    SocketAddress sa("",boost::lexical_cast<std::string>(listeningPort));
+    listener->start(poller, sa);
+}
+
+// Only used for outgoing connections (in federation)
+// Reports the rejection through the caller-supplied failure callback with
+// code -1; the connection and its parameters are not examined.
+void RdmaIOProtocolFactory::rejected(Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectFailedCallback failed) {
+ failed(-1, "Connection rejected");
+}
+
+// Do the same as connection request and established but mark a client too
+//
+// Callback for an outgoing connection that has been made: set up the handler
+// via request(), start it via established(), then send our protocol header.
+//
+// If request() fails no handler has been attached to the connection, so
+// carrying on would dereference a null handler in established(). In that case
+// stop here; request() has already logged the reason.
+// NOTE(review): nothing tears the connection down on this path - confirm
+// whether the connector needs to be told about the failure.
+void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr ci, const Rdma::ConnectionParams& cp,
+    ConnectionCodec::Factory* f) {
+    if (!request(ci, cp, f)) {
+        return;
+    }
+    established(poller, ci);
+    RdmaIOHandler* async = ci->getContext<RdmaIOHandler>();
+    async->initProtocolOut();
+}
+
+/**
+ * Start an outgoing RDMA connection to host:port.
+ *
+ * A new Rdma::Connector is created with its callbacks wired to this factory
+ * (connected / connectionError / disconnected / rejected) and started on the
+ * given poller. 'failed' is forwarded to rejected().
+ *
+ * NOTE(review): the Connector is not deleted here - presumably its lifetime
+ * is managed elsewhere; confirm.
+ */
+void RdmaIOProtocolFactory::connect(
+    Poller::shared_ptr poller,
+    const std::string& host, const std::string& port,
+    ConnectionCodec::Factory* f,
+    ConnectFailedCallback failed)
+{
+    Rdma::Connector* connector =
+        new Rdma::Connector(
+            Rdma::ConnectionParams(8000, Rdma::DEFAULT_WR_ENTRIES),
+            boost::bind(&RdmaIOProtocolFactory::connected, this, poller, _1, _2, f),
+            boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2),
+            boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1),
+            boost::bind(&RdmaIOProtocolFactory::rejected, this, _1, _2, failed));
+
+    SocketAddress remote(host, port);
+    connector->start(poller, remote);
+}
+
+}} // namespace qpid::sys