summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-07-25 07:36:07 +0000
committerGordon Sim <gsim@apache.org>2008-07-25 07:36:07 +0000
commit40c4ee7bb08d9202253c976f9f8f61d81d6010d0 (patch)
tree33093edc6e0eec8b2e21dd091abd9fd5f8841673
parent01515d42ac23716890e14ed7c0b6c95dd596035e (diff)
downloadqpid-python-40c4ee7bb08d9202253c976f9f8f61d81d6010d0.tar.gz
QPID-447: Optional mechanism to avoid race when automating topic tests. Patch from David Sommerseth.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-10@679689 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/tests/topic_listener.cpp18
-rw-r--r--cpp/src/tests/topic_publisher.cpp26
2 files changed, 36 insertions, 8 deletions
diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp
index 89419c9c7a..636618b6dc 100644
--- a/cpp/src/tests/topic_listener.cpp
+++ b/cpp/src/tests/topic_listener.cpp
@@ -76,13 +76,15 @@ struct Args : public qpid::TestOptions {
bool transactional;
bool durable;
int prefetch;
+ string statusqueue;
Args() : ack(0), transactional(false), durable(false), prefetch(0) {
addOptions()
("ack", optValue(ack, "MODE"), "Ack frequency in messages (defaults to half the prefetch value)")
("transactional", optValue(transactional), "Use transactions")
("durable", optValue(durable), "subscribers should use durable queues")
- ("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)");
+ ("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)")
+ ("status-queue", optValue(statusqueue, "QUEUE-NAME"), "Message queue to put status messages on");
}
};
@@ -102,9 +104,6 @@ int main(int argc, char** argv){
Connection connection;
args.open(connection);
AsyncSession session = connection.newSession();
- if (args.transactional) {
- session.txSelect();
- }
//declare exchange, queue and bind them:
session.queueDeclare(arg::queue="response");
@@ -128,6 +127,17 @@ int main(int argc, char** argv){
}
mgr.subscribe(listener, control);
session.sync();
+
+ if( args.statusqueue.length() > 0 ) {
+ stringstream msg_str;
+ msg_str << "topic_listener: " << (int)getpid();
+ session.messageTransfer(arg::content=Message(msg_str.str(), args.statusqueue));
+ cout << "Ready status put on queue '" << args.statusqueue << "'" << endl;
+ }
+
+ if (args.transactional) {
+ session.txSelect();
+ }
cout << "topic_listener: listening..." << endl;
mgr.run();
diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp
index 0f3406a1fe..f37ad2dc0e 100644
--- a/cpp/src/tests/topic_publisher.cpp
+++ b/cpp/src/tests/topic_publisher.cpp
@@ -82,6 +82,7 @@ struct Args : public TestOptions {
int batches;
int delay;
int size;
+ string statusqueue;
Args() : messages(1000), subscribers(1),
transactional(false), durable(false),
@@ -94,7 +95,8 @@ struct Args : public TestOptions {
("durable", optValue(durable), "messages should be durable")
("batches", optValue(batches, "N"), "how many batches to run")
("delay", optValue(delay, "SECONDS"), "Causes a delay between each batch")
- ("size", optValue(size, "BYTES"), "size of the published messages");
+ ("size", optValue(size, "BYTES"), "size of the published messages")
+ ("status-queue", optValue(statusqueue, "QUEUE-NAME"), "Message queue to read status messages from");
}
};
@@ -108,11 +110,28 @@ int main(int argc, char** argv) {
Connection connection;
args.open(connection);
AsyncSession session = connection.newSession();
+
+ // If status-queue is defined, wait for all expected listeners to join in before we start
+ if( args.statusqueue.length() > 0 ) {
+ cout << "Waiting for " << args.subscribers << " listeners..." << endl;
+ SubscriptionManager statusSubs(session);
+ LocalQueue statusQ;
+ statusSubs.subscribe(statusQ, args.statusqueue);
+ for (int i = 0; i < args.subscribers; i++) {
+ Message m = statusQ.get();
+ if( m.getData().find("topic_listener: ", 0) == 0 ) {
+ cout << "Listener " << (i+1) << " of " << args.subscribers
+ << " is ready (pid " << m.getData().substr(16, m.getData().length() - 16)
+ << ")" << endl;
+ } else {
+ throw Exception(QPID_MSG("Unexpected message received on status queue: " << m.getData()));
+ }
+ }
+ }
+
if (args.transactional) {
session.txSelect();
}
-
-
session.queueDeclare(arg::queue="response");
session.exchangeBind(arg::exchange="amq.direct", arg::queue="response", arg::bindingKey="response");
@@ -203,4 +222,3 @@ void Publisher::terminate(){
session.txCommit();
}
}
-