diff options
Diffstat (limited to 'cpp/src/qpid/client/RdmaConnector.cpp')
-rw-r--r-- | cpp/src/qpid/client/RdmaConnector.cpp | 431 |
1 files changed, 0 insertions, 431 deletions
diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp deleted file mode 100644 index 6af607198c..0000000000 --- a/cpp/src/qpid/client/RdmaConnector.cpp +++ /dev/null @@ -1,431 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/client/Connector.h" - -#include "qpid/client/Bounds.h" -#include "qpid/client/ConnectionImpl.h" -#include "qpid/client/ConnectionSettings.h" -#include "qpid/log/Statement.h" -#include "qpid/sys/Time.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/InitiationHandler.h" -#include "qpid/sys/rdma/RdmaIO.h" -#include "qpid/sys/rdma/rdma_exception.h" -#include "qpid/sys/Dispatcher.h" -#include "qpid/sys/Poller.h" -#include "qpid/sys/SecurityLayer.h" -#include "qpid/Msg.h" - -#include <iostream> -#include <boost/bind.hpp> -#include <boost/format.hpp> -#include <boost/lexical_cast.hpp> - -// This stuff needs to abstracted out of here to a platform specific file -#include <netdb.h> - -namespace qpid { -namespace client { - -using namespace qpid::sys; -using namespace qpid::framing; -using boost::format; -using boost::str; - -class RdmaConnector : public Connector, public sys::Codec -{ - typedef std::deque<framing::AMQFrame> Frames; - - const uint16_t maxFrameSize; - sys::Mutex lock; - Frames frames; - size_t lastEof; // Position after last EOF in frames - uint64_t currentSize; - Bounds* bounds; - - framing::ProtocolVersion version; - bool initiated; - - sys::Mutex dataConnectedLock; - bool dataConnected; - - sys::ShutdownHandler* shutdownHandler; - framing::InputHandler* input; - framing::InitiationHandler* initialiser; - framing::OutputHandler* output; - - Rdma::AsynchIO* aio; - Rdma::Connector* acon; - sys::Poller::shared_ptr poller; - std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; - - ~RdmaConnector(); - - // Callbacks - void connected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&); - void connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, Rdma::ErrorType); - void disconnected(); - void rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&); - - void readbuff(Rdma::AsynchIO&, Rdma::Buffer*); - void writebuff(Rdma::AsynchIO&); - void writeDataBlock(const framing::AMQDataBlock& data); - void dataError(Rdma::AsynchIO&); - void drained(); - void connectionStopped(Rdma::Connector* acon, Rdma::AsynchIO* aio); - void dataStopped(Rdma::AsynchIO* aio); - - std::string identifier; - - void connect(const std::string& host, int port); - void close(); - void send(framing::AMQFrame& frame); - void abort() {} // TODO: need to fix this for heartbeat timeouts to work - - void setInputHandler(framing::InputHandler* handler); - void setShutdownHandler(sys::ShutdownHandler* handler); - sys::ShutdownHandler* getShutdownHandler() const; - framing::OutputHandler* getOutputHandler(); - const std::string& getIdentifier() const; - void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>); - const qpid::sys::SecuritySettings* getSecuritySettings() { return 0; } - - size_t decode(const char* buffer, size_t size); - size_t encode(const char* buffer, size_t size); - bool canEncode(); - -public: - RdmaConnector(Poller::shared_ptr, - framing::ProtocolVersion pVersion, - const ConnectionSettings&, - ConnectionImpl*); -}; - -// Static constructor which registers connector here -namespace { - Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { - return new RdmaConnector(p, v, s, c); - } - - struct StaticInit { - StaticInit() { - Connector::registerFactory("rdma", &create); - Connector::registerFactory("ib", &create); - }; - } init; -} - - -RdmaConnector::RdmaConnector(Poller::shared_ptr p, - ProtocolVersion ver, - const ConnectionSettings& settings, - ConnectionImpl* cimpl) - : maxFrameSize(settings.maxFrameSize), - lastEof(0), - currentSize(0), - bounds(cimpl), - version(ver), - initiated(false), - dataConnected(false), - shutdownHandler(0), - aio(0), - acon(0), - poller(p) -{ - QPID_LOG(debug, "RdmaConnector created for " << version); -} - -namespace { - void deleteAsynchIO(Rdma::AsynchIO& aio) { - delete &aio; - } - - void deleteConnector(Rdma::ConnectionManager& con) { - delete &con; - } -} - -RdmaConnector::~RdmaConnector() { - QPID_LOG(debug, "~RdmaConnector " << identifier); - if (aio) { - aio->stop(deleteAsynchIO); - } - if (acon) { - acon->stop(deleteConnector); - } -} - -void RdmaConnector::connect(const std::string& host, int port){ - Mutex::ScopedLock l(dataConnectedLock); - assert(!dataConnected); - - acon = new Rdma::Connector( - Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES), - boost::bind(&RdmaConnector::connected, this, poller, _1, _2), - boost::bind(&RdmaConnector::connectionError, this, poller, _1, _2), - boost::bind(&RdmaConnector::disconnected, this), - boost::bind(&RdmaConnector::rejected, this, poller, _1, _2)); - - SocketAddress sa(host, boost::lexical_cast<std::string>(port)); - acon->start(poller, sa); -} - -// The following only gets run when connected -void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr ci, const Rdma::ConnectionParams& cp) { - try { - Mutex::ScopedLock l(dataConnectedLock); - assert(!dataConnected); - Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair(); - - aio = new Rdma::AsynchIO(ci->getQueuePair(), - cp.rdmaProtocolVersion, - cp.maxRecvBufferSize, cp.initialXmitCredit , Rdma::DEFAULT_WR_ENTRIES, - boost::bind(&RdmaConnector::readbuff, this, _1, _2), - boost::bind(&RdmaConnector::writebuff, this, _1), - 0, // write buffers full - boost::bind(&RdmaConnector::dataError, this, _1)); - - identifier = str(format("[%1% %2%]") % ci->getLocalName() % ci->getPeerName()); - ProtocolInitiation init(version); - writeDataBlock(init); - - aio->start(poller); - - dataConnected = true; - - return; - } catch (const Rdma::Exception& e) { - QPID_LOG(error, "Rdma: Cannot create new connection (Rdma exception): " << e.what()); - } catch (const std::exception& e) { - QPID_LOG(error, "Rdma: Cannot create new connection (unknown exception): " << e.what()); - } - dataConnected = false; - connectionStopped(acon, aio); -} - -void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, Rdma::ErrorType) { - QPID_LOG(debug, "Connection Error " << identifier); - connectionStopped(acon, aio); -} - -// Bizarrely we seem to get rejected events *after* we've already got a connected event for some peer disconnects -// so we need to check whether the data connection is started or not in here -void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams& cp) { - QPID_LOG(debug, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize); - if (dataConnected) { - disconnected(); - } else { - connectionStopped(acon, aio); - } -} - -void RdmaConnector::disconnected() { - QPID_LOG(debug, "Connection disconnected " << identifier); - { - Mutex::ScopedLock l(dataConnectedLock); - // If we're closed already then we'll get to drained() anyway - if (!dataConnected) return; - dataConnected = false; - } - // Make sure that all the disconnected actions take place on the data "thread" - aio->requestCallback(boost::bind(&RdmaConnector::drained, this)); -} - -void RdmaConnector::dataError(Rdma::AsynchIO&) { - QPID_LOG(debug, "Data Error " << identifier); - { - Mutex::ScopedLock l(dataConnectedLock); - // If we're closed already then we'll get to drained() anyway - if (!dataConnected) return; - dataConnected = false; - } - drained(); -} - -void RdmaConnector::close() { - QPID_LOG(debug, "RdmaConnector::close " << identifier); - { - Mutex::ScopedLock l(dataConnectedLock); - if (!dataConnected) return; - dataConnected = false; - } - aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this)); -} - -void RdmaConnector::drained() { - QPID_LOG(debug, "RdmaConnector::drained " << identifier); - assert(!dataConnected); - assert(aio); - Rdma::AsynchIO* a = aio; - aio = 0; - a->stop(boost::bind(&RdmaConnector::dataStopped, this, a)); -} - -void RdmaConnector::dataStopped(Rdma::AsynchIO* a) { - QPID_LOG(debug, "RdmaConnector::dataStopped " << identifier); - assert(!dataConnected); - assert(acon); - Rdma::Connector* c = acon; - acon = 0; - c->stop(boost::bind(&RdmaConnector::connectionStopped, this, c, a)); -} - -void RdmaConnector::connectionStopped(Rdma::Connector* c, Rdma::AsynchIO* a) { - QPID_LOG(debug, "RdmaConnector::connectionStopped " << identifier); - assert(!dataConnected); - aio = 0; - acon = 0; - delete a; - delete c; - if (shutdownHandler) { - ShutdownHandler* s = shutdownHandler; - shutdownHandler = 0; - s->shutdown(); - } -} - -void RdmaConnector::setInputHandler(InputHandler* handler){ - input = handler; -} - -void RdmaConnector::setShutdownHandler(ShutdownHandler* handler){ - shutdownHandler = handler; -} - -OutputHandler* RdmaConnector::getOutputHandler(){ - return this; -} - -sys::ShutdownHandler* RdmaConnector::getShutdownHandler() const { - return shutdownHandler; -} - -const std::string& RdmaConnector::getIdentifier() const { - return identifier; -} - -void RdmaConnector::send(AMQFrame& frame) { - // It is possible that we are called to write after we are already shutting down - Mutex::ScopedLock l(dataConnectedLock); - if (!dataConnected) return; - - bool notifyWrite = false; - { - Mutex::ScopedLock l(lock); - frames.push_back(frame); - //only ask to write if this is the end of a frameset or if we - //already have a buffers worth of data - currentSize += frame.encodedSize(); - if (frame.getEof()) { - lastEof = frames.size(); - notifyWrite = true; - } else { - notifyWrite = (currentSize >= maxFrameSize); - } - } - if (notifyWrite) aio->notifyPendingWrite(); -} - -// Called in IO thread. (write idle routine) -// This is NOT only called in response to previously calling notifyPendingWrite -void RdmaConnector::writebuff(Rdma::AsynchIO&) { - // It's possible to be disconnected and be writable - Mutex::ScopedLock l(dataConnectedLock); - if (!dataConnected) { - return; - } - Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; - if (!codec->canEncode()) { - return; - } - Rdma::Buffer* buffer = aio->getSendBuffer(); - if (buffer) { - size_t encoded = codec->encode(buffer->bytes(), buffer->byteCount()); - buffer->dataCount(encoded); - aio->queueWrite(buffer); - } -} - -bool RdmaConnector::canEncode() -{ - Mutex::ScopedLock l(lock); - //have at least one full frameset or a whole buffers worth of data - return aio->writable() && (lastEof || currentSize >= maxFrameSize); -} - -size_t RdmaConnector::encode(const char* buffer, size_t size) -{ - framing::Buffer out(const_cast<char*>(buffer), size); - size_t bytesWritten(0); - { - Mutex::ScopedLock l(lock); - while (!frames.empty() && out.available() >= frames.front().encodedSize() ) { - frames.front().encode(out); - QPID_LOG(trace, "SENT " << identifier << ": " << frames.front()); - frames.pop_front(); - if (lastEof) --lastEof; - } - bytesWritten = size - out.available(); - currentSize -= bytesWritten; - } - if (bounds) bounds->reduce(bytesWritten); - return bytesWritten; -} - -void RdmaConnector::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { - Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; - codec->decode(buff->bytes(), buff->dataCount()); -} - -size_t RdmaConnector::decode(const char* buffer, size_t size) -{ - framing::Buffer in(const_cast<char*>(buffer), size); - if (!initiated) { - framing::ProtocolInitiation protocolInit; - if (protocolInit.decode(in)) { - //TODO: check the version is correct - QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")"); - } - initiated = true; - } - AMQFrame frame; - while(frame.decode(in)){ - QPID_LOG(trace, "RECV " << identifier << ": " << frame); - input->received(frame); - } - return size - in.available(); -} - -void RdmaConnector::writeDataBlock(const AMQDataBlock& data) { - Rdma::Buffer* buff = aio->getSendBuffer(); - framing::Buffer out(buff->bytes(), buff->byteCount()); - data.encode(out); - buff->dataCount(data.encodedSize()); - aio->queueWrite(buff); -} - -void RdmaConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) -{ - securityLayer = sl; - securityLayer->init(this); -} - -}} // namespace qpid::client |