diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaServer.cpp | 46 |
1 files changed, 35 insertions, 11 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/cpp/src/qpid/sys/rdma/RdmaServer.cpp index 97715326d5..564fd62730 100644 --- a/cpp/src/qpid/sys/rdma/RdmaServer.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaServer.cpp @@ -44,10 +44,28 @@ using qpid::sys::Dispatcher; namespace qpid { namespace tests { +struct Buffer { + char* bytes() const {return bytes_;} + int32_t byteCount() const {return size;} + + Buffer(const int32_t s): + bytes_(new char[s]), + size(s) + { + } + + ~Buffer() { + delete [] bytes_; + } +private: + char* bytes_; + int32_t size; +}; + struct ConRec { Rdma::Connection::intrusive_ptr connection; Rdma::AsynchIO* data; - queue<Rdma::Buffer*> queuedWrites; + queue<Buffer*> queuedWrites; ConRec(Rdma::Connection::intrusive_ptr c) : connection(c) @@ -60,30 +78,36 @@ void dataError(Rdma::AsynchIO&) { void idle(ConRec* cr, Rdma::AsynchIO& a) { // Need to make sure full is not called as it would reorder messages - while (!cr->queuedWrites.empty() && a.writable()) { - Rdma::Buffer* buf = cr->queuedWrites.front(); + while (!cr->queuedWrites.empty() && a.writable() && a.bufferAvailable()) { + Buffer* buf = cr->queuedWrites.front(); cr->queuedWrites.pop(); - a.queueWrite(buf); + Rdma::Buffer* rbuf = a.getBuffer(); + std::copy(buf->bytes(), buf->bytes()+buf->byteCount(), rbuf->bytes()); + rbuf->dataCount(buf->byteCount()); + delete buf; + a.queueWrite(rbuf); } } void data(ConRec* cr, Rdma::AsynchIO& a, Rdma::Buffer* b) { // Echo data back - Rdma::Buffer* buf = a.getBuffer(); - std::copy(b->bytes(), b->bytes()+b->dataCount(), buf->bytes()); - buf->dataCount(b->dataCount()); - if (cr->queuedWrites.empty()) { - // If can't write then full will be called and push buffer on back of queue + if (cr->queuedWrites.empty() && a.writable() && a.bufferAvailable()) { + Rdma::Buffer* buf = a.getBuffer(); + std::copy(b->bytes(), b->bytes()+b->dataCount(), buf->bytes()); + buf->dataCount(b->dataCount()); a.queueWrite(buf); } else { + Buffer* buf = new Buffer(b->dataCount()); + std::copy(b->bytes(), b->bytes()+b->dataCount(), buf->bytes()); cr->queuedWrites.push(buf); // Try to empty queue idle(cr, a); } } -void full(ConRec* cr, Rdma::AsynchIO&, Rdma::Buffer* buf) { - cr->queuedWrites.push(buf); +void full(ConRec*, Rdma::AsynchIO&, Rdma::Buffer*) { + // Shouldn't ever be called + cout << "!"; } void drained(Rdma::AsynchIO&) { |