summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaServer.cpp46
1 files changed, 35 insertions, 11 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/cpp/src/qpid/sys/rdma/RdmaServer.cpp
index 97715326d5..564fd62730 100644
--- a/cpp/src/qpid/sys/rdma/RdmaServer.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaServer.cpp
@@ -44,10 +44,28 @@ using qpid::sys::Dispatcher;
namespace qpid {
namespace tests {
+struct Buffer {
+ char* bytes() const {return bytes_;}
+ int32_t byteCount() const {return size;}
+
+ Buffer(const int32_t s):
+ bytes_(new char[s]),
+ size(s)
+ {
+ }
+
+ ~Buffer() {
+ delete [] bytes_;
+ }
+private:
+ char* bytes_;
+ int32_t size;
+};
+
struct ConRec {
Rdma::Connection::intrusive_ptr connection;
Rdma::AsynchIO* data;
- queue<Rdma::Buffer*> queuedWrites;
+ queue<Buffer*> queuedWrites;
ConRec(Rdma::Connection::intrusive_ptr c) :
connection(c)
@@ -60,30 +78,36 @@ void dataError(Rdma::AsynchIO&) {
void idle(ConRec* cr, Rdma::AsynchIO& a) {
// Need to make sure full is not called as it would reorder messages
- while (!cr->queuedWrites.empty() && a.writable()) {
- Rdma::Buffer* buf = cr->queuedWrites.front();
+ while (!cr->queuedWrites.empty() && a.writable() && a.bufferAvailable()) {
+ Buffer* buf = cr->queuedWrites.front();
cr->queuedWrites.pop();
- a.queueWrite(buf);
+ Rdma::Buffer* rbuf = a.getBuffer();
+ std::copy(buf->bytes(), buf->bytes()+buf->byteCount(), rbuf->bytes());
+ rbuf->dataCount(buf->byteCount());
+ delete buf;
+ a.queueWrite(rbuf);
}
}
void data(ConRec* cr, Rdma::AsynchIO& a, Rdma::Buffer* b) {
// Echo data back
- Rdma::Buffer* buf = a.getBuffer();
- std::copy(b->bytes(), b->bytes()+b->dataCount(), buf->bytes());
- buf->dataCount(b->dataCount());
- if (cr->queuedWrites.empty()) {
- // If can't write then full will be called and push buffer on back of queue
+ if (cr->queuedWrites.empty() && a.writable() && a.bufferAvailable()) {
+ Rdma::Buffer* buf = a.getBuffer();
+ std::copy(b->bytes(), b->bytes()+b->dataCount(), buf->bytes());
+ buf->dataCount(b->dataCount());
a.queueWrite(buf);
} else {
+ Buffer* buf = new Buffer(b->dataCount());
+ std::copy(b->bytes(), b->bytes()+b->dataCount(), buf->bytes());
cr->queuedWrites.push(buf);
// Try to empty queue
idle(cr, a);
}
}
-void full(ConRec* cr, Rdma::AsynchIO&, Rdma::Buffer* buf) {
- cr->queuedWrites.push(buf);
+void full(ConRec*, Rdma::AsynchIO&, Rdma::Buffer*) {
+ // Shouldn't ever be called
+ cout << "!";
}
void drained(Rdma::AsynchIO&) {