summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp')
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp247
1 files changed, 247 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
new file mode 100644
index 0000000000..38e9b59541
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
@@ -0,0 +1,247 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/rdma/RdmaIO.h"
+#include "qpid/sys/rdma/rdma_exception.h"
+#include "qpid/sys/Time.h"
+#include "qpid/sys/Thread.h"
+
+#include <netdb.h>
+#include <arpa/inet.h>
+
+#include <vector>
+#include <string>
+#include <iostream>
+#include <algorithm>
+#include <cmath>
+#include <boost/bind.hpp>
+
+using std::vector;
+using std::string;
+using std::cout;
+using std::cerr;
+using std::copy;
+using std::rand;
+
+using qpid::sys::Thread;
+using qpid::sys::Poller;
+using qpid::sys::Dispatcher;
+using qpid::sys::SocketAddress;
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::sys::TIME_SEC;
+using qpid::sys::TIME_INFINITE;
+
+namespace qpid {
+namespace tests {
+
+// count of messages
+int64_t smsgs = 0;
+int64_t sbytes = 0;
+int64_t rmsgs = 0;
+int64_t rbytes = 0;
+
+int target = 1000000;
+int msgsize = 200;
+AbsTime startTime;
+Duration sendingDuration(TIME_INFINITE);
+Duration fullTestDuration(TIME_INFINITE);
+
+// Random generator
+// This is an RNG designed by George Marsaglia see http://en.wikipedia.org/wiki/Xorshift
+class Xor128Generator {
+ uint32_t x;
+ uint32_t y;
+ uint32_t z;
+ uint32_t w;
+
+public:
+ Xor128Generator() :
+ x(123456789),y(362436069),z(521288629),w(88675123)
+ {++(*this);}
+
+ Xor128Generator& operator++() {
+ uint32_t t = x ^ (x << 11);
+ x = y; y = z; z = w;
+ w = w ^ (w >> 19) ^ t ^ (t >> 8);
+ return *this;
+ }
+
+ uint32_t operator*() {
+ return w;
+ }
+};
+
+Xor128Generator output;
+Xor128Generator input;
+
+void write(Rdma::AsynchIO& aio) {
+ while (aio.writable() && smsgs < target) {
+ Rdma::Buffer* b = aio.getSendBuffer();
+ if (!b) break;
+ b->dataCount(msgsize);
+ uint32_t* ip = reinterpret_cast<uint32_t*>(b->bytes());
+ uint32_t* lip = ip + b->dataCount() / sizeof(uint32_t);
+ while (ip != lip) {*ip++ = *output; ++output;}
+ aio.queueWrite(b);
+ ++smsgs;
+ sbytes += msgsize;
+ }
+}
+
+void dataError(Rdma::AsynchIO&) {
+ cout << "Data error:\n";
+}
+
+void data(Poller::shared_ptr p, Rdma::AsynchIO& aio, Rdma::Buffer* b) {
+ ++rmsgs;
+ rbytes += b->dataCount();
+
+ // Check message is unaltered
+ bool match = true;
+ uint32_t* ip = reinterpret_cast<uint32_t*>(b->bytes());
+ uint32_t* lip = ip + b->dataCount() / sizeof(uint32_t);
+ while (ip != lip) { if (*ip++ != *input) {match = false; break;} ++input;}
+ if (!match) {
+ cout << "Data doesn't match: at msg " << rmsgs << " byte " << rbytes-b->dataCount() << " (ish)\n";
+ exit(1);
+ }
+
+ // When all messages have been recvd stop
+ if (rmsgs < target) {
+ write(aio);
+ } else {
+ fullTestDuration = std::min(fullTestDuration, Duration(startTime, AbsTime::now()));
+ if (aio.incompletedWrites() == 0)
+ p->shutdown();
+ }
+}
+
+void full(Rdma::AsynchIO& a, Rdma::Buffer* b) {
+ // Warn as we shouldn't get here anymore
+ cerr << "!";
+
+ // Don't need to keep buffer just adjust the counts
+ --smsgs;
+ sbytes -= b->dataCount();
+
+ // Give buffer back
+ a.returnSendBuffer(b);
+}
+
+void idle(Poller::shared_ptr p, Rdma::AsynchIO& aio) {
+ if (smsgs < target) {
+ write(aio);
+ } else {
+ sendingDuration = std::min(sendingDuration, Duration(startTime, AbsTime::now()));
+ if (rmsgs >= target && aio.incompletedWrites() == 0)
+ p->shutdown();
+ }
+}
+
+void drained(Rdma::AsynchIO&) {
+ cout << "Drained:\n";
+}
+
+void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp) {
+ cout << "Connected\n";
+ Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair();
+
+ Rdma::AsynchIO* aio = new Rdma::AsynchIO(ci->getQueuePair(),
+ cp.rdmaProtocolVersion,
+ cp.maxRecvBufferSize, cp.initialXmitCredit , Rdma::DEFAULT_WR_ENTRIES,
+ boost::bind(&data, poller, _1, _2),
+ boost::bind(&idle, poller, _1),
+ &full,
+ dataError);
+
+ startTime = AbsTime::now();
+ write(*aio);
+
+ aio->start(poller);
+}
+
+void disconnected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&) {
+ cout << "Disconnected\n";
+ p->shutdown();
+}
+
+void connectionError(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&, const Rdma::ErrorType) {
+ cout << "Connection error\n";
+ p->shutdown();
+}
+
+void rejected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&) {
+ cout << "Connection rejected\n";
+ p->shutdown();
+}
+
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
+int main(int argc, char* argv[]) {
+ vector<string> args(&argv[0], &argv[argc]);
+
+ string host = args[1];
+ string port = (args.size() < 3) ? "20079" : args[2];
+
+ if (args.size() > 3)
+ msgsize = atoi(args[3].c_str());
+ cout << "Message size: " << msgsize << "\n";
+
+ try {
+ boost::shared_ptr<Poller> p(new Poller());
+
+ Rdma::Connector c(
+ Rdma::ConnectionParams(msgsize, Rdma::DEFAULT_WR_ENTRIES),
+ boost::bind(&connected, p, _1, _2),
+ boost::bind(&connectionError, p, _1, _2),
+ boost::bind(&disconnected, p, _1),
+ boost::bind(&rejected, p, _1, _2));
+
+ SocketAddress sa(host, port);
+ cout << "Connecting to: " << sa.asString() <<"\n";
+ c.start(p, sa);
+
+ // The poller loop blocks all signals so run in its own thread
+ Thread t(*p);
+ t.join();
+ } catch (Rdma::Exception& e) {
+ int err = e.getError();
+ cerr << "Error: " << e.what() << "(" << err << ")\n";
+ }
+
+ cout
+ << "Sent: " << smsgs
+ << "msgs (" << sbytes
+ << "bytes) in: " << double(sendingDuration)/TIME_SEC
+ << "s: " << double(smsgs)*TIME_SEC/sendingDuration
+ << "msgs/s(" << double(sbytes)*TIME_SEC/sendingDuration
+ << "bytes/s)\n";
+ cout
+ << "Recd: " << rmsgs
+ << "msgs (" << rbytes
+ << "bytes) in: " << double(fullTestDuration)/TIME_SEC
+ << "s: " << double(rmsgs)*TIME_SEC/fullTestDuration
+ << "msgs/s(" << double(rbytes)*TIME_SEC/fullTestDuration
+ << "bytes/s)\n";
+
+}