diff options
Diffstat (limited to 'qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp | 399 |
1 files changed, 399 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp new file mode 100644 index 0000000000..631d116b41 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp @@ -0,0 +1,399 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/ProtocolFactory.h" + +#include "qpid/Plugin.h" +#include "qpid/broker/Broker.h" +#include "qpid/framing/AMQP_HighestVersion.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/rdma/RdmaIO.h" +#include "qpid/sys/rdma/rdma_exception.h" +#include "qpid/sys/OutputControl.h" +#include "qpid/sys/SecuritySettings.h" + +#include <boost/bind.hpp> +#include <memory> + +#include <netdb.h> + +using std::auto_ptr; +using std::string; +using std::stringstream; + +namespace qpid { +namespace sys { + +class RdmaIOHandler : public OutputControl { + std::string identifier; + ConnectionCodec::Factory* factory; + ConnectionCodec* codec; + bool readError; + + sys::Mutex pollingLock; + bool polling; + + Rdma::AsynchIO* aio; + Rdma::Connection::intrusive_ptr connection; + + void write(const framing::ProtocolInitiation&); + void disconnectAction(); + + public: + RdmaIOHandler(Rdma::Connection::intrusive_ptr c, ConnectionCodec::Factory* f); + ~RdmaIOHandler(); + void init(Rdma::AsynchIO* a); + void start(Poller::shared_ptr poller); + + // Output side + void close(); + void abort(); + void activateOutput(); + void giveReadCredit(int32_t credit); + void initProtocolOut(); + + // Input side + void readbuff(Rdma::AsynchIO& aio, Rdma::Buffer* buff); + void initProtocolIn(Rdma::Buffer* buff); + + // Notifications + void full(Rdma::AsynchIO& aio); + void idle(Rdma::AsynchIO& aio); + void error(Rdma::AsynchIO& aio); + void disconnected(); + void drained(); +}; + +RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr c, qpid::sys::ConnectionCodec::Factory* f) : + identifier(c->getFullName()), + factory(f), + codec(0), + readError(false), + polling(false), + connection(c) +{ +} + +RdmaIOHandler::~RdmaIOHandler() { + if (codec) + codec->closed(); + delete codec; + delete aio; +} + +void RdmaIOHandler::init(Rdma::AsynchIO* a) { + aio = a; +} + +void RdmaIOHandler::start(Poller::shared_ptr poller) { + Mutex::ScopedLock l(pollingLock); + assert(!polling); + + polling = true; + + aio->start(poller); +} + +void RdmaIOHandler::write(const framing::ProtocolInitiation& data) +{ + QPID_LOG(debug, "Rdma: SENT [" << identifier << "] INIT(" << data << ")"); + Rdma::Buffer* buff = aio->getSendBuffer(); + assert(buff); + framing::Buffer out(buff->bytes(), buff->byteCount()); + data.encode(out); + buff->dataCount(data.encodedSize()); + aio->queueWrite(buff); +} + +void RdmaIOHandler::close() { + aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this)); +} + +// TODO: Dummy implementation, need to fill this in for heartbeat timeout to work +void RdmaIOHandler::abort() { +} + +void RdmaIOHandler::activateOutput() { + aio->notifyPendingWrite(); +} + +void RdmaIOHandler::idle(Rdma::AsynchIO&) { + // TODO: Shouldn't need this test as idle() should only ever be called when + // the connection is writable anyway + if ( !aio->writable() ) { + return; + } + if (codec == 0) return; + if (!codec->canEncode()) { + return; + } + Rdma::Buffer* buff = aio->getSendBuffer(); + if (buff) { + size_t encoded=codec->encode(buff->bytes(), buff->byteCount()); + buff->dataCount(encoded); + aio->queueWrite(buff); + if (codec->isClosed()) { + close(); + } + } +} + +void RdmaIOHandler::initProtocolOut() { + // We mustn't have already started the conversation + // but we must be able to send + assert( codec == 0 ); + assert( aio->writable() ); + codec = factory->create(*this, identifier, SecuritySettings()); + write(framing::ProtocolInitiation(codec->getVersion())); +} + +void RdmaIOHandler::error(Rdma::AsynchIO&) { + disconnected(); +} + +namespace { + void stopped(RdmaIOHandler* async) { + delete async; + } +} + +void RdmaIOHandler::disconnectAction() { + { + Mutex::ScopedLock l(pollingLock); + // If we're closed already then we'll get to drained() anyway + if (!polling) return; + polling = false; + } + aio->stop(boost::bind(&stopped, this)); +} + +void RdmaIOHandler::disconnected() { + aio->requestCallback(boost::bind(&RdmaIOHandler::disconnectAction, this)); +} + +void RdmaIOHandler::drained() { + // We know we've drained the write queue now, but we don't have to do anything + // because we can rely on the client to disconnect to trigger the connection + // cleanup. +} + +void RdmaIOHandler::full(Rdma::AsynchIO&) { + QPID_LOG(debug, "Rdma: buffer full [" << identifier << "]"); +} + +// TODO: Dummy implementation of read throttling +void RdmaIOHandler::giveReadCredit(int32_t) { +} + +// The logic here is subtly different from TCP as RDMA is message oriented +// so we define that an RDMA message is a frame - in this case there is no putting back +// of any message remainder - there shouldn't be any. And what we read here can't be +// smaller than a frame +void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { + if (readError) { + return; + } + size_t decoded = 0; + try { + if (codec) { + decoded = codec->decode(buff->bytes(), buff->dataCount()); + }else{ + // Need to start protocol processing + initProtocolIn(buff); + } + }catch(const std::exception& e){ + QPID_LOG(error, e.what()); + readError = true; + close(); + } +} + +void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) { + framing::Buffer in(buff->bytes(), buff->dataCount()); + framing::ProtocolInitiation protocolInit; + size_t decoded = 0; + if (protocolInit.decode(in)) { + decoded = in.getPosition(); + QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")"); + + codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings()); + + // If we failed to create the codec then we don't understand the offered protocol version + if (!codec) { + // send valid version header & close connection. + write(framing::ProtocolInitiation(framing::highestProtocolVersion)); + readError = true; + close(); + } + } +} + +class RdmaIOProtocolFactory : public ProtocolFactory { + auto_ptr<Rdma::Listener> listener; + const uint16_t listeningPort; + + public: + RdmaIOProtocolFactory(int16_t port, int backlog); + void accept(Poller::shared_ptr, ConnectionCodec::Factory*); + void connect(Poller::shared_ptr, const string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback); + + uint16_t getPort() const; + + private: + bool request(Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectionCodec::Factory*); + void established(Poller::shared_ptr, Rdma::Connection::intrusive_ptr); + void connected(Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectionCodec::Factory*); + void connectionError(Rdma::Connection::intrusive_ptr, Rdma::ErrorType); + void disconnected(Rdma::Connection::intrusive_ptr); + void rejected(Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectFailedCallback); +}; + +// Static instance to initialise plugin +static class RdmaIOPlugin : public Plugin { + void earlyInitialize(Target&) { + } + + void initialize(Target& target) { + // Check whether we actually have any rdma devices + if ( Rdma::deviceCount() == 0 ) { + QPID_LOG(info, "Rdma: Disabled: no rdma devices found"); + return; + } + + broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); + // Only provide to a Broker + if (broker) { + const broker::Broker::Options& opts = broker->getOptions(); + ProtocolFactory::shared_ptr protocol(new RdmaIOProtocolFactory(opts.port, opts.connectionBacklog)); + QPID_LOG(notice, "Rdma: Listening on RDMA port " << protocol->getPort()); + broker->registerProtocolFactory("rdma", protocol); + } + } +} rdmaPlugin; + +RdmaIOProtocolFactory::RdmaIOProtocolFactory(int16_t port, int /*backlog*/) : + listeningPort(port) +{} + +void RdmaIOProtocolFactory::established(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr ci) { + RdmaIOHandler* async = ci->getContext<RdmaIOHandler>(); + async->start(poller); +} + +bool RdmaIOProtocolFactory::request(Rdma::Connection::intrusive_ptr ci, const Rdma::ConnectionParams& cp, + ConnectionCodec::Factory* f) { + try { + if (cp.rdmaProtocolVersion == 0) { + QPID_LOG(warning, "Rdma: connection from protocol version 0 client"); + } + RdmaIOHandler* async = new RdmaIOHandler(ci, f); + Rdma::AsynchIO* aio = + new Rdma::AsynchIO(ci->getQueuePair(), + cp.rdmaProtocolVersion, + cp.maxRecvBufferSize, cp.initialXmitCredit, Rdma::DEFAULT_WR_ENTRIES, + boost::bind(&RdmaIOHandler::readbuff, async, _1, _2), + boost::bind(&RdmaIOHandler::idle, async, _1), + 0, // boost::bind(&RdmaIOHandler::full, async, _1), + boost::bind(&RdmaIOHandler::error, async, _1)); + async->init(aio); + + // Record aio so we can get it back from a connection + ci->addContext(async); + return true; + } catch (const Rdma::Exception& e) { + QPID_LOG(error, "Rdma: Cannot accept new connection (Rdma exception): " << e.what()); + } catch (const std::exception& e) { + QPID_LOG(error, "Rdma: Cannot accept new connection (unknown exception): " << e.what()); + } + + // If we get here we caught an exception so reject connection + return false; +} + +void RdmaIOProtocolFactory::connectionError(Rdma::Connection::intrusive_ptr, Rdma::ErrorType) { +} + +void RdmaIOProtocolFactory::disconnected(Rdma::Connection::intrusive_ptr ci) { + // If we've got a connection already tear it down, otherwise ignore + RdmaIOHandler* async = ci->getContext<RdmaIOHandler>(); + if (async) { + // Make sure we don't disconnect more than once + ci->removeContext(); + async->disconnected(); + } +} + +uint16_t RdmaIOProtocolFactory::getPort() const { + return listeningPort; // Immutable no need for lock. +} + +void RdmaIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { + ::sockaddr_in sin; + + sin.sin_family = AF_INET; + sin.sin_port = htons(listeningPort); + sin.sin_addr.s_addr = INADDR_ANY; + + listener.reset( + new Rdma::Listener( + Rdma::ConnectionParams(65536, Rdma::DEFAULT_WR_ENTRIES), + boost::bind(&RdmaIOProtocolFactory::established, this, poller, _1), + boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2), + boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1), + boost::bind(&RdmaIOProtocolFactory::request, this, _1, _2, fact))); + + SocketAddress sa("",boost::lexical_cast<std::string>(listeningPort)); + listener->start(poller, sa); +} + +// Only used for outgoing connections (in federation) +void RdmaIOProtocolFactory::rejected(Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectFailedCallback failed) { + failed(-1, "Connection rejected"); +} + +// Do the same as connection request and established but mark a client too +void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr ci, const Rdma::ConnectionParams& cp, + ConnectionCodec::Factory* f) { + (void) request(ci, cp, f); + established(poller, ci); + RdmaIOHandler* async = ci->getContext<RdmaIOHandler>(); + async->initProtocolOut(); +} + +void RdmaIOProtocolFactory::connect( + Poller::shared_ptr poller, + const std::string& host, const std::string& port, + ConnectionCodec::Factory* f, + ConnectFailedCallback failed) +{ + Rdma::Connector* c = + new Rdma::Connector( + Rdma::ConnectionParams(8000, Rdma::DEFAULT_WR_ENTRIES), + boost::bind(&RdmaIOProtocolFactory::connected, this, poller, _1, _2, f), + boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2), + boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1), + boost::bind(&RdmaIOProtocolFactory::rejected, this, _1, _2, failed)); + + SocketAddress sa(host, port); + c->start(poller, sa); +} + +}} // namespace qpid::sys |