summaryrefslogtreecommitdiff
path: root/cpp/src/tests/topic_publisher.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/topic_publisher.cpp')
-rw-r--r--cpp/src/tests/topic_publisher.cpp138
1 files changed, 72 insertions, 66 deletions
diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp
index f37ad2dc0e..3381132b1a 100644
--- a/cpp/src/tests/topic_publisher.cpp
+++ b/cpp/src/tests/topic_publisher.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,7 +23,7 @@
* This file provides one half of a test and example of a pub-sub
* style of interaction. See topic_listener.cpp for the other half, in
* which the logic for subscribers is defined.
- *
+ *
* This file contains the publisher logic. The publisher will send a
* number of messages to the exchange with the appropriate routing key
* for the logical 'topic'. Once it has done this it will then send a
@@ -40,7 +40,6 @@
#include "qpid/client/AsyncSession.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/sys/Monitor.h"
-#include <unistd.h>
#include "qpid/sys/Time.h"
#include <cstdlib>
#include <iostream>
@@ -50,19 +49,22 @@ using namespace qpid::client;
using namespace qpid::sys;
using namespace std;
+namespace qpid {
+namespace tests {
+
/**
* The publishing logic is defined in this class. It implements
* message listener and can therfore be used to receive messages sent
* back by the subscribers.
*/
-class Publisher {
+class Publisher {
AsyncSession session;
SubscriptionManager mgr;
LocalQueue queue;
const string controlTopic;
const bool transactional;
const bool durable;
-
+
string generateData(int size);
public:
@@ -100,6 +102,64 @@ struct Args : public TestOptions {
}
};
+Publisher::Publisher(const AsyncSession& _session, const string& _controlTopic, bool tx, bool d) :
+ session(_session), mgr(session), controlTopic(_controlTopic), transactional(tx), durable(d)
+{
+ mgr.subscribe(queue, "response");
+}
+
+int64_t Publisher::publish(int msgs, int listeners, int size){
+ Message msg(generateData(size), controlTopic);
+ if (durable) {
+ msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+ }
+ AbsTime start = now();
+
+ for(int i = 0; i < msgs; i++){
+ session.messageTransfer(arg::content=msg, arg::destination="amq.topic", arg::acceptMode=1);
+ }
+ //send report request
+ Message reportRequest("", controlTopic);
+ reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
+ session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic", arg::acceptMode=1);
+ if(transactional){
+ sync(session).txCommit();
+ }
+ //wait for a response from each listener (TODO, could log these)
+ for (int i = 0; i < listeners; i++) {
+ Message report = queue.pop();
+ }
+
+ if(transactional){
+ sync(session).txCommit();
+ }
+
+ AbsTime finish = now();
+ return Duration(start, finish);
+}
+
+string Publisher::generateData(int size){
+ string data;
+ for(int i = 0; i < size; i++){
+ data += ('A' + (i / 26));
+ }
+ return data;
+}
+
+void Publisher::terminate(){
+ //send termination request
+ Message terminationRequest("", controlTopic);
+ terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST");
+ session.messageTransfer(arg::content=terminationRequest, arg::destination="amq.topic", arg::acceptMode=1);
+ if(transactional){
+ session.txCommit();
+ }
+}
+
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
int main(int argc, char** argv) {
try{
Args args;
@@ -121,11 +181,11 @@ int main(int argc, char** argv) {
Message m = statusQ.get();
if( m.getData().find("topic_listener: ", 0) == 0 ) {
cout << "Listener " << (i+1) << " of " << args.subscribers
- << " is ready (pid " << m.getData().substr(16, m.getData().length() - 16)
- << ")" << endl;
+ << " is ready (pid " << m.getData().substr(16, m.getData().length() - 16)
+ << ")" << endl;
} else {
throw Exception(QPID_MSG("Unexpected message received on status queue: " << m.getData()));
- }
+ }
}
}
@@ -142,7 +202,7 @@ int main(int argc, char** argv) {
int64_t min(0);
int64_t sum(0);
for(int i = 0; i < batchSize; i++){
- if(i > 0 && args.delay) sleep(args.delay);
+ if(i > 0 && args.delay) qpid::sys::sleep(args.delay);
int64_t msecs =
publisher.publish(args.messages,
args.subscribers,
@@ -151,12 +211,12 @@ int main(int argc, char** argv) {
if(!min || msecs < min) min = msecs;
sum += msecs;
cout << "Completed " << (i+1) << " of " << batchSize
- << " in " << msecs << "ms" << endl;
+ << " in " << msecs << "ms" << endl;
}
publisher.terminate();
int64_t avg = sum / batchSize;
if(batchSize > 1){
- cout << batchSize << " batches completed. avg=" << avg <<
+ cout << batchSize << " batches completed. avg=" << avg <<
", max=" << max << ", min=" << min << endl;
}
session.close();
@@ -168,57 +228,3 @@ int main(int argc, char** argv) {
}
return 1;
}
-
-Publisher::Publisher(const AsyncSession& _session, const string& _controlTopic, bool tx, bool d) :
- session(_session), mgr(session), controlTopic(_controlTopic), transactional(tx), durable(d)
-{
- mgr.subscribe(queue, "response");
-}
-
-int64_t Publisher::publish(int msgs, int listeners, int size){
- Message msg(generateData(size), controlTopic);
- if (durable) {
- msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
- }
- AbsTime start = now();
-
- for(int i = 0; i < msgs; i++){
- session.messageTransfer(arg::content=msg, arg::destination="amq.topic", arg::acceptMode=1);
- }
- //send report request
- Message reportRequest("", controlTopic);
- reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
- session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic", arg::acceptMode=1);
- if(transactional){
- sync(session).txCommit();
- }
- //wait for a response from each listener (TODO, could log these)
- for (int i = 0; i < listeners; i++) {
- Message report = queue.pop();
- }
-
- if(transactional){
- sync(session).txCommit();
- }
-
- AbsTime finish = now();
- return Duration(start, finish);
-}
-
-string Publisher::generateData(int size){
- string data;
- for(int i = 0; i < size; i++){
- data += ('A' + (i / 26));
- }
- return data;
-}
-
-void Publisher::terminate(){
- //send termination request
- Message terminationRequest("", controlTopic);
- terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST");
- session.messageTransfer(arg::content=terminationRequest, arg::destination="amq.topic", arg::acceptMode=1);
- if(transactional){
- session.txCommit();
- }
-}