diff options
author | Gordon Sim <gsim@apache.org> | 2007-12-03 20:12:16 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-12-03 20:12:16 +0000 |
commit | e3303ea643833b1b72f6ac656c0e6ddca03d7258 (patch) | |
tree | 8d28a4d6885a3f6846dd13e13353db047cdfa052 /cpp/src | |
parent | 64b2f75b75b818aa6c60a5ba25068258d708f96d (diff) | |
download | qpid-python-e3303ea643833b1b72f6ac656c0e6ddca03d7258.tar.gz |
Add option for durability to topic test
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@600655 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/tests/topic_listener.cpp | 7 | ||||
-rw-r--r-- | cpp/src/tests/topic_publisher.cpp | 20 |
2 files changed, 16 insertions, 11 deletions
diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp index fca50e3941..2456a85190 100644 --- a/cpp/src/tests/topic_listener.cpp +++ b/cpp/src/tests/topic_listener.cpp @@ -74,11 +74,14 @@ public: struct Args : public qpid::TestOptions { int ack; bool transactional; + bool durable; int prefetch; - Args() : ack(0), transactional(false), prefetch(0) { + + Args() : ack(0), transactional(false), durable(false), prefetch(0) { addOptions() ("ack", optValue(ack, "MODE"), "Ack frequency in messages (defaults to half the prefetch value)") ("transactional", optValue(transactional), "Use transactions") + ("durable", optValue(durable), "subscribers should use durable queues") ("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)"); } }; @@ -107,7 +110,7 @@ int main(int argc, char** argv){ //declare exchange, queue and bind them: session.queueDeclare(arg::queue="response"); std::string control = "control_" + session.getId().str(); - session.queueDeclare(arg::queue=control); + session.queueDeclare(arg::queue=control, arg::durable=args.durable); session.queueBind(arg::exchange="amq.topic", arg::queue=control, arg::routingKey="topic_control"); //set up listener diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp index 1354dc1435..f678b0eb21 100644 --- a/cpp/src/tests/topic_publisher.cpp +++ b/cpp/src/tests/topic_publisher.cpp @@ -59,6 +59,7 @@ class Publisher : public MessageListener{ Session_0_10& session; const string controlTopic; const bool transactional; + const bool durable; Monitor monitor; int count; @@ -66,7 +67,7 @@ class Publisher : public MessageListener{ string generateData(int size); public: - Publisher(Session_0_10& session, const string& controlTopic, bool tx); + Publisher(Session_0_10& session, const string& controlTopic, bool tx, bool durable); virtual void received(Message& msg); int64_t publish(int msgs, int listeners, int size); void terminate(); @@ -78,23 +79,21 @@ public: struct Args : public TestOptions { int messages; int subscribers; - int ack; bool transactional; - int prefetch; + bool durable; int batches; int delay; int size; Args() : messages(1000), subscribers(1), - ack(500), transactional(false), prefetch(1000), + transactional(false), durable(false), batches(1), delay(0), size(256) { addOptions() ("messages", optValue(messages, "N"), "how many messages to send") ("subscribers", optValue(subscribers, "N"), "how many subscribers to expect reports from") - ("ack", optValue(ack, "MODE"), "Acknowledgement mode:0=NO_ACK, 1=AUTO_ACK, 2=LAZY_ACK") ("transactional", optValue(transactional), "client should use transactions") - ("prefetch", optValue(prefetch, "N"), "prefetch count") + ("durable", optValue(durable), "messages should be durable") ("batches", optValue(batches, "N"), "how many batches to run") ("delay", optValue(delay, "SECONDS"), "Causes a delay between each batch") ("size", optValue(size, "BYTES"), "size of the published messages"); @@ -121,7 +120,7 @@ int main(int argc, char** argv) { //set up listener SubscriptionManager mgr(session); - Publisher publisher(session, "topic_control", args.transactional); + Publisher publisher(session, "topic_control", args.transactional, args.durable); mgr.subscribe(publisher, "response"); mgr.start(); @@ -158,8 +157,8 @@ int main(int argc, char** argv) { return 1; } -Publisher::Publisher(Session_0_10& _session, const string& _controlTopic, bool tx) : - session(_session), controlTopic(_controlTopic), transactional(tx){} +Publisher::Publisher(Session_0_10& _session, const string& _controlTopic, bool tx, bool d) : + session(_session), controlTopic(_controlTopic), transactional(tx), durable(d), count(0) {} void Publisher::received(Message& ){ //count responses and when all are received end the current batch @@ -176,6 +175,9 @@ void Publisher::waitForCompletion(int msgs){ int64_t Publisher::publish(int msgs, int listeners, int size){ Message msg(generateData(size), controlTopic); + if (durable) { + msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); + } AbsTime start = now(); { Monitor::ScopedLock l(monitor); |