diff options
Diffstat (limited to 'qpid/cpp/src/qpid/client/RdmaConnector.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/client/RdmaConnector.cpp | 431 |
1 files changed, 431 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/client/RdmaConnector.cpp b/qpid/cpp/src/qpid/client/RdmaConnector.cpp new file mode 100644 index 0000000000..664640f5e7 --- /dev/null +++ b/qpid/cpp/src/qpid/client/RdmaConnector.cpp @@ -0,0 +1,431 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/Connector.h" + +#include "qpid/client/Bounds.h" +#include "qpid/client/ConnectionImpl.h" +#include "qpid/client/ConnectionSettings.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/Time.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/InitiationHandler.h" +#include "qpid/sys/rdma/RdmaIO.h" +#include "qpid/sys/rdma/rdma_exception.h" +#include "qpid/sys/Dispatcher.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/SecurityLayer.h" +#include "qpid/Msg.h" + +#include <iostream> +#include <boost/bind.hpp> +#include <boost/format.hpp> +#include <boost/lexical_cast.hpp> + +// This stuff needs to abstracted out of here to a platform specific file +#include <netdb.h> + +namespace qpid { +namespace client { + +using namespace qpid::sys; +using namespace qpid::framing; +using boost::format; +using boost::str; + +class RdmaConnector : public Connector, public sys::Codec +{ + typedef std::deque<framing::AMQFrame> Frames; + + const uint16_t maxFrameSize; + sys::Mutex lock; + Frames frames; + size_t lastEof; // Position after last EOF in frames + uint64_t currentSize; + Bounds* bounds; + + framing::ProtocolVersion version; + bool initiated; + + sys::Mutex dataConnectedLock; + bool dataConnected; + + sys::ShutdownHandler* shutdownHandler; + framing::InputHandler* input; + framing::InitiationHandler* initialiser; + framing::OutputHandler* output; + + Rdma::AsynchIO* aio; + Rdma::Connector* acon; + sys::Poller::shared_ptr poller; + std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; + + ~RdmaConnector(); + + // Callbacks + void connected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&); + void connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, Rdma::ErrorType); + void disconnected(); + void rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&); + + void readbuff(Rdma::AsynchIO&, Rdma::Buffer*); + void writebuff(Rdma::AsynchIO&); + void writeDataBlock(const framing::AMQDataBlock& data); + void dataError(Rdma::AsynchIO&); + void drained(); + void connectionStopped(Rdma::Connector* acon, Rdma::AsynchIO* aio); + void dataStopped(Rdma::AsynchIO* aio); + + std::string identifier; + + void connect(const std::string& host, const std::string& port); + void close(); + void send(framing::AMQFrame& frame); + void abort() {} // TODO: need to fix this for heartbeat timeouts to work + + void setInputHandler(framing::InputHandler* handler); + void setShutdownHandler(sys::ShutdownHandler* handler); + sys::ShutdownHandler* getShutdownHandler() const; + framing::OutputHandler* getOutputHandler(); + const std::string& getIdentifier() const; + void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>); + const qpid::sys::SecuritySettings* getSecuritySettings() { return 0; } + + size_t decode(const char* buffer, size_t size); + size_t encode(const char* buffer, size_t size); + bool canEncode(); + +public: + RdmaConnector(Poller::shared_ptr, + framing::ProtocolVersion pVersion, + const ConnectionSettings&, + ConnectionImpl*); +}; + +// Static constructor which registers connector here +namespace { + Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { + return new RdmaConnector(p, v, s, c); + } + + struct StaticInit { + StaticInit() { + Connector::registerFactory("rdma", &create); + Connector::registerFactory("ib", &create); + }; + } init; +} + + +RdmaConnector::RdmaConnector(Poller::shared_ptr p, + ProtocolVersion ver, + const ConnectionSettings& settings, + ConnectionImpl* cimpl) + : maxFrameSize(settings.maxFrameSize), + lastEof(0), + currentSize(0), + bounds(cimpl), + version(ver), + initiated(false), + dataConnected(false), + shutdownHandler(0), + aio(0), + acon(0), + poller(p) +{ + QPID_LOG(debug, "RdmaConnector created for " << version); +} + +namespace { + void deleteAsynchIO(Rdma::AsynchIO& aio) { + delete &aio; + } + + void deleteConnector(Rdma::ConnectionManager& con) { + delete &con; + } +} + +RdmaConnector::~RdmaConnector() { + QPID_LOG(debug, "~RdmaConnector " << identifier); + if (aio) { + aio->stop(deleteAsynchIO); + } + if (acon) { + acon->stop(deleteConnector); + } +} + +void RdmaConnector::connect(const std::string& host, const std::string& port){ + Mutex::ScopedLock l(dataConnectedLock); + assert(!dataConnected); + + acon = new Rdma::Connector( + Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES), + boost::bind(&RdmaConnector::connected, this, poller, _1, _2), + boost::bind(&RdmaConnector::connectionError, this, poller, _1, _2), + boost::bind(&RdmaConnector::disconnected, this), + boost::bind(&RdmaConnector::rejected, this, poller, _1, _2)); + + SocketAddress sa(host, port); + acon->start(poller, sa); +} + +// The following only gets run when connected +void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr ci, const Rdma::ConnectionParams& cp) { + try { + Mutex::ScopedLock l(dataConnectedLock); + assert(!dataConnected); + Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair(); + + aio = new Rdma::AsynchIO(ci->getQueuePair(), + cp.rdmaProtocolVersion, + cp.maxRecvBufferSize, cp.initialXmitCredit , Rdma::DEFAULT_WR_ENTRIES, + boost::bind(&RdmaConnector::readbuff, this, _1, _2), + boost::bind(&RdmaConnector::writebuff, this, _1), + 0, // write buffers full + boost::bind(&RdmaConnector::dataError, this, _1)); + + identifier = str(format("[%1% %2%]") % ci->getLocalName() % ci->getPeerName()); + ProtocolInitiation init(version); + writeDataBlock(init); + + aio->start(poller); + + dataConnected = true; + + return; + } catch (const Rdma::Exception& e) { + QPID_LOG(error, "Rdma: Cannot create new connection (Rdma exception): " << e.what()); + } catch (const std::exception& e) { + QPID_LOG(error, "Rdma: Cannot create new connection (unknown exception): " << e.what()); + } + dataConnected = false; + connectionStopped(acon, aio); +} + +void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, Rdma::ErrorType) { + QPID_LOG(debug, "Connection Error " << identifier); + connectionStopped(acon, aio); +} + +// Bizarrely we seem to get rejected events *after* we've already got a connected event for some peer disconnects +// so we need to check whether the data connection is started or not in here +void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams& cp) { + QPID_LOG(debug, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize); + if (dataConnected) { + disconnected(); + } else { + connectionStopped(acon, aio); + } +} + +void RdmaConnector::disconnected() { + QPID_LOG(debug, "Connection disconnected " << identifier); + { + Mutex::ScopedLock l(dataConnectedLock); + // If we're closed already then we'll get to drained() anyway + if (!dataConnected) return; + dataConnected = false; + } + // Make sure that all the disconnected actions take place on the data "thread" + aio->requestCallback(boost::bind(&RdmaConnector::drained, this)); +} + +void RdmaConnector::dataError(Rdma::AsynchIO&) { + QPID_LOG(debug, "Data Error " << identifier); + { + Mutex::ScopedLock l(dataConnectedLock); + // If we're closed already then we'll get to drained() anyway + if (!dataConnected) return; + dataConnected = false; + } + drained(); +} + +void RdmaConnector::close() { + QPID_LOG(debug, "RdmaConnector::close " << identifier); + { + Mutex::ScopedLock l(dataConnectedLock); + if (!dataConnected) return; + dataConnected = false; + } + aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this)); +} + +void RdmaConnector::drained() { + QPID_LOG(debug, "RdmaConnector::drained " << identifier); + assert(!dataConnected); + assert(aio); + Rdma::AsynchIO* a = aio; + aio = 0; + a->stop(boost::bind(&RdmaConnector::dataStopped, this, a)); +} + +void RdmaConnector::dataStopped(Rdma::AsynchIO* a) { + QPID_LOG(debug, "RdmaConnector::dataStopped " << identifier); + assert(!dataConnected); + assert(acon); + Rdma::Connector* c = acon; + acon = 0; + c->stop(boost::bind(&RdmaConnector::connectionStopped, this, c, a)); +} + +void RdmaConnector::connectionStopped(Rdma::Connector* c, Rdma::AsynchIO* a) { + QPID_LOG(debug, "RdmaConnector::connectionStopped " << identifier); + assert(!dataConnected); + aio = 0; + acon = 0; + delete a; + delete c; + if (shutdownHandler) { + ShutdownHandler* s = shutdownHandler; + shutdownHandler = 0; + s->shutdown(); + } +} + +void RdmaConnector::setInputHandler(InputHandler* handler){ + input = handler; +} + +void RdmaConnector::setShutdownHandler(ShutdownHandler* handler){ + shutdownHandler = handler; +} + +OutputHandler* RdmaConnector::getOutputHandler(){ + return this; +} + +sys::ShutdownHandler* RdmaConnector::getShutdownHandler() const { + return shutdownHandler; +} + +const std::string& RdmaConnector::getIdentifier() const { + return identifier; +} + +void RdmaConnector::send(AMQFrame& frame) { + // It is possible that we are called to write after we are already shutting down + Mutex::ScopedLock l(dataConnectedLock); + if (!dataConnected) return; + + bool notifyWrite = false; + { + Mutex::ScopedLock l(lock); + frames.push_back(frame); + //only ask to write if this is the end of a frameset or if we + //already have a buffers worth of data + currentSize += frame.encodedSize(); + if (frame.getEof()) { + lastEof = frames.size(); + notifyWrite = true; + } else { + notifyWrite = (currentSize >= maxFrameSize); + } + } + if (notifyWrite) aio->notifyPendingWrite(); +} + +// Called in IO thread. (write idle routine) +// This is NOT only called in response to previously calling notifyPendingWrite +void RdmaConnector::writebuff(Rdma::AsynchIO&) { + // It's possible to be disconnected and be writable + Mutex::ScopedLock l(dataConnectedLock); + if (!dataConnected) { + return; + } + Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; + if (!codec->canEncode()) { + return; + } + Rdma::Buffer* buffer = aio->getSendBuffer(); + if (buffer) { + size_t encoded = codec->encode(buffer->bytes(), buffer->byteCount()); + buffer->dataCount(encoded); + aio->queueWrite(buffer); + } +} + +bool RdmaConnector::canEncode() +{ + Mutex::ScopedLock l(lock); + //have at least one full frameset or a whole buffers worth of data + return aio->writable() && (lastEof || currentSize >= maxFrameSize); +} + +size_t RdmaConnector::encode(const char* buffer, size_t size) +{ + framing::Buffer out(const_cast<char*>(buffer), size); + size_t bytesWritten(0); + { + Mutex::ScopedLock l(lock); + while (!frames.empty() && out.available() >= frames.front().encodedSize() ) { + frames.front().encode(out); + QPID_LOG(trace, "SENT " << identifier << ": " << frames.front()); + frames.pop_front(); + if (lastEof) --lastEof; + } + bytesWritten = size - out.available(); + currentSize -= bytesWritten; + } + if (bounds) bounds->reduce(bytesWritten); + return bytesWritten; +} + +void RdmaConnector::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { + Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; + codec->decode(buff->bytes(), buff->dataCount()); +} + +size_t RdmaConnector::decode(const char* buffer, size_t size) +{ + framing::Buffer in(const_cast<char*>(buffer), size); + if (!initiated) { + framing::ProtocolInitiation protocolInit; + if (protocolInit.decode(in)) { + //TODO: check the version is correct + QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")"); + } + initiated = true; + } + AMQFrame frame; + while(frame.decode(in)){ + QPID_LOG(trace, "RECV " << identifier << ": " << frame); + input->received(frame); + } + return size - in.available(); +} + +void RdmaConnector::writeDataBlock(const AMQDataBlock& data) { + Rdma::Buffer* buff = aio->getSendBuffer(); + framing::Buffer out(buff->bytes(), buff->byteCount()); + data.encode(out); + buff->dataCount(data.encodedSize()); + aio->queueWrite(buff); +} + +void RdmaConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) +{ + securityLayer = sl; + securityLayer->init(this); +} + +}} // namespace qpid::client |