summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/Makefile.am6
-rw-r--r--cpp/src/tests/SimpleTestCaseBase.cpp2
-rw-r--r--cpp/src/tests/TestOptions.h51
-rw-r--r--cpp/src/tests/client_test.cpp30
-rwxr-xr-xcpp/src/tests/daemon_test31
-rw-r--r--cpp/src/tests/interop_runner.cpp67
-rwxr-xr-xcpp/src/tests/kill_broker2
-rw-r--r--cpp/src/tests/logging.cpp77
-rwxr-xr-xcpp/src/tests/run_test18
-rwxr-xr-xcpp/src/tests/start_broker5
-rw-r--r--cpp/src/tests/topic_listener.cpp100
-rw-r--r--cpp/src/tests/topic_publisher.cpp161
-rwxr-xr-xcpp/src/tests/topictest2
13 files changed, 192 insertions, 360 deletions
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 9b15b68c04..ebf1aa9f41 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -89,14 +89,14 @@ testprogs = \
check_PROGRAMS += $(unit_progs) $(testprogs) interop_runner
-TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) ./run_test
+TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) $(srcdir)/run_test
system_tests = client_test quick_topictest
-TESTS = dummy_test $(unit_progs) $(unit_wrappers) run-unit-tests start_broker $(system_tests) python_tests kill_broker daemon_test
+TESTS = dummy_test $(unit_progs) $(unit_wrappers) run-unit-tests start_broker $(system_tests) python_tests kill_broker
EXTRA_DIST = \
test_env run_test \
- run-unit-tests start_broker python_tests kill_broker daemon_test \
+ run-unit-tests start_broker python_tests kill_broker \
quick_topictest \
topictest \
.valgrind.supp-default \
diff --git a/cpp/src/tests/SimpleTestCaseBase.cpp b/cpp/src/tests/SimpleTestCaseBase.cpp
index 5f37fd0eb3..4f071cd02b 100644
--- a/cpp/src/tests/SimpleTestCaseBase.cpp
+++ b/cpp/src/tests/SimpleTestCaseBase.cpp
@@ -70,7 +70,7 @@ void SimpleTestCaseBase::Sender::start(){
SimpleTestCaseBase::Worker::Worker(TestOptions& options, const int _messages) :
connection(options.trace), messages(_messages), count(0)
{
- connection.open(options.broker, options.port);
+ connection.open(options.host, options.port);
connection.openChannel(channel);
}
diff --git a/cpp/src/tests/TestOptions.h b/cpp/src/tests/TestOptions.h
index 45575ba450..ee3af0873a 100644
--- a/cpp/src/tests/TestOptions.h
+++ b/cpp/src/tests/TestOptions.h
@@ -21,46 +21,35 @@
*
*/
-#include "qpid/CommonOptions.h"
+#include "qpid/Options.h"
+#include "qpid/Url.h"
namespace qpid {
-struct TestOptions : public qpid::CommonOptions
+struct TestOptions : public qpid::Options
{
- TestOptions() : desc("Options"), broker("localhost"), virtualhost(""), clientid("cpp"), help(false)
+ TestOptions() : Options("Test Options"), host("localhost"), port(TcpAddress::DEFAULT_PORT), clientid("cpp"), trace(false), help(false)
{
- using namespace qpid::program_options;
- using namespace boost::program_options;
- CommonOptions::addTo(desc);
- desc.add_options()
- ("broker,b", optValue(broker, "HOSTNAME"), "the hostname to connect to")
- ("virtualhost,v", optValue(virtualhost, "VIRTUAL_HOST"), "virtual host")
+ addOptions()
+ ("host,h", optValue(host, "HOST"), "Broker host to connect to")
+ // TODO aconway 2007-06-26: broker is synonym for host. Drop broker?
+ ("broker,b", optValue(host, "HOST"), "Broker host to connect to")
+ ("port,p", optValue(port, "PORT"), "Broker port to connect to")
+ ("virtualhost,v", optValue(virtualhost, "VHOST"), "virtual host")
("clientname,n", optValue(clientid, "ID"), "unique client identifier")
- ("help,h", optValue(help), "print this usage statement");
+ ("username", optValue(username, "USER"), "user name for broker log in.")
+ ("password", optValue(password, "USER"), "password for broker log in.")
+ ("trace,t", optValue(trace), "Turn on debug tracing.")
+ ("help", optValue(help), "print this usage statement");
}
- void parse(int argc, char** argv)
- {
- using namespace boost::program_options;
- try {
- variables_map vm;
- store(parse_command_line(argc, argv, desc), vm);
- notify(vm);
- } catch(const error& e) {
- std::cerr << "Error: " << e.what() << std::endl
- << "Specify '--help' for usage." << std::endl;
- }
- }
-
- void usage()
- {
- std::cout << desc << std::endl;
- }
-
- boost::program_options::options_description desc;
- std::string broker;
+ std::string host;
+ uint16_t port;
std::string virtualhost;
- std::string clientid;
+ std::string clientid;
+ std::string username;
+ std::string password;
+ bool trace;
bool help;
};
diff --git a/cpp/src/tests/client_test.cpp b/cpp/src/tests/client_test.cpp
index a05c01043f..3b8a3a2ee6 100644
--- a/cpp/src/tests/client_test.cpp
+++ b/cpp/src/tests/client_test.cpp
@@ -28,6 +28,7 @@
#include <iostream>
+#include "TestOptions.h"
#include "qpid/QpidError.h"
#include "qpid/client/ClientChannel.h"
#include "qpid/client/Connection.h"
@@ -59,38 +60,39 @@ public:
}
};
-int main(int argc, char**)
+int main(int argc, char** argv)
{
- verbose = argc > 1;
try {
+ qpid::TestOptions opts;
+ opts.parse(argc, argv);
+
//Use a custom exchange
Exchange exchange("MyExchange", Exchange::TOPIC_EXCHANGE);
//Use a named, temporary queue
Queue queue("MyQueue", true);
- Connection con(verbose);
- string host("localhost");
- con.open(host, 5672, "guest", "guest", "/test");
- if (verbose)
+ Connection con(opts.trace);
+ con.open(opts.host, opts.port, opts.username, opts.password, opts.virtualhost);
+ if (opts.trace)
std::cout << "Opened connection." << std::endl;
//Create and open a channel on the connection through which
//most functionality is exposed
Channel channel;
con.openChannel(channel);
- if (verbose) std::cout << "Opened channel." << std::endl;
+ if (opts.trace) std::cout << "Opened channel." << std::endl;
//'declare' the exchange and the queue, which will create them
//as they don't exist
channel.declareExchange(exchange);
- if (verbose) std::cout << "Declared exchange." << std::endl;
+ if (opts.trace) std::cout << "Declared exchange." << std::endl;
channel.declareQueue(queue);
- if (verbose) std::cout << "Declared queue." << std::endl;
+ if (opts.trace) std::cout << "Declared queue." << std::endl;
//now bind the queue to the exchange
channel.bind(exchange, queue, "MyTopic");
- if (verbose) std::cout << "Bound queue to exchange." << std::endl;
+ if (opts.trace) std::cout << "Bound queue to exchange." << std::endl;
//Set up a message listener to receive any messages that
//arrive in our queue on the broker. We only expect one, and
@@ -101,7 +103,7 @@ int main(int argc, char**)
SimpleListener listener(&monitor);
string tag("MyTag");
channel.consume(queue, tag, &listener);
- if (verbose) std::cout << "Registered consumer." << std::endl;
+ if (opts.trace) std::cout << "Registered consumer." << std::endl;
//we need to enable the message dispatching for this channel
//and we want that to occur on another thread so we call
@@ -114,7 +116,7 @@ int main(int argc, char**)
string data("MyMessage");
msg.setData(data);
channel.publish(msg, exchange, "MyTopic");
- if (verbose) std::cout << "Published message: " << data << std::endl;
+ if (opts.trace) std::cout << "Published message: " << data << std::endl;
{
Monitor::ScopedLock l(monitor);
@@ -125,9 +127,9 @@ int main(int argc, char**)
//close the channel & connection
channel.close();
- if (verbose) std::cout << "Closed channel." << std::endl;
+ if (opts.trace) std::cout << "Closed channel." << std::endl;
con.close();
- if (verbose) std::cout << "Closed connection." << std::endl;
+ if (opts.trace) std::cout << "Closed connection." << std::endl;
return 0;
} catch(const std::exception& e) {
std::cout << e.what() << std::endl;
diff --git a/cpp/src/tests/daemon_test b/cpp/src/tests/daemon_test
deleted file mode 100755
index 6d1fc73923..0000000000
--- a/cpp/src/tests/daemon_test
+++ /dev/null
@@ -1,31 +0,0 @@
-#!/bin/sh
-# Without arguments run all daemon tests, exit status is number of failures.
-# With arguments run just the test named by $1.
-#
-
-TEMP=`mktemp`
-qpidd="../qpidd --log.output qpidd.log"
-client_tests=./client_test
-trap 'rm -f $TEMP' 0
-
-fail() { echo FAIL: $0:$* 1>&2; exit 1; }
-
-# Start and stop daemon on default port.
-PID=`$qpidd --check` && fail $LINENO: qpidd already running pid=$PID
-$qpidd -d || fail $LINENO: $qpidd -d failed
-$qpidd -c >/dev/null || fail $LINENO: qpidd --check says qpidd did not start
-./client_test > $TEMP || fail $LINENO: client_test: `cat $TEMP`
-$qpidd -q || fail $LINENO: qpidd -q failed
-$qpidd -c >/dev/null && fail $LINENO: Still running after quit.
-
-# Start and stop daemon on dynamic port.
-export QPID_PORT=`$qpidd -dp0`
-# Note: QPID_PORT fom environment will be used below here:
-$qpidd -c >/dev/null || fail $LINENO: qpidd did not start. QPID_PORT=$QPID_PORT
-$qpidd -q || fail $LINENO: qpidd -q failed. QPID_PORT=$QPID_PORT
-$qpidd -c >/dev/null && fail $LINENO: Still running after start. QPID_PORT=$QPID_PORT
-
-# FIXME aconway 2007-06-11: run client test, needs a --port option.
-
-
-true
diff --git a/cpp/src/tests/interop_runner.cpp b/cpp/src/tests/interop_runner.cpp
index 7d40eeedb1..7c47edafa0 100644
--- a/cpp/src/tests/interop_runner.cpp
+++ b/cpp/src/tests/interop_runner.cpp
@@ -19,7 +19,7 @@
*
*/
-#include "qpid/CommonOptions.h"
+#include "qpid/Options.h"
#include "qpid/Exception.h"
#include "qpid/QpidError.h"
#include "qpid/client/ClientChannel.h"
@@ -47,13 +47,13 @@ using namespace qpid::sys;
using qpid::TestCase;
using qpid::TestOptions;
using qpid::framing::FieldTable;
-using std::string;
+using namespace std;
class DummyRun : public TestCase
{
public:
DummyRun() {}
- void assign(const std::string&, FieldTable&, TestOptions&) {}
+ void assign(const string&, FieldTable&, TestOptions&) {}
void start() {}
void stop() {}
void report(qpid::client::Message&) {}
@@ -64,7 +64,7 @@ string parse_next_word(const string& input, const string& delims, string::size_t
/**
*/
class Listener : public MessageListener, private Runnable{
- typedef boost::ptr_map<std::string, TestCase> TestMap;
+ typedef boost::ptr_map<string, TestCase> TestMap;
Channel& channel;
TestOptions& options;
@@ -72,7 +72,7 @@ class Listener : public MessageListener, private Runnable{
const string name;
const string topic;
TestMap::iterator test;
- std::auto_ptr<Thread> runner;
+ auto_ptr<Thread> runner;
string reportTo;
string reportCorrelator;
@@ -88,44 +88,41 @@ public:
Listener(Channel& channel, TestOptions& options);
void received(Message& msg);
void bindAndConsume();
- void registerTest(std::string name, TestCase* test);
+ void registerTest(string name, TestCase* test);
};
-/**
- */
-int main(int argc, char** argv){
- TestOptions options;
- options.parse(argc, argv);
-
- if (options.help) {
- options.usage();
- } else {
- try{
+int main(int argc, char** argv) {
+ try {
+ TestOptions options;
+ options.parse(argc, argv);
+ if (options.help)
+ cout << options;
+ else {
Connection connection(options.trace);
- connection.open(options.broker, options.port, "guest", "guest", options.virtualhost);
+ connection.open(options.host, options.port, "guest", "guest", options.virtualhost);
- Channel channel;
- connection.openChannel(channel);
+ Channel channel;
+ connection.openChannel(channel);
- Listener listener(channel, options);
- listener.registerTest("TC1_DummyRun", new DummyRun());
- listener.registerTest("TC2_BasicP2P", new qpid::BasicP2PTest());
- listener.registerTest("TC3_BasicPubSub", new qpid::BasicPubSubTest());
+ Listener listener(channel, options);
+ listener.registerTest("TC1_DummyRun", new DummyRun());
+ listener.registerTest("TC2_BasicP2P", new qpid::BasicP2PTest());
+ listener.registerTest("TC3_BasicPubSub", new qpid::BasicPubSubTest());
- listener.bindAndConsume();
+ listener.bindAndConsume();
- channel.run();
- connection.close();
- } catch(const std::exception& error) {
- std::cout << error.what() << std::endl;
+ channel.run();
+ connection.close();
}
+ } catch(const exception& error) {
+ cout << error.what() << endl << "Type " << argv[0] << " --help for help" << endl;
}
}
Listener::Listener(Channel& _channel, TestOptions& _options) : channel(_channel), options(_options), name(options.clientid), topic("iop.control." + name)
{}
-void Listener::registerTest(std::string name, TestCase* test)
+void Listener::registerTest(string name, TestCase* test)
{
tests.insert(name, test);
}
@@ -139,7 +136,7 @@ void Listener::bindAndConsume()
channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, "iop.control", bindArgs);
channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, topic, bindArgs);
- std::string tag;
+ string tag;
channel.consume(control, tag, this);
}
@@ -178,14 +175,14 @@ void Listener::sendResponse(Message& response, string replyTo)
void Listener::received(Message& message)
{
- std::string type(message.getHeaders().getString("CONTROL_TYPE"));
+ string type(message.getHeaders().getString("CONTROL_TYPE"));
if (type == "INVITE") {
- std::string name(message.getHeaders().getString("TEST_NAME"));
+ string name(message.getHeaders().getString("TEST_NAME"));
if (name.empty() || invite(name)) {
sendSimpleResponse("ENLIST", message);
} else {
- std::cout << "Can't take part in '" << name << "'" << std::endl;
+ cout << "Can't take part in '" << name << "'" << endl;
}
} else if (type == "ASSIGN_ROLE") {
test->assign(message.getHeaders().getString("ROLE"), message.getHeaders(), options);
@@ -193,7 +190,7 @@ void Listener::received(Message& message)
} else if (type == "START") {
reportTo = message.getReplyTo();
reportCorrelator = message.getCorrelationId();
- runner = std::auto_ptr<Thread>(new Thread(this));
+ runner = auto_ptr<Thread>(new Thread(this));
} else if (type == "STATUS_REQUEST") {
reportTo = message.getReplyTo();
reportCorrelator = message.getCorrelationId();
@@ -203,7 +200,7 @@ void Listener::received(Message& message)
if (test != tests.end()) test->stop();
shutdown();
} else {
- std::cerr <<"ERROR!: Received unknown control message: " << type << std::endl;
+ cerr <<"ERROR!: Received unknown control message: " << type << endl;
shutdown();
}
}
diff --git a/cpp/src/tests/kill_broker b/cpp/src/tests/kill_broker
index e6331d6876..1887eee50d 100755
--- a/cpp/src/tests/kill_broker
+++ b/cpp/src/tests/kill_broker
@@ -1,2 +1,2 @@
#!/bin/sh
-../qpidd --quit
+QPID_PORT=`cat qpidd.port` ../qpidd --quit && rm -f qpidd.port
diff --git a/cpp/src/tests/logging.cpp b/cpp/src/tests/logging.cpp
index ebe8f4d6e8..f5402aaad7 100644
--- a/cpp/src/tests/logging.cpp
+++ b/cpp/src/tests/logging.cpp
@@ -22,7 +22,7 @@
#include "qpid/log/Logger.h"
#include "qpid/log/Options.h"
#include "qpid/memory.h"
-#include "qpid/CommonOptions.h"
+#include "qpid/Options.h"
#include <boost/test/floating_point_comparison.hpp>
#include <boost/format.hpp>
#include <exception>
@@ -255,14 +255,6 @@ Statement statement(
}
-struct TestOptions : public Options {
- TestOptions(int argc, char** argv) {
- qpid::po::options_description desc;
- addTo(desc);
- qpid::parseOptions(desc, argc, argv);
- }
-};
-
#define ARGC(argv) (sizeof(argv)/sizeof(char*))
BOOST_AUTO_TEST_CASE(testOptionsParse) {
@@ -278,7 +270,8 @@ BOOST_AUTO_TEST_CASE(testOptionsParse) {
"--log.thread", "true",
"--log.function", "YES"
};
- TestOptions opts(ARGC(argv), argv);
+ qpid::log::Options opts;
+ opts.parse(ARGC(argv), argv);
vector<string> expect=list_of("error+:foo")("debug:bar")("info");
BOOST_CHECK_EQUAL(expect, opts.selectors);
expect=list_of("x")("y");
@@ -306,7 +299,8 @@ BOOST_AUTO_TEST_CASE(testSelectorFromOptions) {
"--log.enable", "debug:bar",
"--log.enable", "info"
};
- TestOptions opts(ARGC(argv), argv);
+ qpid::log::Options opts;
+ opts.parse(ARGC(argv), argv);
vector<string> expect=list_of("error+:foo")("debug:bar")("info");
BOOST_CHECK_EQUAL(expect, opts.selectors);
Selector s(opts);
@@ -319,37 +313,36 @@ BOOST_AUTO_TEST_CASE(testSelectorFromOptions) {
BOOST_AUTO_TEST_CASE(testOptionsFormat) {
Logger& l = clearLogger();
- Options opts;
- BOOST_CHECK_EQUAL(Logger::TIME|Logger::LEVEL, l.format(opts));
- char* argv[]={
- 0,
- "--log.time", "no",
- "--log.level", "no",
- "--log.source", "1",
- "--log.thread", "1"
- };
- qpid::po::options_description desc;
- opts.addTo(desc);
- qpid::parseOptions(desc, ARGC(argv), argv);
- BOOST_CHECK_EQUAL(
- Logger::FILE|Logger::LINE|Logger::THREAD, l.format(opts));
- opts = Options(); // Clear.
- char* argv2[]={
- 0,
- "--log.level", "no",
- "--log.thread", "true",
- "--log.function", "YES",
- "--log.time", "YES"
- };
- qpid::po::options_description desc2;
- opts.addTo(desc2);
- qpid::parseOptions(desc2, ARGC(argv2), argv2);
- BOOST_CHECK_EQUAL(
- Logger::THREAD|Logger::FUNCTION|Logger::TIME,
- l.format(opts));
+ {
+ Options opts;
+ BOOST_CHECK_EQUAL(Logger::TIME|Logger::LEVEL, l.format(opts));
+ char* argv[]={
+ 0,
+ "--log.time", "no",
+ "--log.level", "no",
+ "--log.source", "1",
+ "--log.thread", "1"
+ };
+ opts.parse(ARGC(argv), argv);
+ BOOST_CHECK_EQUAL(
+ Logger::FILE|Logger::LINE|Logger::THREAD, l.format(opts));
+ }
+ {
+ Options opts; // Clear.
+ char* argv[]={
+ 0,
+ "--log.level", "no",
+ "--log.thread", "true",
+ "--log.function", "YES",
+ "--log.time", "YES"
+ };
+ opts.parse(ARGC(argv), argv);
+ BOOST_CHECK_EQUAL(
+ Logger::THREAD|Logger::FUNCTION|Logger::TIME,
+ l.format(opts));
+ }
}
-
BOOST_AUTO_TEST_CASE(testLoggerConfigure) {
Logger& l = clearLogger();
Options opts;
@@ -360,9 +353,7 @@ BOOST_AUTO_TEST_CASE(testLoggerConfigure) {
"--log.output", "logging.tmp",
"--log.enable", "critical"
};
- qpid::po::options_description desc;
- opts.addTo(desc);
- qpid::parseOptions(desc, ARGC(argv), argv);
+ opts.parse(ARGC(argv), argv);
l.configure(opts, "test");
QPID_LOG(critical, "foo"); int srcline=__LINE__;
ifstream log("logging.tmp");
diff --git a/cpp/src/tests/run_test b/cpp/src/tests/run_test
index bfd6991481..deb22b4450 100755
--- a/cpp/src/tests/run_test
+++ b/cpp/src/tests/run_test
@@ -1,12 +1,15 @@
#!/bin/sh
#
-# Run a test executable. Output nothing if test passes,
-# show the output if it fails. Leave output in <test>.log for
-# examination.
+# Set up environment and run a test executable or script.
+#
+# Output nothing if test passes, show the output if it fails and
+# leave output in <test>.log for examination.
+#
+# If qpidd.port exists run test with QPID_PORT=`cat qpidd.port`
#
-# If $VALGRIND if is set run under valgrind. If there are
-# valgrind erros show valgrind output, also leave it in
-# <test>.valgrind for examination.
+# If $VALGRIND if is set run under valgrind. If there are valgrind
+# erros show valgrind output, also leave it in <test>.valgrind for
+# examination.
#
vg_failed() {
@@ -33,6 +36,9 @@ vg_check()
# Export variables from makefile.
export VALGRIND srcdir
+# Export QPID_PORT if qpidd.port exists.
+test -f qpidd.port && export QPID_PORT=`cat qpidd.port`
+
VG_LOG="$1.vglog"
TEST_LOG="$1.log"
rm -f $VG_LOG $TEST_LOG
diff --git a/cpp/src/tests/start_broker b/cpp/src/tests/start_broker
index 92a12e6e8a..b8a83fd5e3 100755
--- a/cpp/src/tests/start_broker
+++ b/cpp/src/tests/start_broker
@@ -1,5 +1,2 @@
#!/bin/sh
-fail() { echo FAIL: $0:$* 1>&2; exit 1; }
-qpidd=../qpidd
-PID=`$qpidd --check` && fail $qpidd already running $PID
-$qpidd --log.output qpidd.log --daemon || fail $qpidd startup: `cat qpidd.log`
+../qpidd --daemon --port 0 --log.output qpidd.log > qpidd.port
diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp
index 9fb3ee8371..cb6bafcd8e 100644
--- a/cpp/src/tests/topic_listener.cpp
+++ b/cpp/src/tests/topic_listener.cpp
@@ -33,6 +33,7 @@
*/
#include "qpid/QpidError.h"
+#include "TestOptions.h"
#include "qpid/client/ClientChannel.h"
#include "qpid/client/Connection.h"
#include "qpid/client/ClientExchange.h"
@@ -42,6 +43,7 @@
#include <iostream>
#include <sstream>
+using namespace qpid;
using namespace qpid::client;
using namespace qpid::sys;
using namespace std;
@@ -68,44 +70,35 @@ public:
/**
* A utility class for managing the options passed in.
*/
-class Args{
- string host;
- int port;
- AckMode ackMode;
+struct Args : public qpid::TestOptions {
+ int ackmode;
bool transactional;
int prefetch;
- bool trace;
- bool help;
-public:
- inline Args() : host("localhost"), port(5672), ackMode(NO_ACK), transactional(false), prefetch(1000), trace(false), help(false){}
- void parse(int argc, char** argv);
- void usage();
-
- const string& getHost() const { return host;}
- int getPort() const { return port; }
- AckMode getAckMode(){ return ackMode; }
- bool getTransactional() const { return transactional; }
- int getPrefetch(){ return prefetch; }
- bool getTrace() const { return trace; }
- bool getHelp() const { return help; }
+ Args() : ackmode(NO_ACK), transactional(false), prefetch(1000) {
+ addOptions()
+ ("ack", optValue(ackmode, "MODE"), "Ack mode: 0=NO_ACK, 1=AUTO_ACK, 2=LAZY_ACK")
+ ("transactional", optValue(transactional), "Use transactions")
+ ("prefetch", optValue(prefetch, "N"), "prefetch count");
+ }
};
+
/**
* The main routine creates a Listener instance and sets it up to
* consume from a private queue bound to the exchange with the
* appropriate topic name.
*/
int main(int argc, char** argv){
- Args args;
- args.parse(argc, argv);
- if(args.getHelp()){
- args.usage();
- }else{
- try{
+ try{
+ Args args;
+ args.parse(argc, argv);
+ if(args.help)
+ cout << args << endl;
+ else {
cout << "topic_listener: Started." << endl;
- Connection connection(args.getTrace());
- connection.open(args.getHost(), args.getPort(), "guest", "guest", "/test");
- Channel channel(args.getTransactional(), args.getPrefetch());
+ Connection connection(args.trace);
+ connection.open(args.host, args.port, args.username, args.password, args.virtualhost);
+ Channel channel(args.transactional, args.prefetch);
connection.openChannel(channel);
//declare exchange, queue and bind them:
@@ -117,17 +110,17 @@ int main(int argc, char** argv){
qpid::framing::FieldTable bindArgs;
channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, "topic_control", bindArgs);
//set up listener
- Listener listener(&channel, response.getName(), args.getTransactional());
+ Listener listener(&channel, response.getName(), args.transactional);
string tag;
- channel.consume(control, tag, &listener, args.getAckMode());
+ channel.consume(control, tag, &listener, AckMode(args.ackmode));
cout << "topic_listener: Consuming." << endl;
channel.run();
connection.close();
cout << "topic_listener: normal exit" << endl;
- return 0;
- }catch(const std::exception& error){
- cout << "topic_listener: " << error.what() << endl;
}
+ return 0;
+ } catch (const std::exception& error) {
+ cout << "topic_listener: " << error.what() << endl;
}
return 1;
}
@@ -172,46 +165,3 @@ void Listener::report(){
}
}
-
-void Args::parse(int argc, char** argv){
- for(int i = 1; i < argc; i++){
- string name(argv[i]);
- if("-help" == name){
- help = true;
- break;
- }else if("-host" == name){
- host = argv[++i];
- }else if("-port" == name){
- port = atoi(argv[++i]);
- }else if("-ack_mode" == name){
- ackMode = AckMode(atoi(argv[++i]));
- }else if("-transactional" == name){
- transactional = true;
- }else if("-prefetch" == name){
- prefetch = atoi(argv[++i]);
- }else if("-trace" == name){
- trace = true;
- }else{
- cout << "Warning: unrecognised option " << name << endl;
- }
- }
-}
-
-void Args::usage(){
- cout << "Options:" << endl;
- cout << " -help" << endl;
- cout << " Prints this usage message" << endl;
- cout << " -host <host>" << endl;
- cout << " Specifies host to connect to (default is localhost)" << endl;
- cout << " -port <port>" << endl;
- cout << " Specifies port to conect to (default is 5762)" << endl;
- cout << " -ack_mode <mode>" << endl;
- cout << " Sets the acknowledgement mode" << endl;
- cout << " 0=NO_ACK (default), 1=AUTO_ACK, 2=LAZY_ACK" << endl;
- cout << " -transactional" << endl;
- cout << " Indicates the client should use transactions" << endl;
- cout << " -prefetch <count>" << endl;
- cout << " Specifies the prefetch count (default is 1000)" << endl;
- cout << " -trace" << endl;
- cout << " Indicates that the frames sent and received should be logged" << endl;
-}
diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp
index 9384053e68..f792540c09 100644
--- a/cpp/src/tests/topic_publisher.cpp
+++ b/cpp/src/tests/topic_publisher.cpp
@@ -34,6 +34,7 @@
* subscriber shutdown.
*/
+#include "TestOptions.h"
#include "qpid/QpidError.h"
#include "qpid/client/ClientChannel.h"
#include "qpid/client/Connection.h"
@@ -46,9 +47,10 @@
#include <cstdlib>
#include <iostream>
+using namespace qpid;
using namespace qpid::client;
using namespace qpid::sys;
-using std::string;
+using namespace std;
/**
* The publishing logic is defined in this class. It implements
@@ -57,7 +59,7 @@ using std::string;
*/
class Publisher : public MessageListener{
Channel* const channel;
- const std::string controlTopic;
+ const string controlTopic;
const bool transactional;
Monitor monitor;
int count;
@@ -66,7 +68,7 @@ class Publisher : public MessageListener{
string generateData(int size);
public:
- Publisher(Channel* channel, const std::string& controlTopic, bool tx);
+ Publisher(Channel* channel, const string& controlTopic, bool tx);
virtual void received(Message& msg);
int64_t publish(int msgs, int listeners, int size);
void terminate();
@@ -75,51 +77,42 @@ public:
/**
* A utility class for managing the options passed in to the test
*/
-class Args{
- string host;
- int port;
+struct Args : public TestOptions {
int messages;
int subscribers;
- AckMode ackMode;
+ int ackmode;
bool transactional;
int prefetch;
int batches;
int delay;
int size;
- bool trace;
- bool help;
-public:
- inline Args() : host("localhost"), port(5672), messages(1000), subscribers(1),
- ackMode(NO_ACK), transactional(false), prefetch(1000), batches(1),
- delay(0), size(256), trace(false), help(false){}
-
- void parse(int argc, char** argv);
- void usage();
- const string& getHost() const { return host;}
- int getPort() const { return port; }
- int getMessages() const { return messages; }
- int getSubscribers() const { return subscribers; }
- AckMode getAckMode(){ return ackMode; }
- bool getTransactional() const { return transactional; }
- int getPrefetch(){ return prefetch; }
- int getBatches(){ return batches; }
- int getDelay(){ return delay; }
- int getSize(){ return size; }
- bool getTrace() const { return trace; }
- bool getHelp() const { return help; }
+ Args() : messages(1000), subscribers(1),
+ ackmode(NO_ACK), transactional(false), prefetch(1000),
+ batches(1), delay(0), size(256)
+ {
+ addOptions()
+ ("messages", optValue(messages, "N"), "how many messages to send")
+ ("subscribers", optValue(subscribers, "N"), "how many subscribers to expect reports from")
+ ("ackmode", optValue(ackmode, "MODE"), "Acknowledgement mode:0=NO_ACK, 1=AUTO_ACK, 2=LAZY_ACK")
+ ("transactional", optValue(transactional), "client should use transactions")
+ ("prefetch", optValue(prefetch, "N"), "prefetch count")
+ ("batches", optValue(batches, "N"), "how many batches to run")
+ ("delay", optValue(delay, "SECONDS"), "Causes a delay between each batch")
+ ("size", optValue(size, "BYTES"), "size of the published messages");
+ }
};
int main(int argc, char** argv) {
- Args args;
- args.parse(argc, argv);
- if(args.getHelp()){
- args.usage();
- } else {
- try{
- Connection connection(args.getTrace());
- connection.open(args.getHost(), args.getPort(), "guest", "guest", "/test");
- Channel channel(args.getTransactional(), args.getPrefetch());
+ try{
+ Args args;
+ args.parse(argc, argv);
+ if(args.help)
+ cout << args << endl;
+ else {
+ Connection connection(args.trace);
+ connection.open(args.host, args.port, args.username, args.password, args.virtualhost);
+ Channel channel(args.transactional, args.prefetch);
connection.openChannel(channel);
//declare queue (relying on default binding):
@@ -127,44 +120,44 @@ int main(int argc, char** argv) {
channel.declareQueue(response);
//set up listener
- Publisher publisher(&channel, "topic_control", args.getTransactional());
- std::string tag("mytag");
- channel.consume(response, tag, &publisher, args.getAckMode());
+ Publisher publisher(&channel, "topic_control", args.transactional);
+ string tag("mytag");
+ channel.consume(response, tag, &publisher, AckMode(args.ackmode));
channel.start();
- int batchSize(args.getBatches());
+ int batchSize(args.batches);
int64_t max(0);
int64_t min(0);
int64_t sum(0);
for(int i = 0; i < batchSize; i++){
- if(i > 0 && args.getDelay()) sleep(args.getDelay());
+ if(i > 0 && args.delay) sleep(args.delay);
int64_t msecs =
- publisher.publish(args.getMessages(),
- args.getSubscribers(),
- args.getSize()) / TIME_MSEC;
+ publisher.publish(args.messages,
+ args.subscribers,
+ args.size) / TIME_MSEC;
if(!max || msecs > max) max = msecs;
if(!min || msecs < min) min = msecs;
sum += msecs;
- std::cout << "Completed " << (i+1) << " of " << batchSize
- << " in " << msecs << "ms" << std::endl;
+ cout << "Completed " << (i+1) << " of " << batchSize
+ << " in " << msecs << "ms" << endl;
}
publisher.terminate();
int64_t avg = sum / batchSize;
if(batchSize > 1){
- std::cout << batchSize << " batches completed. avg=" << avg <<
- ", max=" << max << ", min=" << min << std::endl;
+ cout << batchSize << " batches completed. avg=" << avg <<
+ ", max=" << max << ", min=" << min << endl;
}
channel.close();
connection.close();
- return 0;
- }catch(std::exception& error) {
- std::cout << error.what() << std::endl;
}
+ return 0;
+ }catch(exception& error) {
+ cout << error.what() << endl;
}
return 1;
}
-Publisher::Publisher(Channel* _channel, const std::string& _controlTopic, bool tx) :
+Publisher::Publisher(Channel* _channel, const string& _controlTopic, bool tx) :
channel(_channel), controlTopic(_controlTopic), transactional(tx){}
void Publisher::received(Message& ){
@@ -223,65 +216,3 @@ void Publisher::terminate(){
}
}
-void Args::parse(int argc, char** argv){
- for(int i = 1; i < argc; i++){
- string name(argv[i]);
- if("-help" == name){
- help = true;
- break;
- }else if("-host" == name){
- host = argv[++i];
- }else if("-port" == name){
- port = atoi(argv[++i]);
- }else if("-messages" == name){
- messages = atoi(argv[++i]);
- }else if("-subscribers" == name){
- subscribers = atoi(argv[++i]);
- }else if("-ack_mode" == name){
- ackMode = AckMode(atoi(argv[++i]));
- }else if("-transactional" == name){
- transactional = true;
- }else if("-prefetch" == name){
- prefetch = atoi(argv[++i]);
- }else if("-batches" == name){
- batches = atoi(argv[++i]);
- }else if("-delay" == name){
- delay = atoi(argv[++i]);
- }else if("-size" == name){
- size = atoi(argv[++i]);
- }else if("-trace" == name){
- trace = true;
- }else{
- std::cout << "Warning: unrecognised option " << name << std::endl;
- }
- }
-}
-
-void Args::usage(){
- std::cout << "Options:" << std::endl;
- std::cout << " -help" << std::endl;
- std::cout << " Prints this usage message" << std::endl;
- std::cout << " -host <host>" << std::endl;
- std::cout << " Specifies host to connect to (default is localhost)" << std::endl;
- std::cout << " -port <port>" << std::endl;
- std::cout << " Specifies port to conect to (default is 5762)" << std::endl;
- std::cout << " -messages <count>" << std::endl;
- std::cout << " Specifies how many messages to send" << std::endl;
- std::cout << " -subscribers <count>" << std::endl;
- std::cout << " Specifies how many subscribers to expect reports from" << std::endl;
- std::cout << " -ack_mode <mode>" << std::endl;
- std::cout << " Sets the acknowledgement mode" << std::endl;
- std::cout << " 0=NO_ACK (default), 1=AUTO_ACK, 2=LAZY_ACK" << std::endl;
- std::cout << " -transactional" << std::endl;
- std::cout << " Indicates the client should use transactions" << std::endl;
- std::cout << " -prefetch <count>" << std::endl;
- std::cout << " Specifies the prefetch count (default is 1000)" << std::endl;
- std::cout << " -batches <count>" << std::endl;
- std::cout << " Specifies how many batches to run" << std::endl;
- std::cout << " -delay <seconds>" << std::endl;
- std::cout << " Causes a delay between each batch" << std::endl;
- std::cout << " -size <bytes>" << std::endl;
- std::cout << " Sets the size of the published messages (default is 256 bytes)" << std::endl;
- std::cout << " -trace" << std::endl;
- std::cout << " Indicates that the frames sent and received should be logged" << std::endl;
-}
diff --git a/cpp/src/tests/topictest b/cpp/src/tests/topictest
index 92e40b2c37..21387ac3bd 100755
--- a/cpp/src/tests/topictest
+++ b/cpp/src/tests/topictest
@@ -28,7 +28,7 @@ subscribe() {
}
publish() {
- ./topic_publisher -messages $MESSAGES -batches $BATCHES -subscribers $SUBSCRIBERS
+ ./topic_publisher --messages $MESSAGES --batches $BATCHES --subscribers $SUBSCRIBERS
}
for ((i=$SUBSCRIBERS ; i--; )); do