diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.h | 2 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 4 | ||||
-rw-r--r-- | cpp/src/tests/tsxtest.cpp | 628 |
5 files changed, 643 insertions, 8 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 0ac0da2be4..6cad003605 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -261,7 +261,8 @@ void Cluster::deliver( { Mutex::ScopedLock l(lock); MemberId from(nodeid, pid); - deliver(Event::delivered(from, msg, msg_len), l); + framing::Buffer buf(static_cast<char*>(msg), msg_len); + deliver(Event::decode(from, buf), l); } void Cluster::deliver(const Event& e, Lock&) { diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp index 0e55d7bce3..5ac63c86f5 100644 --- a/cpp/src/qpid/cluster/Event.cpp +++ b/cpp/src/qpid/cluster/Event.cpp @@ -32,20 +32,21 @@ namespace cluster { using framing::Buffer; -const size_t Event::OVERHEAD = sizeof(uint8_t) + sizeof(uint64_t) + sizeof(uint32_t); +const size_t Event::OVERHEAD = sizeof(uint8_t) + sizeof(uint64_t) + sizeof(uint32_t) + sizeof(uint32_t); Event::Event(EventType t, const ConnectionId& c, size_t s, uint32_t i) : type(t), connectionId(c), size(s), data(RefCountedBuffer::create(s)), id(i) {} -Event Event::delivered(const MemberId& m, void* d, size_t s) { - Buffer buf(static_cast<char*>(d), s); +Event Event::decode(const MemberId& m, framing::Buffer& buf) { + assert(buf.available() > OVERHEAD); EventType type((EventType)buf.getOctet()); assert(type == DATA || type == CONTROL); ConnectionId connection(m, reinterpret_cast<Connection*>(buf.getLongLong())); uint32_t id = buf.getLong(); - assert(buf.getPosition() == OVERHEAD); - Event e(type, connection, s-OVERHEAD, id); - memcpy(e.getData(), static_cast<char*>(d)+OVERHEAD, s-OVERHEAD); + uint32_t size = buf.getLong(); + Event e(type, connection, size, id); + assert(buf.available() >= size); + memcpy(e.getData(), buf.getPointer() + buf.getPosition(), size); return e; } @@ -63,6 +64,7 @@ bool Event::mcast (Cpg& cpg) const { b.putOctet(type); b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer())); b.putLong(id); + b.putLong(size); assert(b.getPosition() == OVERHEAD); iovec iov[] = { { header, OVERHEAD }, { const_cast<char*>(getData()), getSize() } }; return cpg.mcast(iov, sizeof(iov)/sizeof(*iov)); diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h index b61ce0e60d..e046990747 100644 --- a/cpp/src/qpid/cluster/Event.h +++ b/cpp/src/qpid/cluster/Event.h @@ -46,7 +46,7 @@ class Event { Event(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0, uint32_t id=0); /** Create an event copied from delivered data. */ - static Event delivered(const MemberId& m, void* data, size_t size); + static Event decode(const MemberId& m, framing::Buffer&); /** Create an event containing a control */ static Event control(const framing::AMQBody&, const ConnectionId&, uint32_t id=0); diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 78aadf0da0..31060d65d2 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -166,6 +166,10 @@ check_PROGRAMS+=sender sender_SOURCES=sender.cpp TestOptions.h ConnectionOptions.h sender_LDADD=$(lib_client) +check_PROGRAMS+=tsxtest +tsxtest_SOURCES=tsxtest.cpp TestOptions.h ConnectionOptions.h +tsxtest_LDADD=$(lib_client) + TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test system_tests = client_test quick_perftest quick_topictest run_header_test quick_txtest diff --git a/cpp/src/tests/tsxtest.cpp b/cpp/src/tests/tsxtest.cpp new file mode 100644 index 0000000000..fd41a15ecb --- /dev/null +++ b/cpp/src/tests/tsxtest.cpp @@ -0,0 +1,628 @@ +#include <qpid/Options.h> +#include <qpid/client/Message.h> +#include <qpid/client/Connection.h> +#include <qpid/client/AsyncSession.h> +#include <qpid/client/SubscriptionManager.h> +#include <qpid/client/QueueOptions.h> +#include <qpid/sys/Time.h> +#include <boost/ptr_container/ptr_vector.hpp> +#include <boost/bind.hpp> +#include <algorithm> +#include <limits> +#include <unistd.h> +#include <iostream> +#include <fstream> +#include <netinet/in.h> +#include <netinet/tcp.h> + +using namespace std; +using namespace qpid; +using namespace qpid::client; +using namespace qpid::sys; + +struct Opts : public qpid::Options { + bool help; + bool sms, smr, tc, report, durable, clean; + string host; + int port; + string host2; + int port2; + int rate; + int start_from; + int id; + int symbols; + int sessions; + int messages; + int depth; + int sym_a; + int sym_b; + + // port 5672 + Opts() : help(false), sms(false), smr(false), tc(false), report(false), durable(false), clean(false), + host("127.0.0.1"), port(5672), host2("127.0.0.1"), port2(5672), + rate(10000), start_from(0), id(0), symbols(10), sessions(10), messages(10), depth(10000), sym_a(0), sym_b(9) + { + addOptions() + ("help", optValue(help), "print this help message") + ("sms", optValue(sms), "session sender") + ("smr", optValue(smr), "session receiver") + ("tc", optValue(tc), "trading component") + ("report", optValue(report), "report results when test is complete") + ("host,h", optValue(host, "HOST"), "Broker host to connect to") + ("port,p", optValue(port, "PORT"), "Broker port to connect to") + ("HOST,H", optValue(host2, "HOST2"), + "Broker host2 to connect to (applicable to TC only)") + ("PORT,P", optValue(port2, "PORT2"), + "Broker port2 to connect to (applicable to TC only)") + ("id", optValue(id, "N"), "Id number for this process") + ("start_from", optValue(start_from, "START"), "Client numbers start from N") + ("symbols", optValue(symbols, "N"), "Use N stock symbols") + ("sessions", optValue(sessions, "N"), "Use N sessions (sms)") + ("messages", optValue(messages, "N"), "Send N messages") + ("durable_msg", optValue(durable), "use durable messages") + ("clean", optValue(clean), "purge queues") + ("depth", optValue(depth, "DEPTH"), "queue depth") + ("rate", optValue(rate, "N"), "Send messages at rate of N per second, 0 means fast as possible.") + ("sym_a", optValue(sym_a, "a"), "Start symbols from TSX.a") + ("sym_b", optValue(sym_b, "b"), "Last symbol is TSX.b") + ; + } +}; + +ostream& operator<<(ostream&o, const Opts& opts) { + o << static_cast<const qpid::Options&>(opts) << endl; + return o; +} + +Opts opts; // Global, used by all functions. + +int +getRandomNumber(int low, int high) +{ + struct timeval tv; + gettimeofday(&tv, 0); + + srand48(tv.tv_usec); + double r = drand48(); + + int ret = int( double(high - low + 1) * r) + low; + + return ret; +} + +// Order entry message format. +struct OrderEntry { + int sessionId; + int msgId; + int responses; + int responseId; + int next_sess; + int add_size; + AbsTime time[4]; + + OrderEntry() : sessionId(), msgId(), responses(), responseId(), time() { + add_size = getRandomNumber(200, 500) - sizeof(OrderEntry); + } + void clone(OrderEntry* oe) { + sessionId = oe->sessionId; + msgId = oe->msgId; + responses = oe->responses; + responseId = oe->responseId; + next_sess = oe->next_sess; + add_size = oe->add_size; + time[0] = oe->time[0]; + time[1] = oe->time[1]; + time[2] = oe->time[2]; + time[3] = oe->time[3]; + } +}; +//} __attribute__ ((packed)); + +// Trivial "encoding" just copies struct memory. Assumes all machines +// are same architecture and compiler. +template <class T> Message makeMessage(const T& data, const string& key=string(), const string& /*exch*/=string()) { + Message message(string(reinterpret_cast<const char*>(&data), sizeof(data)) + string(data.add_size, 'A'), key); +// Using default exchange +// message.getDeliveryProperties().setExchange(exch); + if (!opts.durable) { + message.getDeliveryProperties().setDeliveryMode(qpid::framing::TRANSIENT); + } else { + message.getDeliveryProperties().setDeliveryMode(qpid::framing::PERSISTENT); + } + return message; +} + +ostream& printMs(ostream& o, Duration d) { + return o << (d / TIME_MSEC) << "ms"; +} + +// Format a delay as a string. +string delayStr(AbsTime begin, AbsTime end) { + ostringstream os; + printMs(os, Duration(begin, end)); + return os.str(); +} + + string timeStr(AbsTime t) { + ostringstream os; + os << Duration(t); + return os.str(); + } + +ostream& operator << (ostream& o, const OrderEntry& oe) { + return o << "OrderEntry[" + << "session=" << oe.sessionId + << " message=" << oe.msgId + << " responses=" << oe.responses + << " responseId=" << oe.responseId + << " additional size=" << oe.add_size + << " delay[0-1]=" << delayStr(oe.time[0], oe.time[1]) + << " delay[1-2]=" << delayStr(oe.time[2], oe.time[3]) + << " TS[0]=" << timeStr(oe.time[0]) + << " TS[1]=" << timeStr(oe.time[1]) + << " TS[2]=" << timeStr(oe.time[2]) + << " TS[3]=" << timeStr(oe.time[3]) + //<< " body=" << oe.body + << "]"; +} + +const char* RESULTS="results"; + +// Base class for all the clients. +struct Client : public Runnable { + int id; + Connection connection; + AsyncSession amqpSession; + Thread thread; + + Client() : id(opts.id) { + ConnectionSettings cs; + cs.host = opts.host; + cs.port = opts.port; + cs.tcpNoDelay = true; + connection.open(cs); + amqpSession = async(connection.newSession()); + // Declare symbol queues + for (int i = 0; i < opts.symbols; ++i) { + if (opts.clean) + amqpSession.queueDelete(arg::queue=symbolQueue(i)); + QueueOptions options; + // options.setSizePolicy(FLOW_TO_DISK, 0, opts.depth); + amqpSession.queueDeclare(arg::queue=symbolQueue(i), arg::durable=true, arg::arguments=options); + } + // Declare session response queues + //for (int i = 0; i < opts.sessions; ++i) + // amqpSession.queueDeclare(arg::queue=sessionQueue(i), arg::durable=true); + amqpSession.queueDeclare(arg::queue=RESULTS); + } + + ~Client() { + amqpSession.close(); + connection.close(); + join(); + } + + void start() { + thread = Thread(*this); + } + + void join() { + thread.join(); + thread = Thread(); // Avoid double join. + } + + void sleep_until(const AbsTime& t) { + Duration d(now(), t); + if (int64_t(d) > 0) + qpid::sys::usleep(int64_t(d)/TIME_USEC); + } + + // Name of symbol queue n + string symbolQueue(int n) { + ostringstream sym; + sym << "TSX." << n%opts.symbols; + return sym.str(); + } + + // Name of session n + string sessionQueue(int n) { + ostringstream sym; + sym << "SMR." << n; + return sym.str(); + } + +}; + +struct Clean : public Client { + void run() { + if (opts.durable) + cout << "DURABLE messages" << endl; + else + cout << "TRANSIENT messages" << endl; + cout << "Cleaning queues" << endl; + // Purge any old data in the test queues. + // NOTE: the sync() call waits till the command completes. + // Otherwise we might start the tests before the purges were complete. + for (int i = 0; i < opts.symbols; ++i) { + amqpSession.queuePurge(arg::queue=symbolQueue(i)); + amqpSession.sync(); + } +// for (int i = 0; i < opts.sessions; ++i) { +// amqpSession.queuePurge(arg::queue=sessionQueue(i)); +// amqpSession.sync(); +// } + amqpSession.queuePurge(arg::queue=RESULTS); + amqpSession.sync(); + } +}; + +// Base class for subscriber clients +struct SubClient : public Client, public MessageListener { + SubscriptionManager subs; + SubClient() : subs(amqpSession) {} + ~SubClient() { stop(); } + void stop() { subs.stop(); join(); } +}; + + +struct Sms : public Client { + + Sms() { + // Declare session response queues + int ini = opts.start_from; + for (int i = ini; i < (ini+opts.sessions); ++i) { + if (opts.clean) + amqpSession.queueDelete(arg::queue=sessionQueue(i)); + QueueOptions options; + // options.setSizePolicy(FLOW_TO_DISK, 0, opts.depth); + amqpSession.queueDeclare(arg::queue=sessionQueue(i), arg::durable=true, arg::arguments=options); + } + } + void run() { + int my_id = id+opts.start_from; + cout << "SMS " << my_id << endl; + AbsTime start = now(); + int64_t interval = opts.rate ? TIME_SEC/opts.rate : 0; + for (int i = 0; i < opts.messages; ++i) { + OrderEntry oe; + oe.sessionId = my_id; + oe.msgId = i; + if (opts.rate) + sleep_until(AbsTime(start, i*interval)); + int s = opts.sym_a + i%(opts.sym_b - opts.sym_a); + string sym = symbolQueue(s); + oe.responseId = 0; + oe.responses = ((i+1)%3 == 0) ? 2 : 1; // 2 responses for every 3rd message. + oe.next_sess = oe.sessionId+1; + oe.time[0] = now(); + // Send to queue sym via default exchange + amqpSession.messageTransfer(arg::content=makeMessage(oe, sym)); + if (i && i%1000==0) + cout << "SMS " << my_id << " sent " << i << " messages" << endl; + } + cout << "SMS " << my_id << " done" << endl; + } +}; + + +struct Tc : public SubClient { + + Connection connection2; + AsyncSession out_sess; + bool use_other_session; + int msgs; + Tc() : use_other_session(false), msgs(0) { + if (opts.host != opts.host2) { + ConnectionSettings cs2; + cs2.host = opts.host2; + cs2.port = opts.port2; + cs2.tcpNoDelay = true; + connection2.open(cs2); + out_sess = async(connection2.newSession()); + use_other_session = true; + // Declare session response queues + int ini = opts.start_from; + for (int i = ini; i < ini+opts.sessions; ++i) { + if (opts.clean) + out_sess.queueDelete(arg::queue=sessionQueue(i)); + QueueOptions options; + /// options.setSizePolicy(FLOW_TO_DISK, 0, opts.depth); + out_sess.queueDeclare(arg::queue=sessionQueue(i), arg::durable=true, arg::arguments=options); + if(opts.clean) { + out_sess.queuePurge(arg::queue=sessionQueue(i)); + out_sess.sync(); + } + } + } else { + int ini = opts.start_from; + for (int i = ini; i < ini+opts.sessions; ++i) { + if (opts.clean) + amqpSession.queueDelete(arg::queue=sessionQueue(i)); + QueueOptions options; + // options.setSizePolicy(FLOW_TO_DISK, 0, opts.depth); + amqpSession.queueDeclare(arg::queue=sessionQueue(i), arg::durable=true, arg::arguments=options); + if(opts.clean) { + amqpSession.queuePurge(arg::queue=sessionQueue(i)); + amqpSession.sync(); + } + } + //cout << "Warning: source and destination for TC are identical" << endl; + } + } + + ~Tc() { if(use_other_session) connection2.close(); } + + void received(Message& m) { + //assert(m.getData().size() == sizeof(OrderEntry)); + OrderEntry oe(*reinterpret_cast<const OrderEntry*>(m.getData().data())); + oe.time[1] = now(); + for (int i = 0; i < oe.responses; ++i) { + msgs++; + oe.responseId = i+1; + string key = sessionQueue(oe.sessionId); + if (i > 0) { + if (oe.sessionId+i == opts.sessions+opts.start_from) { + key = sessionQueue(opts.start_from); + } else { + key = sessionQueue(oe.sessionId+1); + } + } +//if(id == 0) +//cout << "[" << msgs << "] RESPOND TO " << i << "/" << oe.responses << " " << oe.sessionId << " " << key << endl; + + oe.time[2]=now(); + if(use_other_session) { + out_sess.messageTransfer(arg::content=makeMessage(oe, key)); + } else { + amqpSession.messageTransfer(arg::content=makeMessage(oe, key)); + } + if (msgs && msgs%1000==0) + cout << "TC " << id << " processed" << msgs << endl; + //cout << "Tc " << id << " sent to " << key << endl; + } + } + + void run() { + if ((id >= opts.sym_a) && (id <= opts.sym_b)) { + cout << "TC " << symbolQueue(id) << endl; + //QPID_LOG(info, "TC subscribing to " << symbolQueue(id)); + subs.subscribe(*this, symbolQueue(id)); + subs.run(); + } + } +}; + +// Average results per SMR or overall. +struct SMRStats { + int sessionId; + long messages; // Total received. + AbsTime begin, end; // Total time from first sent to last received. + int64_t delay[3]; // Total latency 0-1, 2-3, 0-3 + int64_t max_d; + int add_size; + + SMRStats() : sessionId(), messages(0), begin(), end(), delay(), add_size(0) + { + delay[0]=0; + delay[1]=0; + max_d=0; + } + + int64_t time() const { return Duration(begin, end); } + + long throughput() const { + int64_t timeMs=time()/TIME_MSEC; + return 1000*messages/timeMs; + } + + int64_t latency(int i) const { return delay[i]/messages; } + + void add(const OrderEntry& oe) { + if (messages == 0) begin = oe.time[0]; + end = oe.time[3]; + int64_t d1 = Duration(oe.time[0], oe.time[1]); + int64_t d2 = Duration(oe.time[2], oe.time[3]); + int64_t d3 = Duration(oe.time[0], oe.time[3]); + delay[0] += d1; + delay[1] += d2; + delay[2] += d3; + if (d3 > max_d) max_d = d3; + messages++; + } +}; + + +ostream& operator<<(ostream& o, const SMRStats& r) { + o << "SMRStats[" + << "session=" << r.sessionId + << " time="; + printMs(o, r.time()); + o << " messages=" << r.messages + << " latency(0)=" << r.latency(0) + << " latency(1)=" << r.latency(1) + << " throughput=" << r.throughput() + << "]"; + return o; +} + +struct Smr : public SubClient { + + SMRStats result; + int expect; // # messages to expect + list<OrderEntry*> msgList; + int my_id; + + Smr() : expect(opts.messages + opts.messages/3) { + // Declare session response queues + int ini = opts.start_from; + for (int i = ini; i < ini+opts.sessions; ++i) { + if (opts.clean) + amqpSession.queueDelete(arg::queue=sessionQueue(i)); + QueueOptions options; + // options.setSizePolicy(FLOW_TO_DISK, 0, opts.depth); + amqpSession.queueDeclare(arg::queue=sessionQueue(i), arg::durable=true, arg::arguments=options); + } + } + + void received(Message& m) { + //assert(m.getData().size() == sizeof(OrderEntry)); + OrderEntry oe(*reinterpret_cast<const OrderEntry*>(m.getData().data())); + oe.time[3] = now(); + //QPID_LOG(info, "SMR " << my_id << " received: " << oe); +//cout << " id: " << oe.msgId << " session id: " << oe.sessionId << endl; +//" received data: " << (m.getData().data() + sizeof(OrderEntry)) << endl; + result.add(oe); + + OrderEntry* new_oe = new OrderEntry(); + new_oe->clone(&oe); + msgList.push_back(new_oe); + if (result.messages && result.messages%1000==0) + cout << "SMR " << my_id << " received " << result.messages << endl; + if (result.messages == expect) { + subs.stop(); + amqpSession.messageTransfer( + arg::content=makeMessage(result, RESULTS)); + cout << result << endl; + + std::stringstream filename; + filename << "ReceivedMessages" << id+opts.start_from << ".dat"; + ofstream* datFileStream_ = new ofstream(filename.str().c_str(), ios::trunc); + for(list<OrderEntry*>::const_iterator i = msgList.begin(); + i != msgList.end(); ++i) + { + (*datFileStream_) << **i << endl; + delete *i; + } + msgList.clear(); + delete datFileStream_; + } + } + + void run() { + my_id = id + opts.start_from; + cout << "SMR " << my_id << endl; + result.sessionId = my_id; + //QPID_LOG(info, "SMR subscribing to " << sessionQueue(my_id)); + subs.subscribe(*this, sessionQueue(my_id)); + subs.run(); + cout << "SMR " << my_id << " done" << endl; + } +}; + +struct Report : public SubClient { + long reports; + long messages; + int64_t time; + int64_t latency[2]; + int64_t max_l; + long throughput; + + int expect; // # reports expected + Report() : reports(0), messages(0), time(0), latency(), throughput(0), + expect(opts.sessions) { + latency[0] = 0; + latency[1] = 0; + latency[2] = 0; + } + + void received(Message& m) { + max_l = 0; + assert(m.getData().size() == sizeof(SMRStats)); + SMRStats r(*reinterpret_cast<const SMRStats*>(m.getData().data())); + //QPID_LOG(info, "Report received: " << r); + reports++; + messages += r.messages; + time += r.time(); + latency[0] += r.latency(0); + latency[1] += r.latency(1); + latency[2] += r.latency(2); + if (r.max_d > max_l) max_l = r.max_d; + throughput += r.throughput(); + + if (--expect == 0) { + subs.stop(); + cout << endl << "Aggregate results: " << endl + << "Sessions: " << opts.sessions << " (from " << opts.start_from << ")" << endl + << "Messages per smr: " << messages/reports << endl + << "Avg time per smr: "; + printMs(cout, time/reports); + cout << endl + << "Avg throughput (msgs/sec) per smr: " << throughput/reports << endl + << "Avg latency per smr (0-1, 2-3): "; + printMs(cout, latency[0]/reports); + cout << ", "; + printMs(cout, latency[1]/reports); + cout << endl + << "Avg latency (0-3): "; + printMs(cout, (latency[2])/reports); + cout << endl + << "Max message latency: "; + printMs(cout, max_l); + cout << endl; + } + } + + void run() { + //QPID_LOG(info, "Reporter subscribing to " << RESULTS); + subs.subscribe(*this, RESULTS); + subs.run(); + } +}; + +template <class C> struct Clients { + boost::ptr_vector<C> clients; + + Clients(bool enabled, int count) { + if (!enabled) return; + for (int i = 0; i < count; ++i) { + C* cc = new C(); + clients.push_back(cc); + clients.back().id = i; + } + } + void run() { + for_each(clients.begin(), clients.end(), mem_fun_ref(&Client::start)); + } + void join() { + for_each(clients.begin(), clients.end(), mem_fun_ref(&Client::join)); + } + void stop() { + for_each(clients.begin(), clients.end(), mem_fun_ref(&SubClient::stop)); + } +}; + +int main(int argc, char** argv) { + try { + opts.parse(argc, argv); + // If no action specified. + if (!opts.sms && !opts.tc && !opts.smr ) { + opts.sms = opts.tc = opts.smr = true; + } + + if (opts.help) + cout << opts; + else { + if (opts.clean) Clean().run(); + Clients<Sms> sms(opts.sms, opts.sessions); + Clients<Tc> tc(opts.tc, opts.symbols); + Clients<Smr> smr(opts.smr, opts.sessions); + + smr.run(); + tc.run(); + sms.run(); + + sms.join(); + smr.join(); + if (opts.sms && opts.smr) + tc.stop(); // Leave TCs running if SMS or SMR are in a different process + else + tc.join(); + if (opts.report) Report().run(); + } + return 0; + } + catch (const exception& e) { + cerr << "Exception: " << e.what() << endl; + return 1; + } +} |