summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-12-03 20:12:16 +0000
committerGordon Sim <gsim@apache.org>2007-12-03 20:12:16 +0000
commite3303ea643833b1b72f6ac656c0e6ddca03d7258 (patch)
tree8d28a4d6885a3f6846dd13e13353db047cdfa052 /cpp/src
parent64b2f75b75b818aa6c60a5ba25068258d708f96d (diff)
downloadqpid-python-e3303ea643833b1b72f6ac656c0e6ddca03d7258.tar.gz
Add option for durability to topic test
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@600655 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/tests/topic_listener.cpp7
-rw-r--r--cpp/src/tests/topic_publisher.cpp20
2 files changed, 16 insertions, 11 deletions
diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp
index fca50e3941..2456a85190 100644
--- a/cpp/src/tests/topic_listener.cpp
+++ b/cpp/src/tests/topic_listener.cpp
@@ -74,11 +74,14 @@ public:
struct Args : public qpid::TestOptions {
int ack;
bool transactional;
+ bool durable;
int prefetch;
- Args() : ack(0), transactional(false), prefetch(0) {
+
+ Args() : ack(0), transactional(false), durable(false), prefetch(0) {
addOptions()
("ack", optValue(ack, "MODE"), "Ack frequency in messages (defaults to half the prefetch value)")
("transactional", optValue(transactional), "Use transactions")
+ ("durable", optValue(durable), "subscribers should use durable queues")
("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)");
}
};
@@ -107,7 +110,7 @@ int main(int argc, char** argv){
//declare exchange, queue and bind them:
session.queueDeclare(arg::queue="response");
std::string control = "control_" + session.getId().str();
- session.queueDeclare(arg::queue=control);
+ session.queueDeclare(arg::queue=control, arg::durable=args.durable);
session.queueBind(arg::exchange="amq.topic", arg::queue=control, arg::routingKey="topic_control");
//set up listener
diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp
index 1354dc1435..f678b0eb21 100644
--- a/cpp/src/tests/topic_publisher.cpp
+++ b/cpp/src/tests/topic_publisher.cpp
@@ -59,6 +59,7 @@ class Publisher : public MessageListener{
Session_0_10& session;
const string controlTopic;
const bool transactional;
+ const bool durable;
Monitor monitor;
int count;
@@ -66,7 +67,7 @@ class Publisher : public MessageListener{
string generateData(int size);
public:
- Publisher(Session_0_10& session, const string& controlTopic, bool tx);
+ Publisher(Session_0_10& session, const string& controlTopic, bool tx, bool durable);
virtual void received(Message& msg);
int64_t publish(int msgs, int listeners, int size);
void terminate();
@@ -78,23 +79,21 @@ public:
struct Args : public TestOptions {
int messages;
int subscribers;
- int ack;
bool transactional;
- int prefetch;
+ bool durable;
int batches;
int delay;
int size;
Args() : messages(1000), subscribers(1),
- ack(500), transactional(false), prefetch(1000),
+ transactional(false), durable(false),
batches(1), delay(0), size(256)
{
addOptions()
("messages", optValue(messages, "N"), "how many messages to send")
("subscribers", optValue(subscribers, "N"), "how many subscribers to expect reports from")
- ("ack", optValue(ack, "MODE"), "Acknowledgement mode:0=NO_ACK, 1=AUTO_ACK, 2=LAZY_ACK")
("transactional", optValue(transactional), "client should use transactions")
- ("prefetch", optValue(prefetch, "N"), "prefetch count")
+ ("durable", optValue(durable), "messages should be durable")
("batches", optValue(batches, "N"), "how many batches to run")
("delay", optValue(delay, "SECONDS"), "Causes a delay between each batch")
("size", optValue(size, "BYTES"), "size of the published messages");
@@ -121,7 +120,7 @@ int main(int argc, char** argv) {
//set up listener
SubscriptionManager mgr(session);
- Publisher publisher(session, "topic_control", args.transactional);
+ Publisher publisher(session, "topic_control", args.transactional, args.durable);
mgr.subscribe(publisher, "response");
mgr.start();
@@ -158,8 +157,8 @@ int main(int argc, char** argv) {
return 1;
}
-Publisher::Publisher(Session_0_10& _session, const string& _controlTopic, bool tx) :
- session(_session), controlTopic(_controlTopic), transactional(tx){}
+Publisher::Publisher(Session_0_10& _session, const string& _controlTopic, bool tx, bool d) :
+ session(_session), controlTopic(_controlTopic), transactional(tx), durable(d), count(0) {}
void Publisher::received(Message& ){
//count responses and when all are received end the current batch
@@ -176,6 +175,9 @@ void Publisher::waitForCompletion(int msgs){
int64_t Publisher::publish(int msgs, int listeners, int size){
Message msg(generateData(size), controlTopic);
+ if (durable) {
+ msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+ }
AbsTime start = now();
{
Monitor::ScopedLock l(monitor);