/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #ifndef RDMA_WRAP_H #define RDMA_WRAP_H #include #include "qpid/RefCounted.h" #include "qpid/sys/IOHandle.h" #include "qpid/sys/Mutex.h" #include #include #include #include namespace qpid { namespace sys { class SocketAddress; }} namespace Rdma { const int DEFAULT_TIMEOUT = 2000; // 2 secs const int DEFAULT_BACKLOG = 100; const int DEFAULT_CQ_ENTRIES = 256; const int DEFAULT_WR_ENTRIES = 64; extern const ::rdma_conn_param DEFAULT_CONNECT_PARAM; int deviceCount(); struct Buffer { friend class QueuePair; friend class QueuePairEvent; char* bytes() const; int32_t byteCount() const; int32_t dataCount() const; void dataCount(int32_t); private: Buffer(uint32_t lkey, char* bytes, const int32_t byteCount, const int32_t reserve=0); int32_t bufferSize; int32_t reserved; // for framing header ::ibv_sge sge; }; inline char* Buffer::bytes() const { return (char*) sge.addr; } /** return the number of bytes available for application data */ inline int32_t Buffer::byteCount() const { return bufferSize - reserved; } inline int32_t Buffer::dataCount() const { return sge.length; } inline void Buffer::dataCount(int32_t s) { // catch any attempt to overflow a buffer assert(s <= bufferSize + reserved); sge.length = s; } class Connection; enum QueueDirection { NONE, SEND, RECV }; class QueuePairEvent { boost::shared_ptr< ::ibv_cq > cq; ::ibv_wc wc; QueueDirection dir; friend class QueuePair; QueuePairEvent(); QueuePairEvent( const ::ibv_wc& w, boost::shared_ptr< ::ibv_cq > c, QueueDirection d); public: operator bool() const; bool immPresent() const; uint32_t getImm() const; QueueDirection getDirection() const; ::ibv_wc_opcode getEventType() const; ::ibv_wc_status getEventStatus() const; Buffer* getBuffer() const; }; // Wrapper for a queue pair - this has the functionality for // putting buffers on the receive queue and for sending buffers // to the other end of the connection. class QueuePair : public qpid::sys::IOHandle, public qpid::RefCounted { friend class Connection; boost::shared_ptr< ::ibv_pd > pd; boost::shared_ptr< ::ibv_mr > smr; boost::shared_ptr< ::ibv_mr > rmr; boost::shared_ptr< ::ibv_comp_channel > cchannel; boost::shared_ptr< ::ibv_cq > scq; boost::shared_ptr< ::ibv_cq > rcq; boost::shared_ptr< ::ibv_qp > qp; int outstandingSendEvents; int outstandingRecvEvents; std::vector sendBuffers; std::vector recvBuffers; qpid::sys::Mutex bufferLock; std::vector freeBuffers; QueuePair(boost::shared_ptr< ::rdma_cm_id > id); ~QueuePair(); public: typedef boost::intrusive_ptr intrusive_ptr; // Create a buffers to use for writing void createSendBuffers(int sendBufferCount, int dataSize, int headerSize); // Get a send buffer Buffer* getSendBuffer(); // Return buffer to pool after use void returnSendBuffer(Buffer* b); // Create and post recv buffers void allocateRecvBuffers(int recvBufferCount, int bufferSize); // Make channel non-blocking by making // associated fd nonblocking void nonblocking(); // If we get EAGAIN because the channel has been set non blocking // and we'd have to wait then return an empty event QueuePair::intrusive_ptr getNextChannelEvent(); QueuePairEvent getNextEvent(); void postRecv(Buffer* buf); void postSend(Buffer* buf); void postSend(uint32_t imm, Buffer* buf); void notifyRecv(); void notifySend(); }; class ConnectionEvent { friend class Connection; // The order of the members is important as we have to acknowledge // the event before destroying the ids on destruction boost::intrusive_ptr id; boost::intrusive_ptr listen_id; boost::shared_ptr< ::rdma_cm_event > event; ConnectionEvent() {} ConnectionEvent(::rdma_cm_event* e); // Default copy, assignment and destructor ok public: operator bool() const; ::rdma_cm_event_type getEventType() const; ::rdma_conn_param getConnectionParam() const; boost::intrusive_ptr getConnection () const; boost::intrusive_ptr getListenId() const; }; // For the moment this is a fairly simple wrapper for rdma_cm_id. // // NB: It allocates a protection domain (pd) per connection which means that // registered buffers can't be shared between different connections // (this can only happen between connections on the same controller in any case, // so needs careful management if used) class Connection : public qpid::sys::IOHandle, public qpid::RefCounted { boost::shared_ptr< ::rdma_event_channel > channel; boost::shared_ptr< ::rdma_cm_id > id; QueuePair::intrusive_ptr qp; void* context; friend class ConnectionEvent; friend class QueuePair; // Wrap the passed in rdma_cm_id with a Connection // this basically happens only on connection request Connection(::rdma_cm_id* i); Connection(); ~Connection(); void ensureQueuePair(); public: typedef boost::intrusive_ptr intrusive_ptr; static intrusive_ptr make(); static intrusive_ptr find(::rdma_cm_id* i); template void addContext(T* c) { // Don't allow replacing context if (!context) context = c; } void removeContext() { context = 0; } template T* getContext() { return static_cast(context); } // Make channel non-blocking by making // associated fd nonblocking void nonblocking(); // If we get EAGAIN because the channel has been set non blocking // and we'd have to wait then return an empty event ConnectionEvent getNextEvent(); void bind(const qpid::sys::SocketAddress& src_addr) const; void listen(int backlog = DEFAULT_BACKLOG) const; void resolve_addr( const qpid::sys::SocketAddress& dst_addr, int timeout_ms = DEFAULT_TIMEOUT) const; void resolve_route(int timeout_ms = DEFAULT_TIMEOUT) const; void disconnect() const; // TODO: Currently you can only connect with the default connection parameters void connect(const void* data, size_t len); void connect(); template void connect(const T* data) { connect(data, sizeof(T)); } // TODO: Not sure how to default accept params - they come from the connection request // event void accept(const ::rdma_conn_param& param, const void* data, size_t len); void accept(const ::rdma_conn_param& param); template void accept(const ::rdma_conn_param& param, const T* data) { accept(param, data, sizeof(T)); } void reject(const void* data, size_t len) const; void reject() const; template void reject(const T* data) const { reject(data, sizeof(T)); } QueuePair::intrusive_ptr getQueuePair(); std::string getLocalName() const; std::string getPeerName() const; std::string getFullName() const { return getLocalName()+"-"+getPeerName(); } }; } std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t); #endif // RDMA_WRAP_H