diff options
author | Andrew Stitcher <astitcher@apache.org> | 2008-09-11 06:16:19 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2008-09-11 06:16:19 +0000 |
commit | 2e05b7082f5e387fc686925e5ac006485e4686db (patch) | |
tree | b0a43e45da7cc24b65407ce6f7254e21b3fcde78 /cpp/src | |
parent | 468b4b6ddaa3d96bb743cdbd27ded651eea31847 (diff) | |
download | qpid-python-2e05b7082f5e387fc686925e5ac006485e4686db.tar.gz |
Implementation of AMQP over RDMA protocols (Infiniband)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@694143 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 25 | ||||
-rw-r--r-- | cpp/src/acl.mk | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/RdmaConnector.cpp | 427 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/sys/RdmaIOPlugin.cpp | 327 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaClient.cpp | 60 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.cpp | 449 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.h | 134 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaServer.cpp | 45 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_factories.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_factories.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_wrap.cpp | 155 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_wrap.h | 174 |
13 files changed, 1521 insertions, 288 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index a5e4527c30..7c02516575 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -127,6 +127,7 @@ librdmawrap_la_SOURCES = \ qpid/sys/rdma/rdma_factories.cpp \ qpid/sys/rdma/RdmaIO.cpp \ qpid/sys/rdma/RdmaIO.h \ + qpid/sys/rdma/rdma_wrap.cpp \ qpid/sys/rdma/rdma_wrap.h librdmawrap_la_LIBADD = \ libqpidcommon.la \ @@ -139,11 +140,31 @@ lib_LTLIBRARIES += \ librdmawrap_la_LDFLAGS = \ -no-undefined +rdma_la_SOURCES = \ + qpid/sys/RdmaIOPlugin.cpp +rdma_la_LIBADD = \ + libqpidbroker.la \ + librdmawrap.la +rdma_la_LDFLAGS = $(PLUGINLDFLAGS) +rdma_la_CXXFLAGS = \ + $(AM_CXXFLAGS) -Wno-missing-field-initializers +dmodule_LTLIBRARIES += \ + rdma.la + +rdmaconnector_la_SOURCES = \ + qpid/client/RdmaConnector.cpp +rdmaconnector_la_LIBADD = \ + libqpidclient.la \ + librdmawrap.la +rdmaconnector_la_LDFLAGS = $(PLUGINLDFLAGS) +rdmaconnector_la_CXXFLAGS = \ + $(AM_CXXFLAGS) -Wno-missing-field-initializers +cmodule_LTLIBRARIES += \ + rdmaconnector.la + # RDMA test/sample programs noinst_PROGRAMS += RdmaServer RdmaClient RdmaServer_SOURCES = qpid/sys/rdma/RdmaServer.cpp -RdmaServer_CXXFLAGS = \ - $(AM_CXXFLAGS) -Wno-missing-field-initializers RdmaServer_LDADD = \ librdmawrap.la libqpidcommon.la RdmaClient_SOURCES = qpid/sys/rdma/RdmaClient.cpp diff --git a/cpp/src/acl.mk b/cpp/src/acl.mk index d31a883c24..8c1201a75a 100644 --- a/cpp/src/acl.mk +++ b/cpp/src/acl.mk @@ -12,5 +12,5 @@ acl_la_SOURCES = \ qpid/acl/AclReader.cpp \ qpid/acl/AclReader.h -acl_la_LIBADD= libqpidbroker.la +acl_la_LIBADD = libqpidbroker.la acl_la_LDFLAGS = $(PLUGINLDFLAGS) diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp new file mode 100644 index 0000000000..c0775ab9cd --- /dev/null +++ b/cpp/src/qpid/client/RdmaConnector.cpp @@ -0,0 +1,427 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Connector.h" + +#include "Bounds.h" +#include "ConnectionImpl.h" +#include "ConnectionSettings.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/Time.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/sys/rdma/RdmaIO.h" +#include "qpid/sys/Dispatcher.h" +#include "qpid/sys/Poller.h" +#include "qpid/Msg.h" + +#include <iostream> +#include <boost/bind.hpp> +#include <boost/format.hpp> +#include <boost/lexical_cast.hpp> + +// This stuff needs to abstracted out of here to a platform specific file +#include <netdb.h> + +namespace qpid { +namespace client { + +using namespace qpid::sys; +using namespace qpid::framing; +using boost::format; +using boost::str; + +class RdmaConnector : public Connector, private sys::Runnable +{ + struct Buff; + + /** Batch up frames for writing to aio. */ + class Writer : public framing::FrameHandler { + typedef Rdma::Buffer BufferBase; + typedef std::deque<framing::AMQFrame> Frames; + + const uint16_t maxFrameSize; + sys::Mutex lock; + Rdma::AsynchIO* aio; + BufferBase* buffer; + Frames frames; + size_t lastEof; // Position after last EOF in frames + framing::Buffer encode; + size_t framesEncoded; + std::string identifier; + Bounds* bounds; + + void writeOne(); + void newBuffer(); + + public: + + Writer(uint16_t maxFrameSize, Bounds*); + ~Writer(); + void init(std::string id, Rdma::AsynchIO*); + void handle(framing::AMQFrame&); + void write(Rdma::AsynchIO&); + }; + + const uint16_t maxFrameSize; + framing::ProtocolVersion version; + bool initiated; + + sys::Mutex pollingLock; + bool polling; + bool joined; + + sys::ShutdownHandler* shutdownHandler; + framing::InputHandler* input; + framing::InitiationHandler* initialiser; + framing::OutputHandler* output; + + Writer writer; + + sys::Thread receiver; + + Rdma::AsynchIO* aio; + sys::Poller::shared_ptr poller; + + ~RdmaConnector(); + + void run(); + void handleClosed(); + bool closeInternal(); + + // Callbacks + void connected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&); + void connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, Rdma::ErrorType); + void disconnected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&); + void rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&); + + void readbuff(Rdma::AsynchIO&, Rdma::Buffer*); + void writebuff(Rdma::AsynchIO&); + void writeDataBlock(const framing::AMQDataBlock& data); + void eof(Rdma::AsynchIO&); + + std::string identifier; + + ConnectionImpl* impl; + + void connect(const std::string& host, int port); + void close(); + void send(framing::AMQFrame& frame); + + void setInputHandler(framing::InputHandler* handler); + void setShutdownHandler(sys::ShutdownHandler* handler); + sys::ShutdownHandler* getShutdownHandler() const; + framing::OutputHandler* getOutputHandler(); + const std::string& getIdentifier() const; + +public: + RdmaConnector(framing::ProtocolVersion pVersion, + const ConnectionSettings&, + ConnectionImpl*); +}; + +// Static constructor which registers connector here +namespace { + Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { + return new RdmaConnector(v, s, c); + } + + struct StaticInit { + StaticInit() { + Connector::registerFactory("rdma", &create); + Connector::registerFactory("ib", &create); + }; + } init; +} + + +RdmaConnector::RdmaConnector(ProtocolVersion ver, + const ConnectionSettings& settings, + ConnectionImpl* cimpl) + : maxFrameSize(settings.maxFrameSize), + version(ver), + initiated(false), + polling(false), + joined(true), + shutdownHandler(0), + writer(maxFrameSize, cimpl), + aio(0), + impl(cimpl) +{ + QPID_LOG(debug, "RdmaConnector created for " << version); +} + +RdmaConnector::~RdmaConnector() { + close(); +} + +void RdmaConnector::connect(const std::string& host, int port){ + Mutex::ScopedLock l(pollingLock); + assert(!polling); + assert(joined); + poller = Poller::shared_ptr(new Poller); + + // This stuff needs to abstracted out of here to a platform specific file + ::addrinfo *res; + ::addrinfo hints; + hints.ai_flags = 0; + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = 0; + int n = ::getaddrinfo(host.c_str(), boost::lexical_cast<std::string>(port).c_str(), &hints, &res); + if (n<0) { + throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n))); + } + + Rdma::Connector* c = new Rdma::Connector( + *res->ai_addr, + Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES), + boost::bind(&RdmaConnector::connected, this, poller, _1, _2), + boost::bind(&RdmaConnector::connectionError, this, poller, _1, _2), + boost::bind(&RdmaConnector::disconnected, this, poller, _1), + boost::bind(&RdmaConnector::rejected, this, poller, _1, _2)); + c->start(poller); + + polling = true; + joined = false; + receiver = Thread(this); +} + +// The following only gets run when connected +void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp) { + Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair(); + + aio = new Rdma::AsynchIO(ci->getQueuePair(), + cp.maxRecvBufferSize, cp.initialXmitCredit , Rdma::DEFAULT_WR_ENTRIES, + boost::bind(&RdmaConnector::readbuff, this, _1, _2), + boost::bind(&RdmaConnector::writebuff, this, _1), + 0, // write buffers full + boost::bind(&RdmaConnector::eof, this, _1)); // data error - just close connection + aio->start(poller); + + identifier = str(format("[%1% %2%]") % ci->getLocalName() % ci->getPeerName()); + writer.init(identifier, aio); + ProtocolInitiation init(version); + writeDataBlock(init); +} + +void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, Rdma::ErrorType) { + QPID_LOG(trace, "Connection Error " << identifier); + eof(*aio); +} + +void RdmaConnector::disconnected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&) { + eof(*aio); +} + +void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams& cp) { + QPID_LOG(trace, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize); + eof(*aio); +} + +bool RdmaConnector::closeInternal() { + bool ret; + { + Mutex::ScopedLock l(pollingLock); + ret = polling; + if (polling) { + polling = false; + poller->shutdown(); + } + if (joined || receiver.id() == Thread::current().id()) { + return ret; + } + joined = true; + } + + receiver.join(); + return ret; +} + +void RdmaConnector::close() { + closeInternal(); +} + +void RdmaConnector::setInputHandler(InputHandler* handler){ + input = handler; +} + +void RdmaConnector::setShutdownHandler(ShutdownHandler* handler){ + shutdownHandler = handler; +} + +OutputHandler* RdmaConnector::getOutputHandler(){ + return this; +} + +sys::ShutdownHandler* RdmaConnector::getShutdownHandler() const { + return shutdownHandler; +} + +const std::string& RdmaConnector::getIdentifier() const { + return identifier; +} + +void RdmaConnector::send(AMQFrame& frame) { + writer.handle(frame); +} + +void RdmaConnector::handleClosed() { + if (closeInternal() && shutdownHandler) + shutdownHandler->shutdown(); +} + +RdmaConnector::Writer::Writer(uint16_t s, Bounds* b) : + maxFrameSize(s), + aio(0), + buffer(0), + lastEof(0), + bounds(b) +{ +} + +RdmaConnector::Writer::~Writer() { + if (aio) + aio->returnBuffer(buffer); +} + +void RdmaConnector::Writer::init(std::string id, Rdma::AsynchIO* a) { + Mutex::ScopedLock l(lock); + identifier = id; + aio = a; + newBuffer(); +} +void RdmaConnector::Writer::handle(framing::AMQFrame& frame) { + Mutex::ScopedLock l(lock); + frames.push_back(frame); + // Don't bother to send anything unless we're at the end of a frameset (assembly in 0-10 terminology) + if (frame.getEof()) { + lastEof = frames.size(); + QPID_LOG(debug, "Requesting write: lastEof=" << lastEof); + aio->notifyPendingWrite(); + } + QPID_LOG(trace, "SENT " << identifier << ": " << frame); +} + +void RdmaConnector::Writer::writeOne() { + assert(buffer); + QPID_LOG(trace, "Write buffer " << encode.getPosition() + << " bytes " << framesEncoded << " frames "); + framesEncoded = 0; + + buffer->dataStart = 0; + buffer->dataCount = encode.getPosition(); + aio->queueWrite(buffer); + newBuffer(); +} + +void RdmaConnector::Writer::newBuffer() { + buffer = aio->getBuffer(); + encode = framing::Buffer(buffer->bytes, buffer->byteCount); + framesEncoded = 0; +} + +// Called in IO thread. (write idle routine) +// This is NOT only called in response to previously calling notifyPendingWrite +void RdmaConnector::Writer::write(Rdma::AsynchIO&) { + Mutex::ScopedLock l(lock); + assert(buffer); + // If nothing to do return immediately + if (lastEof==0) + return; + size_t bytesWritten = 0; + while (aio->writable() && !frames.empty()) { + const AMQFrame* frame = &frames.front(); + uint32_t size = frame->size(); + while (size <= encode.available()) { + frame->encode(encode); + frames.pop_front(); + ++framesEncoded; + bytesWritten += size; + if (frames.empty()) + break; + frame = &frames.front(); + size = frame->size(); + } + lastEof -= framesEncoded; + writeOne(); + } + if (bounds) bounds->reduce(bytesWritten); +} + +void RdmaConnector::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { + framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); + + if (!initiated) { + framing::ProtocolInitiation protocolInit; + if (protocolInit.decode(in)) { + //TODO: check the version is correct + QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")"); + } + initiated = true; + } + AMQFrame frame; + while(frame.decode(in)){ + QPID_LOG(trace, "RECV " << identifier << ": " << frame); + input->received(frame); + } +} + +void RdmaConnector::writebuff(Rdma::AsynchIO& aio_) { + writer.write(aio_); +} + +void RdmaConnector::writeDataBlock(const AMQDataBlock& data) { + Rdma::Buffer* buff = aio->getBuffer(); + framing::Buffer out(buff->bytes, buff->byteCount); + data.encode(out); + buff->dataCount = data.size(); + aio->queueWrite(buff); +} + +void RdmaConnector::eof(Rdma::AsynchIO&) { + handleClosed(); +} + +// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing +// will never be called +void RdmaConnector::run(){ + // Keep the connection impl in memory until run() completes. + //GRS: currently the ConnectionImpls destructor is where the Io thread is joined + //boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this(); + //assert(protect); + try { + Dispatcher d(poller); + + //aio->start(poller); + d.run(); + //aio->queueForDeletion(); + } catch (const std::exception& e) { + { + // We're no longer polling + Mutex::ScopedLock l(pollingLock); + polling = false; + } + QPID_LOG(error, e.what()); + handleClosed(); + } +} + + +}} // namespace qpid::client diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 68955050b4..b736d116e1 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -137,8 +137,8 @@ void SessionImpl::suspend() //user thread void SessionImpl::detach() //call with lock held { if (state == ATTACHED) { - proxy.detach(id.getName()); setState(DETACHING); + proxy.detach(id.getName()); } } @@ -630,7 +630,8 @@ inline void SessionImpl::setState(State s) //call with lock held inline void SessionImpl::waitFor(State s) //call with lock held { // We can be DETACHED at any time - state.waitFor(States(s, DETACHED)); + if (s == DETACHED) state.waitFor(DETACHED); + else state.waitFor(States(s, DETACHED)); check(); } diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp new file mode 100644 index 0000000000..80475e662d --- /dev/null +++ b/cpp/src/qpid/sys/RdmaIOPlugin.cpp @@ -0,0 +1,327 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ProtocolFactory.h" + +#include "qpid/Plugin.h" +#include "qpid/broker/Broker.h" +#include "qpid/framing/AMQP_HighestVersion.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/rdma/RdmaIO.h" +#include "qpid/sys/OutputControl.h" + +#include <boost/bind.hpp> +#include <memory> + +#include <netdb.h> + +using std::auto_ptr; +using std::string; +using std::stringstream; + +namespace qpid { +namespace sys { + +class RdmaIOHandler : public OutputControl { + Rdma::Connection::intrusive_ptr connection; + std::string identifier; + Rdma::AsynchIO* aio; + ConnectionCodec::Factory* factory; + ConnectionCodec* codec; + bool readError; + bool isClient; + + void write(const framing::ProtocolInitiation&); + + public: + RdmaIOHandler(Rdma::Connection::intrusive_ptr& c, ConnectionCodec::Factory* f); + ~RdmaIOHandler(); + void init(Rdma::AsynchIO* a); + void start(Poller::shared_ptr poller) {aio->start(poller);} + + void setClient() { isClient = true; } + + // Output side + void close(); + void activateOutput(); + + // Input side + void readbuff(Rdma::AsynchIO& aio, Rdma::Buffer* buff); + + // Notifications + void full(Rdma::AsynchIO& aio); + void idle(Rdma::AsynchIO& aio); + void error(Rdma::AsynchIO& aio); +}; + +RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr& c, qpid::sys::ConnectionCodec::Factory* f) : + connection(c), + identifier(c->getPeerName()), + factory(f), + codec(0), + readError(false), + isClient(false) +{ +} + +void RdmaIOHandler::init(Rdma::AsynchIO* a) { + aio = a; +} + +RdmaIOHandler::~RdmaIOHandler() { + if (codec) + codec->closed(); + delete codec; + + aio->deferDelete(); +} + +void RdmaIOHandler::write(const framing::ProtocolInitiation& data) +{ + QPID_LOG(debug, "SENT [" << identifier << "] INIT(" << data << ")"); + Rdma::Buffer* buff = aio->getBuffer(); + framing::Buffer out(buff->bytes, buff->byteCount); + data.encode(out); + buff->dataCount = data.size(); + aio->queueWrite(buff); +} + +void RdmaIOHandler::close() { + aio->queueWriteClose(); +} + +void RdmaIOHandler::activateOutput() { + aio->notifyPendingWrite(); +} + +void RdmaIOHandler::idle(Rdma::AsynchIO&) { + if (!aio->writable()) { + return; + } + if (isClient && codec == 0) { + codec = factory->create(*this, identifier); + write(framing::ProtocolInitiation(codec->getVersion())); + return; + } + if (codec == 0) return; + if (codec->canEncode()) { + // Try and get a queued buffer if not then construct new one + Rdma::Buffer* buff = aio->getBuffer(); + size_t encoded=codec->encode(buff->bytes, buff->byteCount); + buff->dataCount = encoded; + aio->queueWrite(buff); + } + if (codec->isClosed()) + aio->queueWriteClose(); +} + +void RdmaIOHandler::error(Rdma::AsynchIO&) { + close(); +} + +void RdmaIOHandler::full(Rdma::AsynchIO&) { + QPID_LOG(debug, "buffer full [" << identifier << "]"); +} + +// The logic here is subtly different from TCP as RDMA is message oriented +// so we define that an RDMA message is a frame - in this case there is no putting back +// of any message remainder - there shouldn't be any. And what we read here can't be +// smaller than a frame +void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { + if (readError) { + return; + } + size_t decoded = 0; + if (codec) { // Already initiated + try { + decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount); + }catch(const std::exception& e){ + QPID_LOG(error, e.what()); + readError = true; + aio->queueWriteClose(); + } + }else{ + framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); + framing::ProtocolInitiation protocolInit; + if (protocolInit.decode(in)) { + decoded = in.getPosition(); + QPID_LOG(debug, "RECV [" << identifier << "] INIT(" << protocolInit << ")"); + try { + codec = factory->create(protocolInit.getVersion(), *this, identifier); + if (!codec) { + //TODO: may still want to revise this... + //send valid version header & close connection. + write(framing::ProtocolInitiation(framing::highestProtocolVersion)); + readError = true; + aio->queueWriteClose(); + } + } catch (const std::exception& e) { + QPID_LOG(error, e.what()); + readError = true; + aio->queueWriteClose(); + } + } + } +} + +class RdmaIOProtocolFactory : public ProtocolFactory { + auto_ptr<Rdma::Listener> listener; + const uint16_t listeningPort; + + public: + RdmaIOProtocolFactory(int16_t port, int backlog); + void accept(Poller::shared_ptr, ConnectionCodec::Factory*); + void connect(Poller::shared_ptr, const string& host, int16_t port, ConnectionCodec::Factory*, ConnectFailedCallback); + + uint16_t getPort() const; + string getHost() const; + + private: + bool request(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&, ConnectionCodec::Factory*); + void established(Poller::shared_ptr, Rdma::Connection::intrusive_ptr&); + void connected(Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&, ConnectionCodec::Factory*); + void connectionError(Rdma::Connection::intrusive_ptr&, Rdma::ErrorType); + void disconnected(Rdma::Connection::intrusive_ptr&); + void rejected(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&); +}; + +// Static instance to initialise plugin +static class RdmaIOPlugin : public Plugin { + void earlyInitialize(Target&) { + } + + void initialize(Target& target) { + broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); + // Only provide to a Broker + if (broker) { + const broker::Broker::Options& opts = broker->getOptions(); + ProtocolFactory::shared_ptr protocol(new RdmaIOProtocolFactory(opts.port, opts.connectionBacklog)); + QPID_LOG(info, "Listening on RDMA port " << protocol->getPort()); + broker->registerProtocolFactory(protocol); + } + } +} rdmaPlugin; + +RdmaIOProtocolFactory::RdmaIOProtocolFactory(int16_t port, int /*backlog*/) : + listeningPort(port) +{} + +void RdmaIOProtocolFactory::established(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) { + RdmaIOHandler* async = ci->getContext<RdmaIOHandler>(); + async->start(poller); +} + +bool RdmaIOProtocolFactory::request(Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp, + ConnectionCodec::Factory* f) { + RdmaIOHandler* async = new RdmaIOHandler(ci, f); + Rdma::AsynchIO* aio = + new Rdma::AsynchIO(ci->getQueuePair(), + cp.maxRecvBufferSize, cp.initialXmitCredit, Rdma::DEFAULT_WR_ENTRIES, + boost::bind(&RdmaIOHandler::readbuff, async, _1, _2), + boost::bind(&RdmaIOHandler::idle, async, _1), + 0, // boost::bind(&RdmaIOHandler::full, async, _1), + boost::bind(&RdmaIOHandler::error, async, _1)); + async->init(aio); + + // Record aio so we can get it back from a connection + ci->addContext(async); + return true; +} + +void RdmaIOProtocolFactory::connectionError(Rdma::Connection::intrusive_ptr&, Rdma::ErrorType) { +} + +void RdmaIOProtocolFactory::disconnected(Rdma::Connection::intrusive_ptr& ci) { + // If we've got a connection already tear it down, otherwise ignore + RdmaIOHandler* async = ci->getContext<RdmaIOHandler>(); + if (async) { + async->close(); + } + delete async; +} + +uint16_t RdmaIOProtocolFactory::getPort() const { + return listeningPort; // Immutable no need for lock. +} + +string RdmaIOProtocolFactory::getHost() const { + //return listener.getSockname(); + return ""; +} + +void RdmaIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { + ::sockaddr_in sin; + + sin.sin_family = AF_INET; + sin.sin_port = htons(listeningPort); + sin.sin_addr.s_addr = INADDR_ANY; + + listener.reset( + new Rdma::Listener((const sockaddr&)(sin), + Rdma::ConnectionParams(65536, Rdma::DEFAULT_WR_ENTRIES), + boost::bind(&RdmaIOProtocolFactory::established, this, poller, _1), + boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2), + boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1), + boost::bind(&RdmaIOProtocolFactory::request, this, _1, _2, fact))); + + listener->start(poller); +} + +// Only used for outgoing connections (in federation) +void RdmaIOProtocolFactory::rejected(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&) { +} + +// Do the same as connection request and established but mark a client too +void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp, + ConnectionCodec::Factory* f) { + (void) request(ci, cp, f); + RdmaIOHandler* async = ci->getContext<RdmaIOHandler>(); + async->setClient(); + established(poller, ci); +} + +void RdmaIOProtocolFactory::connect( + Poller::shared_ptr poller, + const std::string& host, int16_t p, + ConnectionCodec::Factory* f, + ConnectFailedCallback) +{ + ::addrinfo *res; + ::addrinfo hints = {}; + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + stringstream ss; ss << p; + string port = ss.str(); + int n = ::getaddrinfo(host.c_str(), port.c_str(), &hints, &res); + if (n<0) { + throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n))); + } + + Rdma::Connector c( + *res->ai_addr, + Rdma::ConnectionParams(8000, Rdma::DEFAULT_WR_ENTRIES), + boost::bind(&RdmaIOProtocolFactory::connected, this, poller, _1, _2, f), + boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2), + boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1), + boost::bind(&RdmaIOProtocolFactory::rejected, this, _1, _2)); +} + +}} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/cpp/src/qpid/sys/rdma/RdmaClient.cpp index 1a24cb9c80..0d3dd83131 100644 --- a/cpp/src/qpid/sys/rdma/RdmaClient.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaClient.cpp @@ -50,8 +50,6 @@ int64_t smsgs = 0; int64_t sbytes = 0; int64_t rmsgs = 0; int64_t rbytes = 0; -int64_t cmsgs = 0; -int writable = true; int target = 1000000; int msgsize = 200; @@ -62,17 +60,15 @@ Duration fullTestDuration(TIME_INFINITE); vector<char> testString; void write(Rdma::AsynchIO& aio) { - if ((cmsgs - rmsgs) < Rdma::DEFAULT_WR_ENTRIES/2) { - while (writable) { - if (smsgs >= target) - return; - Rdma::Buffer* b = aio.getBuffer(); - std::copy(testString.begin(), testString.end(), b->bytes); - b->dataCount = msgsize; - aio.queueWrite(b); - ++smsgs; - sbytes += b->byteCount; - } + while (aio.writable()) { + if (smsgs >= target) + return; + Rdma::Buffer* b = aio.getBuffer(); + std::copy(testString.begin(), testString.end(), b->bytes); + b->dataCount = msgsize; + aio.queueWrite(b); + ++smsgs; + sbytes += msgsize; } } @@ -82,39 +78,46 @@ void dataError(Rdma::AsynchIO&) { void data(Poller::shared_ptr p, Rdma::AsynchIO& aio, Rdma::Buffer* b) { ++rmsgs; - rbytes += b->byteCount; + rbytes += b->dataCount; // When all messages have been recvd stop if (rmsgs < target) { write(aio); } else { fullTestDuration = std::min(fullTestDuration, Duration(startTime, AbsTime::now())); - if (cmsgs >= target) + if (aio.incompletedWrites() == 0) p->shutdown(); } } -void full(Rdma::AsynchIO&) { - writable = false; +void full(Rdma::AsynchIO& a, Rdma::Buffer* b) { + // Warn as we shouldn't get here anymore + cerr << "!"; + + // Don't need to keep buffer just adjust the counts + --smsgs; + sbytes -= b->dataCount; + + // Give buffer back + a.returnBuffer(b); } void idle(Poller::shared_ptr p, Rdma::AsynchIO& aio) { - writable = true; - ++cmsgs; if (smsgs < target) { write(aio); } else { sendingDuration = std::min(sendingDuration, Duration(startTime, AbsTime::now())); - if (rmsgs >= target && cmsgs >= target) + if (rmsgs >= target && aio.incompletedWrites() == 0) p->shutdown(); } } -void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) { +void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp) { cout << "Connected\n"; Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair(); - Rdma::AsynchIO* aio = new Rdma::AsynchIO(ci->getQueuePair(), msgsize, + Rdma::AsynchIO* aio = new Rdma::AsynchIO(ci->getQueuePair(), + cp.maxRecvBufferSize, cp.initialXmitCredit , Rdma::DEFAULT_WR_ENTRIES, boost::bind(&data, poller, _1, _2), boost::bind(&idle, poller, _1), &full, @@ -131,12 +134,12 @@ void disconnected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&) p->shutdown(); } -void connectionError(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&) { +void connectionError(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&, const Rdma::ErrorType) { cout << "Connection error\n"; p->shutdown(); } -void rejected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&) { +void rejected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&) { cout << "Connection rejected\n"; p->shutdown(); } @@ -164,7 +167,7 @@ int main(int argc, char* argv[]) { // Make a random message of that size testString.resize(msgsize); for (int i = 0; i < msgsize; ++i) { - testString[i] = 32 + rand() & 0x3f; + testString[i] = 32 + (rand() & 0x3f); } try { @@ -173,10 +176,11 @@ int main(int argc, char* argv[]) { Rdma::Connector c( *res->ai_addr, - boost::bind(&connected, p, _1), - boost::bind(&connectionError, p, _1), + Rdma::ConnectionParams(msgsize, Rdma::DEFAULT_WR_ENTRIES), + boost::bind(&connected, p, _1, _2), + boost::bind(&connectionError, p, _1, _2), boost::bind(&disconnected, p, _1), - boost::bind(&rejected, p, _1)); + boost::bind(&rejected, p, _1, _2)); c.start(p); d.run(); diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp index e7a5e7d5cb..dd4fbefcaf 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -20,13 +20,21 @@ */ #include "RdmaIO.h" +#include "qpid/log/Statement.h" + + #include <iostream> #include <boost/bind.hpp> +using qpid::sys::DispatchHandle; +using qpid::sys::Poller; + namespace Rdma { AsynchIO::AsynchIO( QueuePair::intrusive_ptr q, - int s, + int size, + int xCredit, + int rCount, ReadCallback rc, IdleCallback ic, FullCallback fc, @@ -34,10 +42,15 @@ namespace Rdma { ) : qp(q), dataHandle(*qp, boost::bind(&AsynchIO::dataEvent, this, _1), 0, 0), - bufferSize(s), - recvBufferCount(DEFAULT_WR_ENTRIES), - xmitBufferCount(DEFAULT_WR_ENTRIES), + bufferSize(size), + recvCredit(0), + xmitCredit(xCredit), + recvBufferCount(rCount), + xmitBufferCount(xCredit), outstandingWrites(0), + closed(false), + deleting(false), + state(IDLE), readCallback(rc), idleCallback(ic), fullCallback(fc), @@ -57,72 +70,232 @@ namespace Rdma { } AsynchIO::~AsynchIO() { + // Warn if we are deleting whilst there are still unreclaimed write buffers + if ( outstandingWrites>0 ) + QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished"); + + // Turn off callbacks (before doing the deletes) + dataHandle.stopWatch(); + // The buffers ptr_deque automatically deletes all the buffers we've allocated + // TODO: It might turn out to be more efficient in high connection loads to reuse the + // buffers rather than having to reregister them all the time (this would be straightforward if all + // connections haver the same buffer size and harder otherwise) } void AsynchIO::start(Poller::shared_ptr poller) { dataHandle.startWatch(poller); } - // TODO: Currently we don't prevent write buffer overrun we just advise - // when to stop writing. - void AsynchIO::queueWrite(Buffer* buff) { - qp->postSend(buff); - ++outstandingWrites; - if (outstandingWrites >= xmitBufferCount) { - fullCallback(*this); + // Mark for deletion/Delete this object when we have no outstanding writes + void AsynchIO::deferDelete() { + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + if (outstandingWrites > 0 || state != IDLE) { + deleting = true; + return; + } + state = DELETED; // Stop any read callback before the dataHandle.stopWatch() in the destructor } + delete this; } - void AsynchIO::notifyPendingWrite() { - // Just perform the idle callback (if possible) - if (outstandingWrites < xmitBufferCount) { - idleCallback(*this); + void AsynchIO::queueWrite(Buffer* buff) { + // Make sure we don't overrun our available buffers + // either at our end or the known available at the peers end + if (writable()) { + // TODO: We might want to batch up sending credit + if (recvCredit > 0) { + int creditSent = recvCredit & ~FlagsMask; + qp->postSend(creditSent, buff); + recvCredit -= creditSent; + } else { + qp->postSend(buff); + } + ++outstandingWrites; + --xmitCredit; + } else { + if (fullCallback) { + fullCallback(*this, buff); + } else { + QPID_LOG(error, "RDMA: qp=" << qp << ": Write queue full, but no callback, throwing buffer away"); + returnBuffer(buff); + } } } + // Mark now closed (so we don't accept any more writes or make any idle callbacks) void AsynchIO::queueWriteClose() { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + closed = true; } - Buffer* AsynchIO::getBuffer() { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); - if (bufferQueue.empty()) { - Buffer* b = qp->createBuffer(bufferSize); - buffers.push_front(b); - b->dataCount = 0; - return b; - } else { - Buffer* b = bufferQueue.front(); - bufferQueue.pop_front(); - b->dataCount = 0; - b->dataStart = 0; - return b; + void AsynchIO::notifyPendingWrite() { + // As notifyPendingWrite can be called on an arbitrary thread it must check whether we are processing or not. + // If we are then we just return as we know that we will eventually do the idle callback anyway. + // + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + // We can get here in any state (as the caller could be in any thread) + switch (state) { + case NOTIFY_WRITE: + case PENDING_NOTIFY: + // We only need to note a pending notify if we're already doing a notify as data processing + // is always followed by write notification processing + state = PENDING_NOTIFY; + return; + case PENDING_DATA: + return; + case DATA: + // Only need to return here as data processing will do the idleCallback itself anyway + return; + case IDLE: + state = NOTIFY_WRITE; + break; + case DELETED: + assert(state!=DELETED); + } } + + doWriteCallback(); + // Keep track of what we need to do so that we can release the lock + enum {COMPLETION, NOTIFY} action; + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + // If there was pending data whilst we were doing this, process it now + switch (state) { + case NOTIFY_WRITE: + state = IDLE; + return; + case PENDING_DATA: + action = COMPLETION; + break; + case PENDING_NOTIFY: + action = NOTIFY; + break; + default: + assert(state!=IDLE && state!=DATA && state!=DELETED); + return; + } + // Using NOTIFY_WRITE for both cases is a bit strange, but we're making sure we get the + // correct result if we reenter notifyPendingWrite(), in which case we want to + // end up in PENDING_NOTIFY (entering dataEvent doesn't matter as it only checks + // not IDLE) + state = NOTIFY_WRITE; + } + do { + // Note we only get here if we were in the PENDING_DATA or PENDING_NOTIFY state + // so that we do need to process completions or notifications now + switch (action) { + case COMPLETION: + processCompletions(); + case NOTIFY: + doWriteCallback(); + break; + } + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + switch (state) { + case NOTIFY_WRITE: + state = IDLE; + goto exit; + case PENDING_DATA: + action = COMPLETION; + break; + case PENDING_NOTIFY: + action = NOTIFY; + break; + default: + assert(state!=IDLE && state!=DATA && state!=DELETED); + return; + } + state = NOTIFY_WRITE; + } + } while (true); + exit: + // If we just processed completions we might need to delete ourselves + if (action == COMPLETION && deleting && outstandingWrites == 0) { + delete this; + } + } + + void AsynchIO::dataEvent(qpid::sys::DispatchHandle&) { + // Keep track of writable notifications + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + // We're already processing a notification + switch (state) { + case IDLE: + break; + default: + state = PENDING_DATA; + return; + } + // Can't get here in DATA state as that would violate the serialisation rules + assert( state==IDLE ); + state = DATA; + } + + processCompletions(); + + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + assert( state==DATA ); + state = NOTIFY_WRITE; + } + + do { + doWriteCallback(); + + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + if ( state==NOTIFY_WRITE ) { + state = IDLE; + break; + } + // Can't get DATA/PENDING_DATA here as dataEvent cannot be reentered + assert( state==PENDING_NOTIFY ); + state = NOTIFY_WRITE; + } + } while (true); + + // We might need to delete ourselves + if (deleting && outstandingWrites == 0) { + delete this; + } } - void AsynchIO::dataEvent(DispatchHandle&) { + void AsynchIO::processCompletions() { QueuePair::intrusive_ptr q = qp->getNextChannelEvent(); + // Re-enable notification for queue: + // This needs to happen before we could do anything that could generate more work completion + // events (ie the callbacks etc. in the following). + // This can't make us reenter this code as the handle attached to the completion queue will still be + // disabled by the poller until we leave this code + qp->notifyRecv(); + qp->notifySend(); + + int recvEvents = 0; + int sendEvents = 0; + // If no event do nothing if (!q) return; assert(q == qp); - // Re-enable notification for queue - qp->notifySend(); - qp->notifyRecv(); - // Repeat until no more events do { QueuePairEvent e(qp->getNextEvent()); if (!e) - return; + break; ::ibv_wc_status status = e.getEventStatus(); if (status != IBV_WC_SUCCESS) { errorCallback(*this); + // TODO: Probably need to flush queues at this point return; } @@ -131,46 +304,144 @@ namespace Rdma { Buffer* b = e.getBuffer(); QueueDirection dir = e.getDirection(); if (dir == RECV) { - readCallback(*this, b); + ++recvEvents; + + // Get our xmitCredit if it was sent + bool dataPresent = true; + if (e.immPresent() ) { + xmitCredit += (e.getImm() & ~FlagsMask); + dataPresent = ((e.getImm() & IgnoreData) == 0); + } + + // if there was no data sent then the message was only to update our credit + if ( dataPresent ) { + readCallback(*this, b); + } + // At this point the buffer has been consumed so put it back on the recv queue + b->dataStart = 0; + b->dataCount = 0; qp->postRecv(b); + + // Received another message + ++recvCredit; + + // Send recvCredit if it is large enough (it will have got this large because we've not sent anything recently) + if (recvCredit > recvBufferCount/2) { + // TODO: This should use RDMA write with imm as there might not ever be a buffer to receive this message + // but this is a little unlikely, as to get in this state we have to have received messages without sending any + // for a while so its likely we've received an credit update from the far side. + if (writable()) { + Buffer* ob = getBuffer(); + // Have to send something as adapters hate it when you try to transfer 0 bytes + *reinterpret_cast< uint32_t* >(ob->bytes) = htonl(recvCredit); + ob->dataCount = sizeof(uint32_t); + + int creditSent = recvCredit & ~FlagsMask; + qp->postSend(creditSent | IgnoreData, ob); + recvCredit -= creditSent; + ++outstandingWrites; + --xmitCredit; + } else { + QPID_LOG(warning, "RDMA: qp=" << qp << ": Unable to send unsolicited credit"); + } + } } else { + ++sendEvents; { qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); bufferQueue.push_front(b); } --outstandingWrites; - // TODO: maybe don't call idle unless we're low on write buffers - idleCallback(*this); } } while (true); + + // Not sure if this is expected or not + if (recvEvents == 0 && sendEvents == 0) { + QPID_LOG(debug, "RDMA: qp=" << qp << ": Got channel event with no recv/send completions"); + } + } + + void AsynchIO::doWriteCallback() { + // TODO: maybe don't call idle unless we're low on write buffers + // Keep on calling the idle routine as long as we are writable and we got something to write last call + while (writable()) { + int xc = xmitCredit; + idleCallback(*this); + // Check whether we actually wrote anything + if (xmitCredit == xc) { + QPID_LOG(debug, "RDMA: qp=" << qp << ": Called for data, but got none: xmitCredit=" << xmitCredit); + return; + } + } + } + + Buffer* AsynchIO::getBuffer() { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); + if (bufferQueue.empty()) { + Buffer* b = qp->createBuffer(bufferSize); + buffers.push_front(b); + return b; + } else { + Buffer* b = bufferQueue.front(); + bufferQueue.pop_front(); + b->dataCount = 0; + b->dataStart = 0; + return b; + } + + } + + void AsynchIO::returnBuffer(Buffer* b) { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); + bufferQueue.push_front(b); + b->dataCount = 0; + b->dataStart = 0; } + ConnectionManager::ConnectionManager( + ErrorCallback errc, + DisconnectedCallback dc + ) : + ci(Connection::make()), + handle(*ci, boost::bind(&ConnectionManager::event, this, _1), 0, 0), + errorCallback(errc), + disconnectedCallback(dc) + { + ci->nonblocking(); + } + + void ConnectionManager::start(Poller::shared_ptr poller) { + startConnection(ci); + handle.startWatch(poller); + } + + void ConnectionManager::event(DispatchHandle&) { + connectionEvent(ci); + } + Listener::Listener( const sockaddr& src, - ConnectedCallback cc, + const ConnectionParams& cp, + EstablishedCallback ec, ErrorCallback errc, DisconnectedCallback dc, ConnectionRequestCallback crc ) : + ConnectionManager(errc, dc), src_addr(src), - ci(Connection::make()), - handle(*ci, boost::bind(&Listener::connectionEvent, this, _1), 0, 0), - connectedCallback(cc), - errorCallback(errc), - disconnectedCallback(dc), - connectionRequestCallback(crc) + checkConnectionParams(cp), + connectionRequestCallback(crc), + establishedCallback(ec) { - ci->nonblocking(); } - void Listener::start(Poller::shared_ptr poller) { + void Listener::startConnection(Connection::intrusive_ptr ci) { ci->bind(src_addr); ci->listen(); - handle.startWatch(poller); } - void Listener::connectionEvent(DispatchHandle&) { + void Listener::connectionEvent(Connection::intrusive_ptr ci) { ConnectionEvent e(ci->getNextEvent()); // If (for whatever reason) there was no event do nothing @@ -181,65 +452,75 @@ namespace Rdma { // you get from CONNECT_REQUEST has the same context info // as its parent listening rdma_cm_id ::rdma_cm_event_type eventType = e.getEventType(); + ::rdma_conn_param conn_param = e.getConnectionParam(); Rdma::Connection::intrusive_ptr id = e.getConnection(); switch (eventType) { case RDMA_CM_EVENT_CONNECT_REQUEST: { - bool accept = true; - // Extract connection parameters and private data from event - ::rdma_conn_param conn_param = e.getConnectionParam(); + // Make sure peer has sent params we can use + if (!conn_param.private_data || conn_param.private_data_len < sizeof(ConnectionParams)) { + id->reject(); + break; + } + ConnectionParams cp = *static_cast<const ConnectionParams*>(conn_param.private_data); + // Reject if requested msg size is bigger than we allow + if (cp.maxRecvBufferSize > checkConnectionParams.maxRecvBufferSize) { + id->reject(&checkConnectionParams); + break; + } + + bool accept = true; if (connectionRequestCallback) - //TODO: pass private data to callback (and accept new private data for accept somehow) - accept = connectionRequestCallback(id); + accept = connectionRequestCallback(id, cp); + if (accept) { // Accept connection - id->accept(conn_param); + cp.initialXmitCredit = checkConnectionParams.initialXmitCredit; + id->accept(conn_param, &cp); } else { - //Reject connection + // Reject connection id->reject(); } - break; } case RDMA_CM_EVENT_ESTABLISHED: - connectedCallback(id); + establishedCallback(id); break; case RDMA_CM_EVENT_DISCONNECTED: disconnectedCallback(id); break; case RDMA_CM_EVENT_CONNECT_ERROR: - errorCallback(id); + errorCallback(id, CONNECT_ERROR); break; default: - std::cerr << "Warning: unexpected response to listen - " << eventType << "\n"; + // Unexpected response + errorCallback(id, UNKNOWN); + //std::cerr << "Warning: unexpected response to listen - " << eventType << "\n"; } } Connector::Connector( const sockaddr& dst, + const ConnectionParams& cp, ConnectedCallback cc, ErrorCallback errc, DisconnectedCallback dc, RejectedCallback rc ) : + ConnectionManager(errc, dc), dst_addr(dst), - ci(Connection::make()), - handle(*ci, boost::bind(&Connector::connectionEvent, this, _1), 0, 0), - connectedCallback(cc), - errorCallback(errc), - disconnectedCallback(dc), - rejectedCallback(rc) + connectionParams(cp), + rejectedCallback(rc), + connectedCallback(cc) { - ci->nonblocking(); } - void Connector::start(Poller::shared_ptr poller) { + void Connector::startConnection(Connection::intrusive_ptr ci) { ci->resolve_addr(dst_addr); - handle.startWatch(poller); } - void Connector::connectionEvent(DispatchHandle&) { + void Connector::connectionEvent(Connection::intrusive_ptr ci) { ConnectionEvent e(ci->getNextEvent()); // If (for whatever reason) there was no event do nothing @@ -247,6 +528,8 @@ namespace Rdma { return; ::rdma_cm_event_type eventType = e.getEventType(); + ::rdma_conn_param conn_param = e.getConnectionParam(); + Rdma::Connection::intrusive_ptr id = e.getConnection(); switch (eventType) { case RDMA_CM_EVENT_ADDR_RESOLVED: // RESOLVE_ADDR @@ -254,38 +537,46 @@ namespace Rdma { break; case RDMA_CM_EVENT_ADDR_ERROR: // RESOLVE_ADDR - errorCallback(ci); + errorCallback(ci, ADDR_ERROR); break; case RDMA_CM_EVENT_ROUTE_RESOLVED: // RESOLVE_ROUTE: - ci->connect(); + ci->connect(&connectionParams); break; case RDMA_CM_EVENT_ROUTE_ERROR: // RESOLVE_ROUTE: - errorCallback(ci); + errorCallback(ci, ROUTE_ERROR); break; case RDMA_CM_EVENT_CONNECT_ERROR: // CONNECTING - errorCallback(ci); + errorCallback(ci, CONNECT_ERROR); break; case RDMA_CM_EVENT_UNREACHABLE: // CONNECTING - errorCallback(ci); + errorCallback(ci, UNREACHABLE); break; - case RDMA_CM_EVENT_REJECTED: + case RDMA_CM_EVENT_REJECTED: { // CONNECTING - rejectedCallback(ci); + // Extract private data from event + assert(conn_param.private_data && conn_param.private_data_len >= sizeof(ConnectionParams)); + ConnectionParams cp = *static_cast<const ConnectionParams*>(conn_param.private_data); + rejectedCallback(ci, cp); break; - case RDMA_CM_EVENT_ESTABLISHED: + } + case RDMA_CM_EVENT_ESTABLISHED: { // CONNECTING - connectedCallback(ci); + // Extract private data from event + assert(conn_param.private_data && conn_param.private_data_len >= sizeof(ConnectionParams)); + ConnectionParams cp = *static_cast<const ConnectionParams*>(conn_param.private_data); + connectedCallback(ci, cp); break; + } case RDMA_CM_EVENT_DISCONNECTED: // ESTABLISHED disconnectedCallback(ci); break; default: - std::cerr << "Warning: unexpected event in connect: " << eventType << "\n"; + QPID_LOG(warning, "RDMA: Unexpected event in connect: " << eventType); } } } diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h index 35fbc4fa11..8b1422a1af 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.h +++ b/cpp/src/qpid/sys/rdma/RdmaIO.h @@ -32,32 +32,29 @@ #include <boost/ptr_container/ptr_deque.hpp> #include <deque> -using qpid::sys::DispatchHandle; -using qpid::sys::Poller; - namespace Rdma { class Connection; - typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> ConnectedCallback; - typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> ErrorCallback; - typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> DisconnectedCallback; - typedef boost::function1<bool, Rdma::Connection::intrusive_ptr&> ConnectionRequestCallback; - typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> RejectedCallback; - class AsynchIO { typedef boost::function1<void, AsynchIO&> ErrorCallback; typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback; typedef boost::function1<void, AsynchIO&> IdleCallback; - typedef boost::function1<void, AsynchIO&> FullCallback; + typedef boost::function2<void, AsynchIO&, Buffer*> FullCallback; QueuePair::intrusive_ptr qp; - DispatchHandle dataHandle; + qpid::sys::DispatchHandleRef dataHandle; int bufferSize; + int recvCredit; + int xmitCredit; int recvBufferCount; int xmitBufferCount; int outstandingWrites; + bool closed; // TODO: Perhaps (probably) this state can be merged with the following... + bool deleting; // TODO: Perhaps (probably) this state can be merged with the following... + enum { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DELETED } state; + qpid::sys::Mutex stateLock; std::deque<Buffer*> bufferQueue; qpid::sys::Mutex bufferQueueLock; boost::ptr_deque<Buffer> buffers; @@ -70,70 +67,145 @@ namespace Rdma { public: AsynchIO( QueuePair::intrusive_ptr q, - int s, + int size, + int xCredit, + int rCount, ReadCallback rc, IdleCallback ic, FullCallback fc, ErrorCallback ec ); - ~AsynchIO(); - void start(Poller::shared_ptr poller); + void start(qpid::sys::Poller::shared_ptr poller); + bool writable() const; void queueWrite(Buffer* buff); void notifyPendingWrite(); void queueWriteClose(); + void deferDelete(); + int incompletedWrites() const; Buffer* getBuffer(); + void returnBuffer(Buffer*); private: - void dataEvent(DispatchHandle& handle); + // Don't let anyone else delete us to make sure there can't be outstanding callbacks + ~AsynchIO(); + + // Constants for the peer-peer command messages + // These are sent in the high bits if the imm data of an rdma message + // The low bits are used to send the credit + const static int FlagsMask = 0x10000000; // Mask for all flag bits - be sure to update this if you add more command bits + const static int IgnoreData = 0x10000000; // Message contains no application data + + void dataEvent(qpid::sys::DispatchHandle& handle); + void processCompletions(); + void doWriteCallback(); }; - class Listener - { - sockaddr src_addr; + inline bool AsynchIO::writable() const { + return (!closed && outstandingWrites < xmitBufferCount && xmitCredit > 0); + } + + inline int AsynchIO::incompletedWrites() const { + return outstandingWrites; + } + + // These are the parameters necessary to start the conversation + // * Each peer HAS to allocate buffers of the size of the maximum receive from its peer + // * Each peer HAS to know the initial "credit" it has for transmitting to its peer + struct ConnectionParams { + int maxRecvBufferSize; + int initialXmitCredit ; + + ConnectionParams(int s, int c) : + maxRecvBufferSize(s), + initialXmitCredit(c) + {} + }; + + enum ErrorType { + ADDR_ERROR, + ROUTE_ERROR, + CONNECT_ERROR, + UNREACHABLE, + UNKNOWN + }; + + typedef boost::function2<void, Rdma::Connection::intrusive_ptr&, ErrorType> ErrorCallback; + typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> DisconnectedCallback; + + class ConnectionManager { Connection::intrusive_ptr ci; - DispatchHandle handle; - ConnectedCallback connectedCallback; + qpid::sys::DispatchHandle handle; + + protected: ErrorCallback errorCallback; DisconnectedCallback disconnectedCallback; + + public: + ConnectionManager( + ErrorCallback errc, + DisconnectedCallback dc + ); + + virtual ~ConnectionManager() {} + + void start(qpid::sys::Poller::shared_ptr poller); + + private: + void event(qpid::sys::DispatchHandle& handle); + + virtual void startConnection(Connection::intrusive_ptr ci) = 0; + virtual void connectionEvent(Connection::intrusive_ptr ci) = 0; + }; + + typedef boost::function2<bool, Rdma::Connection::intrusive_ptr&, const ConnectionParams&> ConnectionRequestCallback; + typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> EstablishedCallback; + + class Listener : public ConnectionManager + { + sockaddr src_addr; + ConnectionParams checkConnectionParams; ConnectionRequestCallback connectionRequestCallback; + EstablishedCallback establishedCallback; public: Listener( const sockaddr& src, - ConnectedCallback cc, + const ConnectionParams& cp, + EstablishedCallback ec, ErrorCallback errc, DisconnectedCallback dc, ConnectionRequestCallback crc = 0 ); - void start(Poller::shared_ptr poller); private: - void connectionEvent(DispatchHandle& handle); + void startConnection(Connection::intrusive_ptr ci); + void connectionEvent(Connection::intrusive_ptr ci); }; - class Connector + typedef boost::function2<void, Rdma::Connection::intrusive_ptr&, const ConnectionParams&> RejectedCallback; + typedef boost::function2<void, Rdma::Connection::intrusive_ptr&, const ConnectionParams&> ConnectedCallback; + + class Connector : public ConnectionManager { sockaddr dst_addr; - Connection::intrusive_ptr ci; - DispatchHandle handle; - ConnectedCallback connectedCallback; - ErrorCallback errorCallback; - DisconnectedCallback disconnectedCallback; + ConnectionParams connectionParams; RejectedCallback rejectedCallback; + ConnectedCallback connectedCallback; public: Connector( const sockaddr& dst, + const ConnectionParams& cp, ConnectedCallback cc, ErrorCallback errc, DisconnectedCallback dc, RejectedCallback rc = 0 ); - void start(Poller::shared_ptr poller); private: - void connectionEvent(DispatchHandle& handle); + void startConnection(Connection::intrusive_ptr ci); + void connectionEvent(Connection::intrusive_ptr ci); }; } diff --git a/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/cpp/src/qpid/sys/rdma/RdmaServer.cpp index dee2d17eed..594578a265 100644 --- a/cpp/src/qpid/sys/rdma/RdmaServer.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaServer.cpp @@ -42,12 +42,10 @@ using qpid::sys::Dispatcher; struct ConRec { Rdma::Connection::intrusive_ptr connection; Rdma::AsynchIO* data; - bool writable; queue<Rdma::Buffer*> queuedWrites; ConRec(Rdma::Connection::intrusive_ptr c) : - connection(c), - writable(true) + connection(c) {} }; @@ -55,50 +53,53 @@ void dataError(Rdma::AsynchIO&) { cout << "Data error:\n"; } +void idle(ConRec* cr, Rdma::AsynchIO& a) { + // Need to make sure full is not called as it would reorder messages + while (!cr->queuedWrites.empty() && a.writable()) { + Rdma::Buffer* buf = cr->queuedWrites.front(); + cr->queuedWrites.pop(); + a.queueWrite(buf); + } +} + void data(ConRec* cr, Rdma::AsynchIO& a, Rdma::Buffer* b) { // Echo data back Rdma::Buffer* buf = a.getBuffer(); std::copy(b->bytes+b->dataStart, b->bytes+b->dataStart+b->dataCount, buf->bytes); buf->dataCount = b->dataCount; - if (cr->queuedWrites.empty() && cr->writable) { + if (cr->queuedWrites.empty()) { + // If can't write then full will be called and push buffer on back of queue a.queueWrite(buf); } else { cr->queuedWrites.push(buf); + // Try to empty queue + idle(cr, a); } } -void full(ConRec* cr, Rdma::AsynchIO&) { - cr->writable = false; -} - -void idle(ConRec* cr, Rdma::AsynchIO& a) { - cr->writable = true; - while (!cr->queuedWrites.empty() && cr->writable) { - Rdma::Buffer* buf = cr->queuedWrites.front(); - cr->queuedWrites.pop(); - a.queueWrite(buf); - } +void full(ConRec* cr, Rdma::AsynchIO&, Rdma::Buffer* buf) { + cr->queuedWrites.push(buf); } void disconnected(Rdma::Connection::intrusive_ptr& ci) { ConRec* cr = ci->getContext<ConRec>(); cr->connection->disconnect(); - delete cr->data; + cr->data->queueWriteClose(); delete cr; cout << "Disconnected: " << cr << "\n"; } -void connectionError(Rdma::Connection::intrusive_ptr& ci) { +void connectionError(Rdma::Connection::intrusive_ptr& ci, Rdma::ErrorType) { ConRec* cr = ci->getContext<ConRec>(); cr->connection->disconnect(); if (cr) { - delete cr->data; + cr->data->queueWriteClose(); delete cr; } cout << "Connection error: " << cr << "\n"; } -bool connectionRequest(Rdma::Connection::intrusive_ptr& ci) { +bool connectionRequest(Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp) { cout << "Incoming connection: "; // For fun reject alternate connection attempts @@ -109,10 +110,11 @@ bool connectionRequest(Rdma::Connection::intrusive_ptr& ci) { if (x) { ConRec* cr = new ConRec(ci); Rdma::AsynchIO* aio = - new Rdma::AsynchIO(ci->getQueuePair(), 8000, + new Rdma::AsynchIO(ci->getQueuePair(), + cp.maxRecvBufferSize, cp.initialXmitCredit, Rdma::DEFAULT_WR_ENTRIES, boost::bind(data, cr, _1, _2), boost::bind(idle, cr, _1), - boost::bind(full, cr, _1), + boost::bind(full, cr, _1, _2), dataError); ci->addContext(cr); cr->data = aio; @@ -149,6 +151,7 @@ int main(int argc, char* argv[]) { Dispatcher d(p); Rdma::Listener a((const sockaddr&)(sin), + Rdma::ConnectionParams(16384, Rdma::DEFAULT_WR_ENTRIES), boost::bind(connected, p, _1), connectionError, disconnected, diff --git a/cpp/src/qpid/sys/rdma/rdma_factories.cpp b/cpp/src/qpid/sys/rdma/rdma_factories.cpp index 0ac920203d..c6e8df814b 100644 --- a/cpp/src/qpid/sys/rdma/rdma_factories.cpp +++ b/cpp/src/qpid/sys/rdma/rdma_factories.cpp @@ -56,4 +56,9 @@ namespace Rdma { (void) ::ibv_destroy_cq(cq); } + void destroyQp(::ibv_qp* qp) throw () { + if (qp) + (void) ::ibv_destroy_qp(qp); + } + } diff --git a/cpp/src/qpid/sys/rdma/rdma_factories.h b/cpp/src/qpid/sys/rdma/rdma_factories.h index b2b2b18726..b568cadc7b 100644 --- a/cpp/src/qpid/sys/rdma/rdma_factories.h +++ b/cpp/src/qpid/sys/rdma/rdma_factories.h @@ -35,6 +35,7 @@ namespace Rdma { void deallocPd(::ibv_pd* p) throw (); void destroyCChannel(::ibv_comp_channel* c) throw (); void destroyCq(::ibv_cq* cq) throw (); + void destroyQp(::ibv_qp* qp) throw (); inline boost::shared_ptr< ::rdma_event_channel > mkEChannel() { return diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp new file mode 100644 index 0000000000..ac0813ffd6 --- /dev/null +++ b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp @@ -0,0 +1,155 @@ +#include "rdma_wrap.h" + +namespace Rdma { + const ::rdma_conn_param DEFAULT_CONNECT_PARAM = { + 0, // .private_data + 0, // .private_data_len + 4, // .responder_resources + 4, // .initiator_depth + 0, // .flow_control + 5, // .retry_count + 7 // .rnr_retry_count + }; + + ::rdma_conn_param ConnectionEvent::getConnectionParam() const { + // It's badly documented, but it seems from the librdma source code that all the following + // event types have a valid param.conn + switch (event->event) { + case RDMA_CM_EVENT_CONNECT_REQUEST: + case RDMA_CM_EVENT_ESTABLISHED: + case RDMA_CM_EVENT_REJECTED: + case RDMA_CM_EVENT_DISCONNECTED: + case RDMA_CM_EVENT_CONNECT_ERROR: + return event->param.conn; + default: + ::rdma_conn_param p = {}; + return p; + } + } + + QueuePair::QueuePair(boost::shared_ptr< ::rdma_cm_id > i) : + qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), + pd(allocPd(i->verbs)), + cchannel(mkCChannel(i->verbs)), + scq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())), + rcq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())), + outstandingSendEvents(0), + outstandingRecvEvents(0) + { + impl->fd = cchannel->fd; + + // Set cq context to this QueuePair object so we can find + // ourselves again + scq->cq_context = this; + rcq->cq_context = this; + + ::ibv_qp_init_attr qp_attr = {}; + + // TODO: make a default struct for this + qp_attr.cap.max_send_wr = DEFAULT_WR_ENTRIES; + qp_attr.cap.max_send_sge = 4; + qp_attr.cap.max_recv_wr = DEFAULT_WR_ENTRIES; + qp_attr.cap.max_recv_sge = 4; + + qp_attr.send_cq = scq.get(); + qp_attr.recv_cq = rcq.get(); + qp_attr.qp_type = IBV_QPT_RC; + + CHECK(::rdma_create_qp(i.get(), pd.get(), &qp_attr)); + qp = boost::shared_ptr< ::ibv_qp >(i->qp, destroyQp); + + // Set the qp context to this so we can find ourselves again + qp->qp_context = this; + } + + QueuePair::~QueuePair() { + if (outstandingSendEvents > 0) + ::ibv_ack_cq_events(scq.get(), outstandingSendEvents); + if (outstandingRecvEvents > 0) + ::ibv_ack_cq_events(rcq.get(), outstandingRecvEvents); + + // Reset back pointer in case someone else has the qp + qp->qp_context = 0; + } + + void QueuePair::postRecv(Buffer* buf) { + ::ibv_recv_wr rwr = {}; + ::ibv_sge sge; + + sge.addr = (uintptr_t) buf->bytes+buf->dataStart; + sge.length = buf->dataCount; + sge.lkey = buf->mr->lkey; + + rwr.wr_id = reinterpret_cast<uint64_t>(buf); + rwr.sg_list = &sge; + rwr.num_sge = 1; + + ::ibv_recv_wr* badrwr = 0; + CHECK_IBV(::ibv_post_recv(qp.get(), &rwr, &badrwr)); + if (badrwr) + throw std::logic_error("ibv_post_recv(): Bad rwr"); + } + + void QueuePair::postSend(Buffer* buf) { + ::ibv_send_wr swr = {}; + ::ibv_sge sge; + + sge.addr = (uintptr_t) buf->bytes+buf->dataStart; + sge.length = buf->dataCount; + sge.lkey = buf->mr->lkey; + + swr.wr_id = reinterpret_cast<uint64_t>(buf); + swr.opcode = IBV_WR_SEND; + swr.send_flags = IBV_SEND_SIGNALED; + swr.sg_list = &sge; + swr.num_sge = 1; + + ::ibv_send_wr* badswr = 0; + CHECK_IBV(::ibv_post_send(qp.get(), &swr, &badswr)); + if (badswr) + throw std::logic_error("ibv_post_send(): Bad swr"); + } + + void QueuePair::postSend(uint32_t imm, Buffer* buf) { + ::ibv_send_wr swr = {}; + ::ibv_sge sge; + + sge.addr = (uintptr_t) buf->bytes+buf->dataStart; + sge.length = buf->dataCount; + sge.lkey = buf->mr->lkey; + swr.send_flags = IBV_SEND_SIGNALED; + + swr.wr_id = reinterpret_cast<uint64_t>(buf); + swr.imm_data = htonl(imm); + swr.opcode = IBV_WR_SEND_WITH_IMM; + swr.sg_list = &sge; + swr.num_sge = 1; + + ::ibv_send_wr* badswr = 0; + CHECK_IBV(::ibv_post_send(qp.get(), &swr, &badswr)); + if (badswr) + throw std::logic_error("ibv_post_send(): Bad swr"); + } +} + +std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t) { +# define CHECK_TYPE(t) case t: o << #t; break; + switch(t) { + CHECK_TYPE(RDMA_CM_EVENT_ADDR_RESOLVED) + CHECK_TYPE(RDMA_CM_EVENT_ADDR_ERROR) + CHECK_TYPE(RDMA_CM_EVENT_ROUTE_RESOLVED) + CHECK_TYPE(RDMA_CM_EVENT_ROUTE_ERROR) + CHECK_TYPE(RDMA_CM_EVENT_CONNECT_REQUEST) + CHECK_TYPE(RDMA_CM_EVENT_CONNECT_RESPONSE) + CHECK_TYPE(RDMA_CM_EVENT_CONNECT_ERROR) + CHECK_TYPE(RDMA_CM_EVENT_UNREACHABLE) + CHECK_TYPE(RDMA_CM_EVENT_REJECTED) + CHECK_TYPE(RDMA_CM_EVENT_ESTABLISHED) + CHECK_TYPE(RDMA_CM_EVENT_DISCONNECTED) + CHECK_TYPE(RDMA_CM_EVENT_DEVICE_REMOVAL) + CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_JOIN) + CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_ERROR) + } +# undef CHECK_TYPE + return o; +} diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.h b/cpp/src/qpid/sys/rdma/rdma_wrap.h index 41f02bb464..61d8281cee 100644 --- a/cpp/src/qpid/sys/rdma/rdma_wrap.h +++ b/cpp/src/qpid/sys/rdma/rdma_wrap.h @@ -31,6 +31,8 @@ #include <fcntl.h> +#include <netdb.h> + #include <vector> #include <algorithm> #include <iostream> @@ -43,15 +45,7 @@ namespace Rdma { const int DEFAULT_BACKLOG = 100; const int DEFAULT_CQ_ENTRIES = 256; const int DEFAULT_WR_ENTRIES = 64; - const ::rdma_conn_param DEFAULT_CONNECT_PARAM = { - 0, // .private_data - 0, // .private_data_len - 4, // .responder_resources - 4, // .initiator_depth - 0, // .flow_control - 5, // .retry_count - 7 // .rnr_retry_count - }; + extern const ::rdma_conn_param DEFAULT_CONNECT_PARAM; struct Buffer { friend class QueuePair; @@ -115,6 +109,14 @@ namespace Rdma { return dir != NONE; } + bool immPresent() const { + return wc.wc_flags & IBV_WC_WITH_IMM; + } + + uint32_t getImm() const { + return ntohl(wc.imm_data); + } + QueueDirection getDirection() const { return dir; } @@ -137,15 +139,12 @@ namespace Rdma { // Wrapper for a queue pair - this has the functionality for // putting buffers on the receive queue and for sending buffers // to the other end of the connection. - // - // Currently QueuePairs are contained inside Connections and have no - // separate lifetime class QueuePair : public qpid::sys::IOHandle, public qpid::RefCounted { boost::shared_ptr< ::ibv_pd > pd; boost::shared_ptr< ::ibv_comp_channel > cchannel; boost::shared_ptr< ::ibv_cq > scq; boost::shared_ptr< ::ibv_cq > rcq; - boost::shared_ptr< ::rdma_cm_id > id; + boost::shared_ptr< ::ibv_qp > qp; int outstandingSendEvents; int outstandingRecvEvents; @@ -207,6 +206,7 @@ namespace Rdma { void postRecv(Buffer* buf); void postSend(Buffer* buf); + void postSend(uint32_t imm, Buffer* buf); void notifyRecv(); void notifySend(); }; @@ -233,14 +233,7 @@ namespace Rdma { return event->event; } - ::rdma_conn_param getConnectionParam() const { - if (event->event == RDMA_CM_EVENT_CONNECT_REQUEST) { - return event->param.conn; - } else { - ::rdma_conn_param p = {}; - return p; - } - } + ::rdma_conn_param getConnectionParam() const; boost::intrusive_ptr<Connection> getConnection () const { return id; @@ -291,6 +284,11 @@ namespace Rdma { impl->fd = channel->fd; } + ~Connection() { + // Reset the id context in case someone else has it + id->context = 0; + } + // Default destructor fine void ensureQueuePair() { @@ -445,52 +443,38 @@ namespace Rdma { return qp; } + + std::string getLocalName() const { + ::sockaddr* addr = ::rdma_get_local_addr(id.get()); + char hostName[NI_MAXHOST]; + char portName[NI_MAXSERV]; + CHECK_IBV(::getnameinfo( + addr, sizeof(::sockaddr_storage), + hostName, sizeof(hostName), + portName, sizeof(portName), + NI_NUMERICHOST | NI_NUMERICSERV)); + std::string r(hostName); + r += ":"; + r += portName; + return r; + } + + std::string getPeerName() const { + ::sockaddr* addr = ::rdma_get_peer_addr(id.get()); + char hostName[NI_MAXHOST]; + char portName[NI_MAXSERV]; + CHECK_IBV(::getnameinfo( + addr, sizeof(::sockaddr_storage), + hostName, sizeof(hostName), + portName, sizeof(portName), + NI_NUMERICHOST | NI_NUMERICSERV)); + std::string r(hostName); + r += ":"; + r += portName; + return r; + } }; - inline QueuePair::QueuePair(boost::shared_ptr< ::rdma_cm_id > i) : - qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), - pd(allocPd(i->verbs)), - cchannel(mkCChannel(i->verbs)), - scq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())), - rcq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())), - id(i), - outstandingSendEvents(0), - outstandingRecvEvents(0) - { - impl->fd = cchannel->fd; - - // Set cq context to this QueuePair object so we can find - // ourselves again - scq->cq_context = this; - rcq->cq_context = this; - - ::ibv_qp_init_attr qp_attr = {}; - - // TODO: make a default struct for this - qp_attr.cap.max_send_wr = DEFAULT_WR_ENTRIES; - qp_attr.cap.max_send_sge = 4; - qp_attr.cap.max_recv_wr = DEFAULT_WR_ENTRIES; - qp_attr.cap.max_recv_sge = 4; - - qp_attr.send_cq = scq.get(); - qp_attr.recv_cq = rcq.get(); - qp_attr.qp_type = IBV_QPT_RC; - - CHECK(::rdma_create_qp(id.get(), pd.get(), &qp_attr)); - - // Set the qp context to this so we can find ourselves again - id->qp->qp_context = this; - } - - inline QueuePair::~QueuePair() { - if (outstandingSendEvents > 0) - ::ibv_ack_cq_events(scq.get(), outstandingSendEvents); - if (outstandingRecvEvents > 0) - ::ibv_ack_cq_events(rcq.get(), outstandingRecvEvents); - - ::rdma_destroy_qp(id.get()); - } - inline void QueuePair::notifyRecv() { CHECK_IBV(ibv_req_notify_cq(rcq.get(), 0)); } @@ -499,44 +483,6 @@ namespace Rdma { CHECK_IBV(ibv_req_notify_cq(scq.get(), 0)); } - inline void QueuePair::postRecv(Buffer* buf) { - ::ibv_recv_wr rwr = {}; - ::ibv_sge sge; - - sge.addr = (uintptr_t) buf->bytes+buf->dataStart; - sge.length = buf->dataCount; - sge.lkey = buf->mr->lkey; - - rwr.wr_id = reinterpret_cast<uint64_t>(buf); - rwr.sg_list = &sge; - rwr.num_sge = 1; - - ::ibv_recv_wr* badrwr = 0; - CHECK_IBV(::ibv_post_recv(id->qp, &rwr, &badrwr)); - if (badrwr) - throw std::logic_error("ibv_post_recv(): Bad rwr"); - } - - inline void QueuePair::postSend(Buffer* buf) { - ::ibv_send_wr swr = {}; - ::ibv_sge sge; - - sge.addr = (uintptr_t) buf->bytes+buf->dataStart; - sge.length = buf->dataCount; - sge.lkey = buf->mr->lkey; - - swr.wr_id = reinterpret_cast<uint64_t>(buf); - swr.opcode = IBV_WR_SEND; - swr.send_flags = IBV_SEND_SIGNALED; - swr.sg_list = &sge; - swr.num_sge = 1; - - ::ibv_send_wr* badswr = 0; - CHECK_IBV(::ibv_post_send(id->qp, &swr, &badswr)); - if (badswr) - throw std::logic_error("ibv_post_send(): Bad swr"); - } - inline ConnectionEvent::ConnectionEvent(::rdma_cm_event* e) : id((e->event != RDMA_CM_EVENT_CONNECT_REQUEST) ? Connection::find(e->id) : new Connection(e->id)), @@ -545,26 +491,6 @@ namespace Rdma { {} } -inline std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t) { -# define CHECK_TYPE(t) case t: o << #t; break; - switch(t) { - CHECK_TYPE(RDMA_CM_EVENT_ADDR_RESOLVED) - CHECK_TYPE(RDMA_CM_EVENT_ADDR_ERROR) - CHECK_TYPE(RDMA_CM_EVENT_ROUTE_RESOLVED) - CHECK_TYPE(RDMA_CM_EVENT_ROUTE_ERROR) - CHECK_TYPE(RDMA_CM_EVENT_CONNECT_REQUEST) - CHECK_TYPE(RDMA_CM_EVENT_CONNECT_RESPONSE) - CHECK_TYPE(RDMA_CM_EVENT_CONNECT_ERROR) - CHECK_TYPE(RDMA_CM_EVENT_UNREACHABLE) - CHECK_TYPE(RDMA_CM_EVENT_REJECTED) - CHECK_TYPE(RDMA_CM_EVENT_ESTABLISHED) - CHECK_TYPE(RDMA_CM_EVENT_DISCONNECTED) - CHECK_TYPE(RDMA_CM_EVENT_DEVICE_REMOVAL) - CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_JOIN) - CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_ERROR) - } -# undef CHECK_TYPE - return o; -} +std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t); #endif // RDMA_WRAP_H |