summaryrefslogtreecommitdiff
path: root/cpp/src/tests/topic_publisher.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/topic_publisher.cpp')
-rw-r--r--cpp/src/tests/topic_publisher.cpp26
1 files changed, 22 insertions, 4 deletions
diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp
index 0f3406a1fe..f37ad2dc0e 100644
--- a/cpp/src/tests/topic_publisher.cpp
+++ b/cpp/src/tests/topic_publisher.cpp
@@ -82,6 +82,7 @@ struct Args : public TestOptions {
int batches;
int delay;
int size;
+ string statusqueue;
Args() : messages(1000), subscribers(1),
transactional(false), durable(false),
@@ -94,7 +95,8 @@ struct Args : public TestOptions {
("durable", optValue(durable), "messages should be durable")
("batches", optValue(batches, "N"), "how many batches to run")
("delay", optValue(delay, "SECONDS"), "Causes a delay between each batch")
- ("size", optValue(size, "BYTES"), "size of the published messages");
+ ("size", optValue(size, "BYTES"), "size of the published messages")
+ ("status-queue", optValue(statusqueue, "QUEUE-NAME"), "Message queue to read status messages from");
}
};
@@ -108,11 +110,28 @@ int main(int argc, char** argv) {
Connection connection;
args.open(connection);
AsyncSession session = connection.newSession();
+
+ // If status-queue is defined, wait for all expected listeners to join in before we start
+ if( args.statusqueue.length() > 0 ) {
+ cout << "Waiting for " << args.subscribers << " listeners..." << endl;
+ SubscriptionManager statusSubs(session);
+ LocalQueue statusQ;
+ statusSubs.subscribe(statusQ, args.statusqueue);
+ for (int i = 0; i < args.subscribers; i++) {
+ Message m = statusQ.get();
+ if( m.getData().find("topic_listener: ", 0) == 0 ) {
+ cout << "Listener " << (i+1) << " of " << args.subscribers
+ << " is ready (pid " << m.getData().substr(16, m.getData().length() - 16)
+ << ")" << endl;
+ } else {
+ throw Exception(QPID_MSG("Unexpected message received on status queue: " << m.getData()));
+ }
+ }
+ }
+
if (args.transactional) {
session.txSelect();
}
-
-
session.queueDeclare(arg::queue="response");
session.exchangeBind(arg::exchange="amq.direct", arg::queue="response", arg::bindingKey="response");
@@ -203,4 +222,3 @@ void Publisher::terminate(){
session.txCommit();
}
}
-