summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2008-12-03 03:26:25 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2008-12-03 03:26:25 +0000
commit9b9589493ab1d5780b290885395e37ab86dde9c2 (patch)
tree2294060428c5293111b3b88ae40b95a1d4aba49f /cpp/src
parentf9cccf7b8f346c415c5f9b7d52127ef67797b02d (diff)
downloadqpid-python-9b9589493ab1d5780b290885395e37ab86dde9c2.tar.gz
clean up
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@722737 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/tests/Makefile.am3
-rw-r--r--cpp/src/tests/tsxtest.cpp628
2 files changed, 0 insertions, 631 deletions
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 31060d65d2..08b72fc040 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -166,9 +166,6 @@ check_PROGRAMS+=sender
sender_SOURCES=sender.cpp TestOptions.h ConnectionOptions.h
sender_LDADD=$(lib_client)
-check_PROGRAMS+=tsxtest
-tsxtest_SOURCES=tsxtest.cpp TestOptions.h ConnectionOptions.h
-tsxtest_LDADD=$(lib_client)
TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test
diff --git a/cpp/src/tests/tsxtest.cpp b/cpp/src/tests/tsxtest.cpp
deleted file mode 100644
index fd41a15ecb..0000000000
--- a/cpp/src/tests/tsxtest.cpp
+++ /dev/null
@@ -1,628 +0,0 @@
-#include <qpid/Options.h>
-#include <qpid/client/Message.h>
-#include <qpid/client/Connection.h>
-#include <qpid/client/AsyncSession.h>
-#include <qpid/client/SubscriptionManager.h>
-#include <qpid/client/QueueOptions.h>
-#include <qpid/sys/Time.h>
-#include <boost/ptr_container/ptr_vector.hpp>
-#include <boost/bind.hpp>
-#include <algorithm>
-#include <limits>
-#include <unistd.h>
-#include <iostream>
-#include <fstream>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-
-using namespace std;
-using namespace qpid;
-using namespace qpid::client;
-using namespace qpid::sys;
-
-struct Opts : public qpid::Options {
- bool help;
- bool sms, smr, tc, report, durable, clean;
- string host;
- int port;
- string host2;
- int port2;
- int rate;
- int start_from;
- int id;
- int symbols;
- int sessions;
- int messages;
- int depth;
- int sym_a;
- int sym_b;
-
- // port 5672
- Opts() : help(false), sms(false), smr(false), tc(false), report(false), durable(false), clean(false),
- host("127.0.0.1"), port(5672), host2("127.0.0.1"), port2(5672),
- rate(10000), start_from(0), id(0), symbols(10), sessions(10), messages(10), depth(10000), sym_a(0), sym_b(9)
- {
- addOptions()
- ("help", optValue(help), "print this help message")
- ("sms", optValue(sms), "session sender")
- ("smr", optValue(smr), "session receiver")
- ("tc", optValue(tc), "trading component")
- ("report", optValue(report), "report results when test is complete")
- ("host,h", optValue(host, "HOST"), "Broker host to connect to")
- ("port,p", optValue(port, "PORT"), "Broker port to connect to")
- ("HOST,H", optValue(host2, "HOST2"),
- "Broker host2 to connect to (applicable to TC only)")
- ("PORT,P", optValue(port2, "PORT2"),
- "Broker port2 to connect to (applicable to TC only)")
- ("id", optValue(id, "N"), "Id number for this process")
- ("start_from", optValue(start_from, "START"), "Client numbers start from N")
- ("symbols", optValue(symbols, "N"), "Use N stock symbols")
- ("sessions", optValue(sessions, "N"), "Use N sessions (sms)")
- ("messages", optValue(messages, "N"), "Send N messages")
- ("durable_msg", optValue(durable), "use durable messages")
- ("clean", optValue(clean), "purge queues")
- ("depth", optValue(depth, "DEPTH"), "queue depth")
- ("rate", optValue(rate, "N"), "Send messages at rate of N per second, 0 means fast as possible.")
- ("sym_a", optValue(sym_a, "a"), "Start symbols from TSX.a")
- ("sym_b", optValue(sym_b, "b"), "Last symbol is TSX.b")
- ;
- }
-};
-
-ostream& operator<<(ostream&o, const Opts& opts) {
- o << static_cast<const qpid::Options&>(opts) << endl;
- return o;
-}
-
-Opts opts; // Global, used by all functions.
-
-int
-getRandomNumber(int low, int high)
-{
- struct timeval tv;
- gettimeofday(&tv, 0);
-
- srand48(tv.tv_usec);
- double r = drand48();
-
- int ret = int( double(high - low + 1) * r) + low;
-
- return ret;
-}
-
-// Order entry message format.
-struct OrderEntry {
- int sessionId;
- int msgId;
- int responses;
- int responseId;
- int next_sess;
- int add_size;
- AbsTime time[4];
-
- OrderEntry() : sessionId(), msgId(), responses(), responseId(), time() {
- add_size = getRandomNumber(200, 500) - sizeof(OrderEntry);
- }
- void clone(OrderEntry* oe) {
- sessionId = oe->sessionId;
- msgId = oe->msgId;
- responses = oe->responses;
- responseId = oe->responseId;
- next_sess = oe->next_sess;
- add_size = oe->add_size;
- time[0] = oe->time[0];
- time[1] = oe->time[1];
- time[2] = oe->time[2];
- time[3] = oe->time[3];
- }
-};
-//} __attribute__ ((packed));
-
-// Trivial "encoding" just copies struct memory. Assumes all machines
-// are same architecture and compiler.
-template <class T> Message makeMessage(const T& data, const string& key=string(), const string& /*exch*/=string()) {
- Message message(string(reinterpret_cast<const char*>(&data), sizeof(data)) + string(data.add_size, 'A'), key);
-// Using default exchange
-// message.getDeliveryProperties().setExchange(exch);
- if (!opts.durable) {
- message.getDeliveryProperties().setDeliveryMode(qpid::framing::TRANSIENT);
- } else {
- message.getDeliveryProperties().setDeliveryMode(qpid::framing::PERSISTENT);
- }
- return message;
-}
-
-ostream& printMs(ostream& o, Duration d) {
- return o << (d / TIME_MSEC) << "ms";
-}
-
-// Format a delay as a string.
-string delayStr(AbsTime begin, AbsTime end) {
- ostringstream os;
- printMs(os, Duration(begin, end));
- return os.str();
-}
-
- string timeStr(AbsTime t) {
- ostringstream os;
- os << Duration(t);
- return os.str();
- }
-
-ostream& operator << (ostream& o, const OrderEntry& oe) {
- return o << "OrderEntry["
- << "session=" << oe.sessionId
- << " message=" << oe.msgId
- << " responses=" << oe.responses
- << " responseId=" << oe.responseId
- << " additional size=" << oe.add_size
- << " delay[0-1]=" << delayStr(oe.time[0], oe.time[1])
- << " delay[1-2]=" << delayStr(oe.time[2], oe.time[3])
- << " TS[0]=" << timeStr(oe.time[0])
- << " TS[1]=" << timeStr(oe.time[1])
- << " TS[2]=" << timeStr(oe.time[2])
- << " TS[3]=" << timeStr(oe.time[3])
- //<< " body=" << oe.body
- << "]";
-}
-
-const char* RESULTS="results";
-
-// Base class for all the clients.
-struct Client : public Runnable {
- int id;
- Connection connection;
- AsyncSession amqpSession;
- Thread thread;
-
- Client() : id(opts.id) {
- ConnectionSettings cs;
- cs.host = opts.host;
- cs.port = opts.port;
- cs.tcpNoDelay = true;
- connection.open(cs);
- amqpSession = async(connection.newSession());
- // Declare symbol queues
- for (int i = 0; i < opts.symbols; ++i) {
- if (opts.clean)
- amqpSession.queueDelete(arg::queue=symbolQueue(i));
- QueueOptions options;
- // options.setSizePolicy(FLOW_TO_DISK, 0, opts.depth);
- amqpSession.queueDeclare(arg::queue=symbolQueue(i), arg::durable=true, arg::arguments=options);
- }
- // Declare session response queues
- //for (int i = 0; i < opts.sessions; ++i)
- // amqpSession.queueDeclare(arg::queue=sessionQueue(i), arg::durable=true);
- amqpSession.queueDeclare(arg::queue=RESULTS);
- }
-
- ~Client() {
- amqpSession.close();
- connection.close();
- join();
- }
-
- void start() {
- thread = Thread(*this);
- }
-
- void join() {
- thread.join();
- thread = Thread(); // Avoid double join.
- }
-
- void sleep_until(const AbsTime& t) {
- Duration d(now(), t);
- if (int64_t(d) > 0)
- qpid::sys::usleep(int64_t(d)/TIME_USEC);
- }
-
- // Name of symbol queue n
- string symbolQueue(int n) {
- ostringstream sym;
- sym << "TSX." << n%opts.symbols;
- return sym.str();
- }
-
- // Name of session n
- string sessionQueue(int n) {
- ostringstream sym;
- sym << "SMR." << n;
- return sym.str();
- }
-
-};
-
-struct Clean : public Client {
- void run() {
- if (opts.durable)
- cout << "DURABLE messages" << endl;
- else
- cout << "TRANSIENT messages" << endl;
- cout << "Cleaning queues" << endl;
- // Purge any old data in the test queues.
- // NOTE: the sync() call waits till the command completes.
- // Otherwise we might start the tests before the purges were complete.
- for (int i = 0; i < opts.symbols; ++i) {
- amqpSession.queuePurge(arg::queue=symbolQueue(i));
- amqpSession.sync();
- }
-// for (int i = 0; i < opts.sessions; ++i) {
-// amqpSession.queuePurge(arg::queue=sessionQueue(i));
-// amqpSession.sync();
-// }
- amqpSession.queuePurge(arg::queue=RESULTS);
- amqpSession.sync();
- }
-};
-
-// Base class for subscriber clients
-struct SubClient : public Client, public MessageListener {
- SubscriptionManager subs;
- SubClient() : subs(amqpSession) {}
- ~SubClient() { stop(); }
- void stop() { subs.stop(); join(); }
-};
-
-
-struct Sms : public Client {
-
- Sms() {
- // Declare session response queues
- int ini = opts.start_from;
- for (int i = ini; i < (ini+opts.sessions); ++i) {
- if (opts.clean)
- amqpSession.queueDelete(arg::queue=sessionQueue(i));
- QueueOptions options;
- // options.setSizePolicy(FLOW_TO_DISK, 0, opts.depth);
- amqpSession.queueDeclare(arg::queue=sessionQueue(i), arg::durable=true, arg::arguments=options);
- }
- }
- void run() {
- int my_id = id+opts.start_from;
- cout << "SMS " << my_id << endl;
- AbsTime start = now();
- int64_t interval = opts.rate ? TIME_SEC/opts.rate : 0;
- for (int i = 0; i < opts.messages; ++i) {
- OrderEntry oe;
- oe.sessionId = my_id;
- oe.msgId = i;
- if (opts.rate)
- sleep_until(AbsTime(start, i*interval));
- int s = opts.sym_a + i%(opts.sym_b - opts.sym_a);
- string sym = symbolQueue(s);
- oe.responseId = 0;
- oe.responses = ((i+1)%3 == 0) ? 2 : 1; // 2 responses for every 3rd message.
- oe.next_sess = oe.sessionId+1;
- oe.time[0] = now();
- // Send to queue sym via default exchange
- amqpSession.messageTransfer(arg::content=makeMessage(oe, sym));
- if (i && i%1000==0)
- cout << "SMS " << my_id << " sent " << i << " messages" << endl;
- }
- cout << "SMS " << my_id << " done" << endl;
- }
-};
-
-
-struct Tc : public SubClient {
-
- Connection connection2;
- AsyncSession out_sess;
- bool use_other_session;
- int msgs;
- Tc() : use_other_session(false), msgs(0) {
- if (opts.host != opts.host2) {
- ConnectionSettings cs2;
- cs2.host = opts.host2;
- cs2.port = opts.port2;
- cs2.tcpNoDelay = true;
- connection2.open(cs2);
- out_sess = async(connection2.newSession());
- use_other_session = true;
- // Declare session response queues
- int ini = opts.start_from;
- for (int i = ini; i < ini+opts.sessions; ++i) {
- if (opts.clean)
- out_sess.queueDelete(arg::queue=sessionQueue(i));
- QueueOptions options;
- /// options.setSizePolicy(FLOW_TO_DISK, 0, opts.depth);
- out_sess.queueDeclare(arg::queue=sessionQueue(i), arg::durable=true, arg::arguments=options);
- if(opts.clean) {
- out_sess.queuePurge(arg::queue=sessionQueue(i));
- out_sess.sync();
- }
- }
- } else {
- int ini = opts.start_from;
- for (int i = ini; i < ini+opts.sessions; ++i) {
- if (opts.clean)
- amqpSession.queueDelete(arg::queue=sessionQueue(i));
- QueueOptions options;
- // options.setSizePolicy(FLOW_TO_DISK, 0, opts.depth);
- amqpSession.queueDeclare(arg::queue=sessionQueue(i), arg::durable=true, arg::arguments=options);
- if(opts.clean) {
- amqpSession.queuePurge(arg::queue=sessionQueue(i));
- amqpSession.sync();
- }
- }
- //cout << "Warning: source and destination for TC are identical" << endl;
- }
- }
-
- ~Tc() { if(use_other_session) connection2.close(); }
-
- void received(Message& m) {
- //assert(m.getData().size() == sizeof(OrderEntry));
- OrderEntry oe(*reinterpret_cast<const OrderEntry*>(m.getData().data()));
- oe.time[1] = now();
- for (int i = 0; i < oe.responses; ++i) {
- msgs++;
- oe.responseId = i+1;
- string key = sessionQueue(oe.sessionId);
- if (i > 0) {
- if (oe.sessionId+i == opts.sessions+opts.start_from) {
- key = sessionQueue(opts.start_from);
- } else {
- key = sessionQueue(oe.sessionId+1);
- }
- }
-//if(id == 0)
-//cout << "[" << msgs << "] RESPOND TO " << i << "/" << oe.responses << " " << oe.sessionId << " " << key << endl;
-
- oe.time[2]=now();
- if(use_other_session) {
- out_sess.messageTransfer(arg::content=makeMessage(oe, key));
- } else {
- amqpSession.messageTransfer(arg::content=makeMessage(oe, key));
- }
- if (msgs && msgs%1000==0)
- cout << "TC " << id << " processed" << msgs << endl;
- //cout << "Tc " << id << " sent to " << key << endl;
- }
- }
-
- void run() {
- if ((id >= opts.sym_a) && (id <= opts.sym_b)) {
- cout << "TC " << symbolQueue(id) << endl;
- //QPID_LOG(info, "TC subscribing to " << symbolQueue(id));
- subs.subscribe(*this, symbolQueue(id));
- subs.run();
- }
- }
-};
-
-// Average results per SMR or overall.
-struct SMRStats {
- int sessionId;
- long messages; // Total received.
- AbsTime begin, end; // Total time from first sent to last received.
- int64_t delay[3]; // Total latency 0-1, 2-3, 0-3
- int64_t max_d;
- int add_size;
-
- SMRStats() : sessionId(), messages(0), begin(), end(), delay(), add_size(0)
- {
- delay[0]=0;
- delay[1]=0;
- max_d=0;
- }
-
- int64_t time() const { return Duration(begin, end); }
-
- long throughput() const {
- int64_t timeMs=time()/TIME_MSEC;
- return 1000*messages/timeMs;
- }
-
- int64_t latency(int i) const { return delay[i]/messages; }
-
- void add(const OrderEntry& oe) {
- if (messages == 0) begin = oe.time[0];
- end = oe.time[3];
- int64_t d1 = Duration(oe.time[0], oe.time[1]);
- int64_t d2 = Duration(oe.time[2], oe.time[3]);
- int64_t d3 = Duration(oe.time[0], oe.time[3]);
- delay[0] += d1;
- delay[1] += d2;
- delay[2] += d3;
- if (d3 > max_d) max_d = d3;
- messages++;
- }
-};
-
-
-ostream& operator<<(ostream& o, const SMRStats& r) {
- o << "SMRStats["
- << "session=" << r.sessionId
- << " time=";
- printMs(o, r.time());
- o << " messages=" << r.messages
- << " latency(0)=" << r.latency(0)
- << " latency(1)=" << r.latency(1)
- << " throughput=" << r.throughput()
- << "]";
- return o;
-}
-
-struct Smr : public SubClient {
-
- SMRStats result;
- int expect; // # messages to expect
- list<OrderEntry*> msgList;
- int my_id;
-
- Smr() : expect(opts.messages + opts.messages/3) {
- // Declare session response queues
- int ini = opts.start_from;
- for (int i = ini; i < ini+opts.sessions; ++i) {
- if (opts.clean)
- amqpSession.queueDelete(arg::queue=sessionQueue(i));
- QueueOptions options;
- // options.setSizePolicy(FLOW_TO_DISK, 0, opts.depth);
- amqpSession.queueDeclare(arg::queue=sessionQueue(i), arg::durable=true, arg::arguments=options);
- }
- }
-
- void received(Message& m) {
- //assert(m.getData().size() == sizeof(OrderEntry));
- OrderEntry oe(*reinterpret_cast<const OrderEntry*>(m.getData().data()));
- oe.time[3] = now();
- //QPID_LOG(info, "SMR " << my_id << " received: " << oe);
-//cout << " id: " << oe.msgId << " session id: " << oe.sessionId << endl;
-//" received data: " << (m.getData().data() + sizeof(OrderEntry)) << endl;
- result.add(oe);
-
- OrderEntry* new_oe = new OrderEntry();
- new_oe->clone(&oe);
- msgList.push_back(new_oe);
- if (result.messages && result.messages%1000==0)
- cout << "SMR " << my_id << " received " << result.messages << endl;
- if (result.messages == expect) {
- subs.stop();
- amqpSession.messageTransfer(
- arg::content=makeMessage(result, RESULTS));
- cout << result << endl;
-
- std::stringstream filename;
- filename << "ReceivedMessages" << id+opts.start_from << ".dat";
- ofstream* datFileStream_ = new ofstream(filename.str().c_str(), ios::trunc);
- for(list<OrderEntry*>::const_iterator i = msgList.begin();
- i != msgList.end(); ++i)
- {
- (*datFileStream_) << **i << endl;
- delete *i;
- }
- msgList.clear();
- delete datFileStream_;
- }
- }
-
- void run() {
- my_id = id + opts.start_from;
- cout << "SMR " << my_id << endl;
- result.sessionId = my_id;
- //QPID_LOG(info, "SMR subscribing to " << sessionQueue(my_id));
- subs.subscribe(*this, sessionQueue(my_id));
- subs.run();
- cout << "SMR " << my_id << " done" << endl;
- }
-};
-
-struct Report : public SubClient {
- long reports;
- long messages;
- int64_t time;
- int64_t latency[2];
- int64_t max_l;
- long throughput;
-
- int expect; // # reports expected
- Report() : reports(0), messages(0), time(0), latency(), throughput(0),
- expect(opts.sessions) {
- latency[0] = 0;
- latency[1] = 0;
- latency[2] = 0;
- }
-
- void received(Message& m) {
- max_l = 0;
- assert(m.getData().size() == sizeof(SMRStats));
- SMRStats r(*reinterpret_cast<const SMRStats*>(m.getData().data()));
- //QPID_LOG(info, "Report received: " << r);
- reports++;
- messages += r.messages;
- time += r.time();
- latency[0] += r.latency(0);
- latency[1] += r.latency(1);
- latency[2] += r.latency(2);
- if (r.max_d > max_l) max_l = r.max_d;
- throughput += r.throughput();
-
- if (--expect == 0) {
- subs.stop();
- cout << endl << "Aggregate results: " << endl
- << "Sessions: " << opts.sessions << " (from " << opts.start_from << ")" << endl
- << "Messages per smr: " << messages/reports << endl
- << "Avg time per smr: ";
- printMs(cout, time/reports);
- cout << endl
- << "Avg throughput (msgs/sec) per smr: " << throughput/reports << endl
- << "Avg latency per smr (0-1, 2-3): ";
- printMs(cout, latency[0]/reports);
- cout << ", ";
- printMs(cout, latency[1]/reports);
- cout << endl
- << "Avg latency (0-3): ";
- printMs(cout, (latency[2])/reports);
- cout << endl
- << "Max message latency: ";
- printMs(cout, max_l);
- cout << endl;
- }
- }
-
- void run() {
- //QPID_LOG(info, "Reporter subscribing to " << RESULTS);
- subs.subscribe(*this, RESULTS);
- subs.run();
- }
-};
-
-template <class C> struct Clients {
- boost::ptr_vector<C> clients;
-
- Clients(bool enabled, int count) {
- if (!enabled) return;
- for (int i = 0; i < count; ++i) {
- C* cc = new C();
- clients.push_back(cc);
- clients.back().id = i;
- }
- }
- void run() {
- for_each(clients.begin(), clients.end(), mem_fun_ref(&Client::start));
- }
- void join() {
- for_each(clients.begin(), clients.end(), mem_fun_ref(&Client::join));
- }
- void stop() {
- for_each(clients.begin(), clients.end(), mem_fun_ref(&SubClient::stop));
- }
-};
-
-int main(int argc, char** argv) {
- try {
- opts.parse(argc, argv);
- // If no action specified.
- if (!opts.sms && !opts.tc && !opts.smr ) {
- opts.sms = opts.tc = opts.smr = true;
- }
-
- if (opts.help)
- cout << opts;
- else {
- if (opts.clean) Clean().run();
- Clients<Sms> sms(opts.sms, opts.sessions);
- Clients<Tc> tc(opts.tc, opts.symbols);
- Clients<Smr> smr(opts.smr, opts.sessions);
-
- smr.run();
- tc.run();
- sms.run();
-
- sms.join();
- smr.join();
- if (opts.sms && opts.smr)
- tc.stop(); // Leave TCs running if SMS or SMR are in a different process
- else
- tc.join();
- if (opts.report) Report().run();
- }
- return 0;
- }
- catch (const exception& e) {
- cerr << "Exception: " << e.what() << endl;
- return 1;
- }
-}