diff options
Diffstat (limited to 'cpp/src/qpid/sys/RdmaIOPlugin.cpp')
-rw-r--r-- | cpp/src/qpid/sys/RdmaIOPlugin.cpp | 406 |
1 files changed, 0 insertions, 406 deletions
diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp deleted file mode 100644 index d53db20598..0000000000 --- a/cpp/src/qpid/sys/RdmaIOPlugin.cpp +++ /dev/null @@ -1,406 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/ProtocolFactory.h" - -#include "qpid/Plugin.h" -#include "qpid/broker/Broker.h" -#include "qpid/framing/AMQP_HighestVersion.h" -#include "qpid/log/Statement.h" -#include "qpid/sys/rdma/RdmaIO.h" -#include "qpid/sys/rdma/rdma_exception.h" -#include "qpid/sys/OutputControl.h" -#include "qpid/sys/SecuritySettings.h" - -#include <boost/bind.hpp> -#include <boost/lexical_cast.hpp> -#include <memory> - -#include <netdb.h> - -using std::auto_ptr; -using std::string; -using std::stringstream; - -namespace qpid { -namespace sys { - -class RdmaIOHandler : public OutputControl { - std::string identifier; - ConnectionCodec::Factory* factory; - ConnectionCodec* codec; - bool readError; - - sys::Mutex pollingLock; - bool polling; - - Rdma::AsynchIO* aio; - Rdma::Connection::intrusive_ptr connection; - - void write(const framing::ProtocolInitiation&); - void disconnectAction(); - - public: - RdmaIOHandler(Rdma::Connection::intrusive_ptr c, ConnectionCodec::Factory* f); - ~RdmaIOHandler(); - void init(Rdma::AsynchIO* a); - void start(Poller::shared_ptr poller); - - // Output side - void close(); - void abort(); - void activateOutput(); - void giveReadCredit(int32_t credit); - void initProtocolOut(); - - // Input side - void readbuff(Rdma::AsynchIO& aio, Rdma::Buffer* buff); - void initProtocolIn(Rdma::Buffer* buff); - - // Notifications - void full(Rdma::AsynchIO& aio); - void idle(Rdma::AsynchIO& aio); - void error(Rdma::AsynchIO& aio); - void disconnected(); - void drained(); -}; - -RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr c, qpid::sys::ConnectionCodec::Factory* f) : - identifier(c->getFullName()), - factory(f), - codec(0), - readError(false), - polling(false), - connection(c) -{ -} - -RdmaIOHandler::~RdmaIOHandler() { - if (codec) - codec->closed(); - delete codec; - delete aio; -} - -void RdmaIOHandler::init(Rdma::AsynchIO* a) { - aio = a; -} - -void RdmaIOHandler::start(Poller::shared_ptr poller) { - Mutex::ScopedLock l(pollingLock); - assert(!polling); - - polling = true; - - aio->start(poller); -} - -void RdmaIOHandler::write(const framing::ProtocolInitiation& data) -{ - QPID_LOG(debug, "Rdma: SENT [" << identifier << "] INIT(" << data << ")"); - Rdma::Buffer* buff = aio->getSendBuffer(); - assert(buff); - framing::Buffer out(buff->bytes(), buff->byteCount()); - data.encode(out); - buff->dataCount(data.encodedSize()); - aio->queueWrite(buff); -} - -void RdmaIOHandler::close() { - aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this)); -} - -// TODO: Dummy implementation, need to fill this in for heartbeat timeout to work -void RdmaIOHandler::abort() { -} - -void RdmaIOHandler::activateOutput() { - aio->notifyPendingWrite(); -} - -void RdmaIOHandler::idle(Rdma::AsynchIO&) { - // TODO: Shouldn't need this test as idle() should only ever be called when - // the connection is writable anyway - if ( !aio->writable() ) { - return; - } - if (codec == 0) return; - if (!codec->canEncode()) { - return; - } - Rdma::Buffer* buff = aio->getSendBuffer(); - if (buff) { - size_t encoded=codec->encode(buff->bytes(), buff->byteCount()); - buff->dataCount(encoded); - aio->queueWrite(buff); - if (codec->isClosed()) { - close(); - } - } -} - -void RdmaIOHandler::initProtocolOut() { - // We mustn't have already started the conversation - // but we must be able to send - assert( codec == 0 ); - assert( aio->writable() ); - codec = factory->create(*this, identifier, SecuritySettings()); - write(framing::ProtocolInitiation(codec->getVersion())); -} - -void RdmaIOHandler::error(Rdma::AsynchIO&) { - disconnected(); -} - -namespace { - void stopped(RdmaIOHandler* async) { - delete async; - } -} - -void RdmaIOHandler::disconnectAction() { - { - Mutex::ScopedLock l(pollingLock); - // If we're closed already then we'll get to drained() anyway - if (!polling) return; - polling = false; - } - aio->stop(boost::bind(&stopped, this)); -} - -void RdmaIOHandler::disconnected() { - aio->requestCallback(boost::bind(&RdmaIOHandler::disconnectAction, this)); -} - -void RdmaIOHandler::drained() { - // We know we've drained the write queue now, but we don't have to do anything - // because we can rely on the client to disconnect to trigger the connection - // cleanup. -} - -void RdmaIOHandler::full(Rdma::AsynchIO&) { - QPID_LOG(debug, "Rdma: buffer full [" << identifier << "]"); -} - -// TODO: Dummy implementation of read throttling -void RdmaIOHandler::giveReadCredit(int32_t) { -} - -// The logic here is subtly different from TCP as RDMA is message oriented -// so we define that an RDMA message is a frame - in this case there is no putting back -// of any message remainder - there shouldn't be any. And what we read here can't be -// smaller than a frame -void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { - if (readError) { - return; - } - size_t decoded = 0; - try { - if (codec) { - decoded = codec->decode(buff->bytes(), buff->dataCount()); - }else{ - // Need to start protocol processing - initProtocolIn(buff); - } - }catch(const std::exception& e){ - QPID_LOG(error, e.what()); - readError = true; - close(); - } -} - -void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) { - framing::Buffer in(buff->bytes(), buff->dataCount()); - framing::ProtocolInitiation protocolInit; - size_t decoded = 0; - if (protocolInit.decode(in)) { - decoded = in.getPosition(); - QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")"); - - codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings()); - - // If we failed to create the codec then we don't understand the offered protocol version - if (!codec) { - // send valid version header & close connection. - write(framing::ProtocolInitiation(framing::highestProtocolVersion)); - readError = true; - close(); - } - } -} - -class RdmaIOProtocolFactory : public ProtocolFactory { - auto_ptr<Rdma::Listener> listener; - const uint16_t listeningPort; - - public: - RdmaIOProtocolFactory(int16_t port, int backlog); - void accept(Poller::shared_ptr, ConnectionCodec::Factory*); - void connect(Poller::shared_ptr, const string& host, int16_t port, ConnectionCodec::Factory*, ConnectFailedCallback); - - uint16_t getPort() const; - string getHost() const; - - private: - bool request(Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectionCodec::Factory*); - void established(Poller::shared_ptr, Rdma::Connection::intrusive_ptr); - void connected(Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectionCodec::Factory*); - void connectionError(Rdma::Connection::intrusive_ptr, Rdma::ErrorType); - void disconnected(Rdma::Connection::intrusive_ptr); - void rejected(Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectFailedCallback); -}; - -// Static instance to initialise plugin -static class RdmaIOPlugin : public Plugin { - void earlyInitialize(Target&) { - } - - void initialize(Target& target) { - // Check whether we actually have any rdma devices - if ( Rdma::deviceCount() == 0 ) { - QPID_LOG(info, "Rdma: Disabled: no rdma devices found"); - return; - } - - broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); - // Only provide to a Broker - if (broker) { - const broker::Broker::Options& opts = broker->getOptions(); - ProtocolFactory::shared_ptr protocol(new RdmaIOProtocolFactory(opts.port, opts.connectionBacklog)); - QPID_LOG(notice, "Rdma: Listening on RDMA port " << protocol->getPort()); - broker->registerProtocolFactory("rdma", protocol); - } - } -} rdmaPlugin; - -RdmaIOProtocolFactory::RdmaIOProtocolFactory(int16_t port, int /*backlog*/) : - listeningPort(port) -{} - -void RdmaIOProtocolFactory::established(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr ci) { - RdmaIOHandler* async = ci->getContext<RdmaIOHandler>(); - async->start(poller); -} - -bool RdmaIOProtocolFactory::request(Rdma::Connection::intrusive_ptr ci, const Rdma::ConnectionParams& cp, - ConnectionCodec::Factory* f) { - try { - if (cp.rdmaProtocolVersion == 0) { - QPID_LOG(warning, "Rdma: connection from protocol version 0 client"); - } - RdmaIOHandler* async = new RdmaIOHandler(ci, f); - Rdma::AsynchIO* aio = - new Rdma::AsynchIO(ci->getQueuePair(), - cp.rdmaProtocolVersion, - cp.maxRecvBufferSize, cp.initialXmitCredit, Rdma::DEFAULT_WR_ENTRIES, - boost::bind(&RdmaIOHandler::readbuff, async, _1, _2), - boost::bind(&RdmaIOHandler::idle, async, _1), - 0, // boost::bind(&RdmaIOHandler::full, async, _1), - boost::bind(&RdmaIOHandler::error, async, _1)); - async->init(aio); - - // Record aio so we can get it back from a connection - ci->addContext(async); - return true; - } catch (const Rdma::Exception& e) { - QPID_LOG(error, "Rdma: Cannot accept new connection (Rdma exception): " << e.what()); - } catch (const std::exception& e) { - QPID_LOG(error, "Rdma: Cannot accept new connection (unknown exception): " << e.what()); - } - - // If we get here we caught an exception so reject connection - return false; -} - -void RdmaIOProtocolFactory::connectionError(Rdma::Connection::intrusive_ptr, Rdma::ErrorType) { -} - -void RdmaIOProtocolFactory::disconnected(Rdma::Connection::intrusive_ptr ci) { - // If we've got a connection already tear it down, otherwise ignore - RdmaIOHandler* async = ci->getContext<RdmaIOHandler>(); - if (async) { - // Make sure we don't disconnect more than once - ci->removeContext(); - async->disconnected(); - } -} - -uint16_t RdmaIOProtocolFactory::getPort() const { - return listeningPort; // Immutable no need for lock. -} - -string RdmaIOProtocolFactory::getHost() const { - //return listener.getSockname(); - return ""; -} - -void RdmaIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { - ::sockaddr_in sin; - - sin.sin_family = AF_INET; - sin.sin_port = htons(listeningPort); - sin.sin_addr.s_addr = INADDR_ANY; - - listener.reset( - new Rdma::Listener( - Rdma::ConnectionParams(65536, Rdma::DEFAULT_WR_ENTRIES), - boost::bind(&RdmaIOProtocolFactory::established, this, poller, _1), - boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2), - boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1), - boost::bind(&RdmaIOProtocolFactory::request, this, _1, _2, fact))); - - SocketAddress sa("",boost::lexical_cast<std::string>(listeningPort)); - listener->start(poller, sa); -} - -// Only used for outgoing connections (in federation) -void RdmaIOProtocolFactory::rejected(Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectFailedCallback failed) { - failed(-1, "Connection rejected"); -} - -// Do the same as connection request and established but mark a client too -void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr ci, const Rdma::ConnectionParams& cp, - ConnectionCodec::Factory* f) { - (void) request(ci, cp, f); - established(poller, ci); - RdmaIOHandler* async = ci->getContext<RdmaIOHandler>(); - async->initProtocolOut(); -} - -void RdmaIOProtocolFactory::connect( - Poller::shared_ptr poller, - const std::string& host, int16_t port, - ConnectionCodec::Factory* f, - ConnectFailedCallback failed) -{ - Rdma::Connector* c = - new Rdma::Connector( - Rdma::ConnectionParams(8000, Rdma::DEFAULT_WR_ENTRIES), - boost::bind(&RdmaIOProtocolFactory::connected, this, poller, _1, _2, f), - boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2), - boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1), - boost::bind(&RdmaIOProtocolFactory::rejected, this, _1, _2, failed)); - - SocketAddress sa(host, boost::lexical_cast<std::string>(port)); - c->start(poller, sa); -} - -}} // namespace qpid::sys |