From 40c4ee7bb08d9202253c976f9f8f61d81d6010d0 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Fri, 25 Jul 2008 07:36:07 +0000 Subject: QPID-447: Optional mechanism to avoid race when automating topic tests. Patch from David Sommerseth. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-10@679689 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/tests/topic_listener.cpp | 18 ++++++++++++++---- cpp/src/tests/topic_publisher.cpp | 26 ++++++++++++++++++++++---- 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp index 89419c9c7a..636618b6dc 100644 --- a/cpp/src/tests/topic_listener.cpp +++ b/cpp/src/tests/topic_listener.cpp @@ -76,13 +76,15 @@ struct Args : public qpid::TestOptions { bool transactional; bool durable; int prefetch; + string statusqueue; Args() : ack(0), transactional(false), durable(false), prefetch(0) { addOptions() ("ack", optValue(ack, "MODE"), "Ack frequency in messages (defaults to half the prefetch value)") ("transactional", optValue(transactional), "Use transactions") ("durable", optValue(durable), "subscribers should use durable queues") - ("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)"); + ("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)") + ("status-queue", optValue(statusqueue, "QUEUE-NAME"), "Message queue to put status messages on"); } }; @@ -102,9 +104,6 @@ int main(int argc, char** argv){ Connection connection; args.open(connection); AsyncSession session = connection.newSession(); - if (args.transactional) { - session.txSelect(); - } //declare exchange, queue and bind them: session.queueDeclare(arg::queue="response"); @@ -128,6 +127,17 @@ int main(int argc, char** argv){ } mgr.subscribe(listener, control); session.sync(); + + if( args.statusqueue.length() > 0 ) { + stringstream msg_str; + msg_str << "topic_listener: " << (int)getpid(); + session.messageTransfer(arg::content=Message(msg_str.str(), args.statusqueue)); + cout << "Ready status put on queue '" << args.statusqueue << "'" << endl; + } + + if (args.transactional) { + session.txSelect(); + } cout << "topic_listener: listening..." << endl; mgr.run(); diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp index 0f3406a1fe..f37ad2dc0e 100644 --- a/cpp/src/tests/topic_publisher.cpp +++ b/cpp/src/tests/topic_publisher.cpp @@ -82,6 +82,7 @@ struct Args : public TestOptions { int batches; int delay; int size; + string statusqueue; Args() : messages(1000), subscribers(1), transactional(false), durable(false), @@ -94,7 +95,8 @@ struct Args : public TestOptions { ("durable", optValue(durable), "messages should be durable") ("batches", optValue(batches, "N"), "how many batches to run") ("delay", optValue(delay, "SECONDS"), "Causes a delay between each batch") - ("size", optValue(size, "BYTES"), "size of the published messages"); + ("size", optValue(size, "BYTES"), "size of the published messages") + ("status-queue", optValue(statusqueue, "QUEUE-NAME"), "Message queue to read status messages from"); } }; @@ -108,11 +110,28 @@ int main(int argc, char** argv) { Connection connection; args.open(connection); AsyncSession session = connection.newSession(); + + // If status-queue is defined, wait for all expected listeners to join in before we start + if( args.statusqueue.length() > 0 ) { + cout << "Waiting for " << args.subscribers << " listeners..." << endl; + SubscriptionManager statusSubs(session); + LocalQueue statusQ; + statusSubs.subscribe(statusQ, args.statusqueue); + for (int i = 0; i < args.subscribers; i++) { + Message m = statusQ.get(); + if( m.getData().find("topic_listener: ", 0) == 0 ) { + cout << "Listener " << (i+1) << " of " << args.subscribers + << " is ready (pid " << m.getData().substr(16, m.getData().length() - 16) + << ")" << endl; + } else { + throw Exception(QPID_MSG("Unexpected message received on status queue: " << m.getData())); + } + } + } + if (args.transactional) { session.txSelect(); } - - session.queueDeclare(arg::queue="response"); session.exchangeBind(arg::exchange="amq.direct", arg::queue="response", arg::bindingKey="response"); @@ -203,4 +222,3 @@ void Publisher::terminate(){ session.txCommit(); } } - -- cgit v1.2.1