diff options
Diffstat (limited to 'qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp | 247 |
1 files changed, 247 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp new file mode 100644 index 0000000000..38e9b59541 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp @@ -0,0 +1,247 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/rdma/RdmaIO.h" +#include "qpid/sys/rdma/rdma_exception.h" +#include "qpid/sys/Time.h" +#include "qpid/sys/Thread.h" + +#include <netdb.h> +#include <arpa/inet.h> + +#include <vector> +#include <string> +#include <iostream> +#include <algorithm> +#include <cmath> +#include <boost/bind.hpp> + +using std::vector; +using std::string; +using std::cout; +using std::cerr; +using std::copy; +using std::rand; + +using qpid::sys::Thread; +using qpid::sys::Poller; +using qpid::sys::Dispatcher; +using qpid::sys::SocketAddress; +using qpid::sys::AbsTime; +using qpid::sys::Duration; +using qpid::sys::TIME_SEC; +using qpid::sys::TIME_INFINITE; + +namespace qpid { +namespace tests { + +// count of messages +int64_t smsgs = 0; +int64_t sbytes = 0; +int64_t rmsgs = 0; +int64_t rbytes = 0; + +int target = 1000000; +int msgsize = 200; +AbsTime startTime; +Duration sendingDuration(TIME_INFINITE); +Duration fullTestDuration(TIME_INFINITE); + +// Random generator +// This is an RNG designed by George Marsaglia see http://en.wikipedia.org/wiki/Xorshift +class Xor128Generator { + uint32_t x; + uint32_t y; + uint32_t z; + uint32_t w; + +public: + Xor128Generator() : + x(123456789),y(362436069),z(521288629),w(88675123) + {++(*this);} + + Xor128Generator& operator++() { + uint32_t t = x ^ (x << 11); + x = y; y = z; z = w; + w = w ^ (w >> 19) ^ t ^ (t >> 8); + return *this; + } + + uint32_t operator*() { + return w; + } +}; + +Xor128Generator output; +Xor128Generator input; + +void write(Rdma::AsynchIO& aio) { + while (aio.writable() && smsgs < target) { + Rdma::Buffer* b = aio.getSendBuffer(); + if (!b) break; + b->dataCount(msgsize); + uint32_t* ip = reinterpret_cast<uint32_t*>(b->bytes()); + uint32_t* lip = ip + b->dataCount() / sizeof(uint32_t); + while (ip != lip) {*ip++ = *output; ++output;} + aio.queueWrite(b); + ++smsgs; + sbytes += msgsize; + } +} + +void dataError(Rdma::AsynchIO&) { + cout << "Data error:\n"; +} + +void data(Poller::shared_ptr p, Rdma::AsynchIO& aio, Rdma::Buffer* b) { + ++rmsgs; + rbytes += b->dataCount(); + + // Check message is unaltered + bool match = true; + uint32_t* ip = reinterpret_cast<uint32_t*>(b->bytes()); + uint32_t* lip = ip + b->dataCount() / sizeof(uint32_t); + while (ip != lip) { if (*ip++ != *input) {match = false; break;} ++input;} + if (!match) { + cout << "Data doesn't match: at msg " << rmsgs << " byte " << rbytes-b->dataCount() << " (ish)\n"; + exit(1); + } + + // When all messages have been recvd stop + if (rmsgs < target) { + write(aio); + } else { + fullTestDuration = std::min(fullTestDuration, Duration(startTime, AbsTime::now())); + if (aio.incompletedWrites() == 0) + p->shutdown(); + } +} + +void full(Rdma::AsynchIO& a, Rdma::Buffer* b) { + // Warn as we shouldn't get here anymore + cerr << "!"; + + // Don't need to keep buffer just adjust the counts + --smsgs; + sbytes -= b->dataCount(); + + // Give buffer back + a.returnSendBuffer(b); +} + +void idle(Poller::shared_ptr p, Rdma::AsynchIO& aio) { + if (smsgs < target) { + write(aio); + } else { + sendingDuration = std::min(sendingDuration, Duration(startTime, AbsTime::now())); + if (rmsgs >= target && aio.incompletedWrites() == 0) + p->shutdown(); + } +} + +void drained(Rdma::AsynchIO&) { + cout << "Drained:\n"; +} + +void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp) { + cout << "Connected\n"; + Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair(); + + Rdma::AsynchIO* aio = new Rdma::AsynchIO(ci->getQueuePair(), + cp.rdmaProtocolVersion, + cp.maxRecvBufferSize, cp.initialXmitCredit , Rdma::DEFAULT_WR_ENTRIES, + boost::bind(&data, poller, _1, _2), + boost::bind(&idle, poller, _1), + &full, + dataError); + + startTime = AbsTime::now(); + write(*aio); + + aio->start(poller); +} + +void disconnected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&) { + cout << "Disconnected\n"; + p->shutdown(); +} + +void connectionError(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&, const Rdma::ErrorType) { + cout << "Connection error\n"; + p->shutdown(); +} + +void rejected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&) { + cout << "Connection rejected\n"; + p->shutdown(); +} + +}} // namespace qpid::tests + +using namespace qpid::tests; + +int main(int argc, char* argv[]) { + vector<string> args(&argv[0], &argv[argc]); + + string host = args[1]; + string port = (args.size() < 3) ? "20079" : args[2]; + + if (args.size() > 3) + msgsize = atoi(args[3].c_str()); + cout << "Message size: " << msgsize << "\n"; + + try { + boost::shared_ptr<Poller> p(new Poller()); + + Rdma::Connector c( + Rdma::ConnectionParams(msgsize, Rdma::DEFAULT_WR_ENTRIES), + boost::bind(&connected, p, _1, _2), + boost::bind(&connectionError, p, _1, _2), + boost::bind(&disconnected, p, _1), + boost::bind(&rejected, p, _1, _2)); + + SocketAddress sa(host, port); + cout << "Connecting to: " << sa.asString() <<"\n"; + c.start(p, sa); + + // The poller loop blocks all signals so run in its own thread + Thread t(*p); + t.join(); + } catch (Rdma::Exception& e) { + int err = e.getError(); + cerr << "Error: " << e.what() << "(" << err << ")\n"; + } + + cout + << "Sent: " << smsgs + << "msgs (" << sbytes + << "bytes) in: " << double(sendingDuration)/TIME_SEC + << "s: " << double(smsgs)*TIME_SEC/sendingDuration + << "msgs/s(" << double(sbytes)*TIME_SEC/sendingDuration + << "bytes/s)\n"; + cout + << "Recd: " << rmsgs + << "msgs (" << rbytes + << "bytes) in: " << double(fullTestDuration)/TIME_SEC + << "s: " << double(rmsgs)*TIME_SEC/fullTestDuration + << "msgs/s(" << double(rbytes)*TIME_SEC/fullTestDuration + << "bytes/s)\n"; + +} |