summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-11-07 23:07:51 +0000
committerAlan Conway <aconway@apache.org>2007-11-07 23:07:51 +0000
commitb930ecd07bc7af075bef2f3fa958bfc118ad5f84 (patch)
tree50249df94aa1864f9b682377edd10d226b073d93 /cpp
parentbf7f244c28c899b90789e77eda5f585edda5bcd9 (diff)
downloadqpid-python-b930ecd07bc7af075bef2f3fa958bfc118ad5f84.tar.gz
Fix race condition in perftest.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@592941 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/tests/perftest.cpp55
1 files changed, 37 insertions, 18 deletions
diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp
index c775407cdf..d9316527bc 100644
--- a/cpp/src/tests/perftest.cpp
+++ b/cpp/src/tests/perftest.cpp
@@ -68,7 +68,24 @@ Mode mode;
struct ListenThread : public Runnable { Thread thread; void run(); };
struct PublishThread : public Runnable { Thread thread; void run(); };
-
+
+// Create and purge the shared queues
+void setup() {
+ cout << "Create shared queues" << endl;
+ Connection connection;
+ opts.open(connection);
+ Session_0_10 session = connection.newSession();
+ session.setSynchronous(true); // Make sure this is all completed.
+ session.queueDeclare(arg::queue="control"); // Control queue
+ session.queuePurge(arg::queue="control");
+ if (mode==SHARED) {
+ session.queueDeclare(arg::queue="perftest"); // Shared data queue
+ session.queuePurge(arg::queue="perftest");
+ }
+ session.close();
+ connection.close();
+}
+
int main(int argc, char** argv) {
try {
opts.parse(argc, argv);
@@ -78,6 +95,7 @@ int main(int argc, char** argv) {
else throw Exception("Invalid mode");
if (!opts.listen && !opts.publish)
opts.listen = opts.publish = true;
+ setup();
std::vector<ListenThread> listen(opts.consumers);
PublishThread publish;
if (opts.listen)
@@ -122,19 +140,16 @@ void PublishThread::run() {
opts.open(connection);
Session_0_10 session = connection.newSession();
- session.queueDeclare(arg::queue="control"); // Control queue
- session.queuePurge(arg::queue="control");
- if (mode==SHARED) {
- session.queueDeclare(arg::queue="perftest"); // Shared data queue
- session.queuePurge(arg::queue="perftest");
- }
-
// Wait for consumers.
+ cout << "Publisher wating for consumers " << flush;
SubscriptionManager subs(session);
LocalQueue control;
subs.subscribe(control, "control");
- for (int i = 0; i < opts.consumers; ++i)
+ for (int i = 0; i < opts.consumers; ++i) {
+ cout << "." << flush;
expect(control.pop().getData(), "ready");
+ }
+ cout << endl;
// Create test message
size_t msgSize=max(opts.size, 32);
@@ -167,6 +182,7 @@ void PublishThread::run() {
cout << "publish rate:" << (opts.count)/secs(start,end) << endl;
// Wait for consumer(s) to finish.
+ cout << "Publisher wating for consumer reports. " << endl;
for (int i = 0; i < opts.consumers; ++i) {
string report=control.pop().getData();
if (report.find("consume") != 0)
@@ -201,14 +217,11 @@ void ListenThread::run() {
Session_0_10 session = connection.newSession();
string consumeQueue;
- switch (mode) {
- case SHARED:
+ if (mode == SHARED) {
consumeQueue="perftest";
- session.queueDeclare(arg::queue="perftest");
- break;
- case FANOUT:
- case TOPIC:
- consumeQueue=session.getId().str(); // Unique
+ }
+ else {
+ consumeQueue=session.getId().str(); // Unique name.
session.queueDeclare(arg::queue=consumeQueue,
arg::exclusive=true,
arg::autoDelete=true);
@@ -217,7 +230,6 @@ void ListenThread::run() {
arg::routingKey="perftest");
}
// Notify publisher we are ready.
- session.queueDeclare(arg::queue="control"); // Control queue
session.messageTransfer(arg::content=Message("ready", "control"));
SubscriptionManager subs(session);
@@ -226,8 +238,15 @@ void ListenThread::run() {
int consumed=0;
AbsTime start=now();
Message msg;
- while ((msg=consume.pop()).getData() != "done")
+ if (!opts.publish)
+ cout << "Consuming " << flush;
+ while ((msg=consume.pop()).getData() != "done") {
++consumed;
+ if (!opts.publish && (consumed%10000) == 0)
+ cout << "." << flush;
+ }
+ if (!opts.publish)
+ cout << endl;
msg.acknowledge(); // Ack all outstanding messages.
AbsTime end=now();