/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
#include "Connector.h"

#include "Bounds.h"
#include "ConnectionImpl.h"
#include "ConnectionSettings.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/sys/rdma/RdmaIO.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Poller.h"
#include "qpid/Msg.h"

// NOTE(review): the bracketed system header names had been stripped from this
// copy of the file; these are reconstructed from what the code below uses
// (assert, std::deque, std::string, boost::bind/format/lexical_cast).
#include <cassert>
#include <deque>
#include <string>

#include <boost/bind.hpp>
#include <boost/format.hpp>
#include <boost/lexical_cast.hpp>

// This stuff needs to abstracted out of here to a platform specific file
#include <netdb.h>

namespace qpid {
namespace client {

using namespace qpid::sys;
using namespace qpid::framing;
using boost::format;
using boost::str;

/**
 * Client Connector that moves AMQP frames over an RDMA connection using
 * Rdma::AsynchIO. A dedicated receiver thread (run()) drives the poller.
 */
class RdmaConnector : public Connector, private sys::Runnable
{
    struct Buff;

    /** Batch up frames for writing to aio. */
    class Writer : public framing::FrameHandler {
        typedef Rdma::Buffer BufferBase;
        typedef std::deque<framing::AMQFrame> Frames;

        const uint16_t maxFrameSize;
        sys::Mutex lock;             // guards all state below
        Rdma::AsynchIO* aio;
        BufferBase* buffer;          // transmit buffer currently being filled
        Frames frames;               // frames queued but not yet encoded
        size_t lastEof;              // Position after last EOF in frames
        framing::Buffer encode;      // encoder over the current buffer's bytes
        size_t framesEncoded;        // frames encoded into the current buffer
        std::string identifier;
        Bounds* bounds;              // optional; credited as bytes are written

        void writeOne();
        void newBuffer();

      public:
        Writer(uint16_t maxFrameSize, Bounds*);
        ~Writer();
        void init(std::string id, Rdma::AsynchIO*);
        void handle(framing::AMQFrame&);
        void write(Rdma::AsynchIO&);
    };

    const uint16_t maxFrameSize;
    framing::ProtocolVersion version;
    bool initiated;                  // protocol header already seen on input

    sys::Mutex pollingLock;          // guards polling/joined/poller/receiver
    bool polling;
    bool joined;

    sys::ShutdownHandler* shutdownHandler;
    framing::InputHandler* input;
    framing::InitiationHandler* initialiser;
    framing::OutputHandler* output;

    Writer writer;

    sys::Thread receiver;

    Rdma::AsynchIO* aio;             // null until connected() runs
    sys::Poller::shared_ptr poller;

    ~RdmaConnector();

    void run();
    void handleClosed();
    bool closeInternal();

    // Callbacks
    void connected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&);
    void connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, Rdma::ErrorType);
    void disconnected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&);
    void rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&);

    void readbuff(Rdma::AsynchIO&, Rdma::Buffer*);
    void writebuff(Rdma::AsynchIO&);
    void writeDataBlock(const framing::AMQDataBlock& data);
    void eof(Rdma::AsynchIO&);

    std::string identifier;

    ConnectionImpl* impl;

    void connect(const std::string& host, int port);
    void close();
    void send(framing::AMQFrame& frame);

    void setInputHandler(framing::InputHandler* handler);
    void setShutdownHandler(sys::ShutdownHandler* handler);
    sys::ShutdownHandler* getShutdownHandler() const;
    framing::OutputHandler* getOutputHandler();
    const std::string& getIdentifier() const;

public:
    RdmaConnector(framing::ProtocolVersion pVersion,
                  const ConnectionSettings&,
                  ConnectionImpl*);
};

// Static constructor which registers connector here
namespace {
    Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
        return new RdmaConnector(v, s, c);
    }

    // Registers this connector under both the "rdma" and "ib" names.
    struct StaticInit {
        StaticInit() {
            Connector::registerFactory("rdma", &create);
            Connector::registerFactory("ib", &create);
        }
    } init;
}

RdmaConnector::RdmaConnector(ProtocolVersion ver,
                             const ConnectionSettings& settings,
                             ConnectionImpl* cimpl)
    : maxFrameSize(settings.maxFrameSize),
      version(ver),
      initiated(false),
      polling(false),
      joined(true),
      shutdownHandler(0),
      input(0),
      initialiser(0),
      output(0),
      writer(maxFrameSize, cimpl),
      aio(0),
      impl(cimpl)
{
    QPID_LOG(debug, "RdmaConnector created for " << version);
}

RdmaConnector::~RdmaConnector() {
    close();
}

/**
 * Resolve host:port (IPv4 only), start an asynchronous RDMA connect on a new
 * poller and spawn the receiver thread that runs that poller.
 * @throws Exception if the address cannot be resolved.
 */
void RdmaConnector::connect(const std::string& host, int port){
    Mutex::ScopedLock l(pollingLock);
    assert(!polling);
    assert(joined);
    poller = Poller::shared_ptr(new Poller);

    // This stuff needs to abstracted out of here to a platform specific file
    // POSIX requires every hints field we do not set to be zero/null, so
    // value-initialise the whole struct instead of leaving the rest as garbage.
    ::addrinfo hints = ::addrinfo();
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    ::addrinfo* res = 0;
    int n = ::getaddrinfo(host.c_str(), boost::lexical_cast<std::string>(port).c_str(), &hints, &res);
    // Failure is any non-zero EAI_* code; its sign differs between platforms.
    if (n != 0) {
        throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
    }
    // Release the resolver result on every exit path (previously leaked).
    // NOTE(review): assumes Rdma::Connector copies the address or consumes it
    // within start(); confirm against RdmaIO if that ever changes.
    struct AddrInfoReleaser {
        ::addrinfo* list;
        ~AddrInfoReleaser() { ::freeaddrinfo(list); }
    } releaser = { res };

    Rdma::Connector* c = new Rdma::Connector(
        *res->ai_addr,
        Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES),
        boost::bind(&RdmaConnector::connected, this, poller, _1, _2),
        boost::bind(&RdmaConnector::connectionError, this, poller, _1, _2),
        boost::bind(&RdmaConnector::disconnected, this, poller, _1),
        boost::bind(&RdmaConnector::rejected, this, poller, _1, _2));
    c->start(poller);

    polling = true;
    joined = false;
    receiver = Thread(this);
}

// The following only gets run when connected
void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp) {
    Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair();

    aio = new Rdma::AsynchIO(q,
        cp.maxRecvBufferSize, cp.initialXmitCredit , Rdma::DEFAULT_WR_ENTRIES,
        boost::bind(&RdmaConnector::readbuff, this, _1, _2),
        boost::bind(&RdmaConnector::writebuff, this, _1),
        0, // write buffers full
        boost::bind(&RdmaConnector::eof, this, _1)); // data error - just close connection

    identifier = str(format("[%1% %2%]") % ci->getLocalName() % ci->getPeerName());

    // Give the writer its buffer and queue the protocol header *before*
    // start() hands aio to the poller: once started, the IO thread may call
    // writebuff -> Writer::write, which asserts the writer already has a buffer.
    writer.init(identifier, aio);
    ProtocolInitiation init(version);
    writeDataBlock(init);

    aio->start(poller);
}

// The three failure callbacks below may run before connected() has created
// aio, so they close directly instead of going through eof(*aio), which
// would dereference a null pointer.
void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, Rdma::ErrorType) {
    QPID_LOG(trace, "Connection Error " << identifier);
    handleClosed();
}

void RdmaConnector::disconnected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&) {
    handleClosed();
}

void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams& cp) {
    QPID_LOG(trace, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize);
    handleClosed();
}

/**
 * Stop the poller if it is still running, then join the receiver thread
 * unless it was already joined or we are executing on it.
 * @return whether polling was active when this was called.
 */
bool RdmaConnector::closeInternal() {
    bool ret;
    {
        Mutex::ScopedLock l(pollingLock);
        ret = polling;
        if (polling) {
            polling = false;
            poller->shutdown();
        }
        if (joined || receiver.id() == Thread::current().id()) {
            return ret;
        }
        joined = true;
    }

    // Join outside the lock: run() may need pollingLock to finish.
    receiver.join();
    return ret;
}

void RdmaConnector::close() {
    closeInternal();
}

void RdmaConnector::setInputHandler(InputHandler* handler){
    input = handler;
}

void RdmaConnector::setShutdownHandler(ShutdownHandler* handler){
    shutdownHandler = handler;
}

OutputHandler* RdmaConnector::getOutputHandler(){
    return this;
}

sys::ShutdownHandler* RdmaConnector::getShutdownHandler() const {
    return shutdownHandler;
}

const std::string& RdmaConnector::getIdentifier() const {
    return identifier;
}

void RdmaConnector::send(AMQFrame& frame) {
    writer.handle(frame);
}

// Close, and notify the shutdown handler only if this call is the one that
// stopped polling (so the handler fires at most once).
void RdmaConnector::handleClosed() {
    if (closeInternal() && shutdownHandler)
        shutdownHandler->shutdown();
}

RdmaConnector::Writer::Writer(uint16_t s, Bounds* b) :
    maxFrameSize(s),
    aio(0),
    buffer(0),
    lastEof(0),
    framesEncoded(0),
    bounds(b)
{
}

RdmaConnector::Writer::~Writer() {
    if (aio)
        aio->returnBuffer(buffer);
}

// Attach to aio and take the first transmit buffer from it.
void RdmaConnector::Writer::init(std::string id, Rdma::AsynchIO* a) {
    Mutex::ScopedLock l(lock);
    identifier = id;
    aio = a;
    assert(aio->bufferAvailable());
    newBuffer();
}

// Queue a frame; request a write once a frameset is complete.
void RdmaConnector::Writer::handle(framing::AMQFrame& frame) {
    Mutex::ScopedLock l(lock);
    frames.push_back(frame);
    // Don't bother to send anything unless we're at the end of a frameset (assembly in 0-10 terminology)
    if (frame.getEof()) {
        lastEof = frames.size();
        QPID_LOG(debug, "Requesting write: lastEof=" << lastEof);
        aio->notifyPendingWrite();
    }
    QPID_LOG(trace, "SENT " << identifier << ": " << frame);
}

// Queue the current buffer for transmission and start a fresh one.
// Caller must hold lock.
void RdmaConnector::Writer::writeOne() {
    assert(buffer);
    QPID_LOG(trace, "Write buffer " << encode.getPosition()
             << " bytes " << framesEncoded << " frames ");
    framesEncoded = 0;

    buffer->dataStart = 0;
    buffer->dataCount = encode.getPosition();
    aio->queueWrite(buffer);
    newBuffer();
}

// Take a transmit buffer from aio and point the encoder at it.
// Caller must hold lock.
void RdmaConnector::Writer::newBuffer() {
    buffer = aio->getBuffer();
    encode = framing::Buffer(buffer->bytes, buffer->byteCount);
    framesEncoded = 0;
}
(write idle routine) // This is NOT only called in response to previously calling notifyPendingWrite void RdmaConnector::Writer::write(Rdma::AsynchIO&) { Mutex::ScopedLock l(lock); assert(buffer); // If nothing to do return immediately if (lastEof==0) return; size_t bytesWritten = 0; while (aio->writable() && aio->bufferAvailable() && !frames.empty()) { const AMQFrame* frame = &frames.front(); uint32_t size = frame->encodedSize(); while (size <= encode.available()) { frame->encode(encode); frames.pop_front(); ++framesEncoded; bytesWritten += size; if (frames.empty()) break; frame = &frames.front(); size = frame->encodedSize(); } lastEof -= framesEncoded; writeOne(); } if (bounds) bounds->reduce(bytesWritten); } void RdmaConnector::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); if (!initiated) { framing::ProtocolInitiation protocolInit; if (protocolInit.decode(in)) { //TODO: check the version is correct QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")"); } initiated = true; } AMQFrame frame; while(frame.decode(in)){ QPID_LOG(trace, "RECV " << identifier << ": " << frame); input->received(frame); } } void RdmaConnector::writebuff(Rdma::AsynchIO& aio_) { writer.write(aio_); } void RdmaConnector::writeDataBlock(const AMQDataBlock& data) { Rdma::Buffer* buff = aio->getBuffer(); framing::Buffer out(buff->bytes, buff->byteCount); data.encode(out); buff->dataCount = data.encodedSize(); aio->queueWrite(buff); } void RdmaConnector::eof(Rdma::AsynchIO&) { handleClosed(); } // TODO: astitcher 20070908 This version of the code can never time out, so the idle processing // will never be called void RdmaConnector::run(){ // Keep the connection impl in memory until run() completes. 
//GRS: currently the ConnectionImpls destructor is where the Io thread is joined //boost::shared_ptr protect = impl->shared_from_this(); //assert(protect); try { Dispatcher d(poller); //aio->start(poller); d.run(); //aio->queueForDeletion(); } catch (const std::exception& e) { { // We're no longer polling Mutex::ScopedLock l(pollingLock); polling = false; } QPID_LOG(error, e.what()); handleClosed(); } } }} // namespace qpid::client