summaryrefslogtreecommitdiff
path: root/cpp/tests/topic_publisher.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/tests/topic_publisher.cpp')
-rw-r--r--cpp/tests/topic_publisher.cpp258
1 files changed, 258 insertions, 0 deletions
diff --git a/cpp/tests/topic_publisher.cpp b/cpp/tests/topic_publisher.cpp
new file mode 100644
index 0000000000..cde3f6ee79
--- /dev/null
+++ b/cpp/tests/topic_publisher.cpp
@@ -0,0 +1,258 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <QpidError.h>
+#include <ClientChannel.h>
+#include <Connection.h>
+#include <ClientExchange.h>
+#include <MessageListener.h>
+#include <ClientQueue.h>
+#include <sys/Monitor.h>
+#include "unistd.h"
+#include <sys/Time.h>
+#include <cstdlib>
+#include <iostream>
+
+using namespace qpid::client;
+using namespace qpid::sys;
+using std::string;
+
+class Publisher : public MessageListener{
+ Channel* const channel;
+ const std::string controlTopic;
+ const bool transactional;
+ Monitor monitor;
+ int count;
+
+ void waitForCompletion(int msgs);
+ string generateData(int size);
+
+public:
+ Publisher(Channel* channel, const std::string& controlTopic, bool tx);
+ virtual void received(Message& msg);
+ int64_t publish(int msgs, int listeners, int size);
+ void terminate();
+};
+
+class Args{
+ string host;
+ int port;
+ int messages;
+ int subscribers;
+ int ackMode;
+ bool transactional;
+ int prefetch;
+ int batches;
+ int delay;
+ int size;
+ bool trace;
+ bool help;
+public:
+ inline Args() : host("localhost"), port(5672), messages(1000), subscribers(1),
+ ackMode(NO_ACK), transactional(false), prefetch(1000), batches(1),
+ delay(0), size(256), trace(false), help(false){}
+
+ void parse(int argc, char** argv);
+ void usage();
+
+ inline const string& getHost() const { return host;}
+ inline int getPort() const { return port; }
+ inline int getMessages() const { return messages; }
+ inline int getSubscribers() const { return subscribers; }
+ inline int getAckMode(){ return ackMode; }
+ inline bool getTransactional() const { return transactional; }
+ inline int getPrefetch(){ return prefetch; }
+ inline int getBatches(){ return batches; }
+ inline int getDelay(){ return delay; }
+ inline int getSize(){ return size; }
+ inline bool getTrace() const { return trace; }
+ inline bool getHelp() const { return help; }
+};
+
+int main(int argc, char** argv){
+ Args args;
+ args.parse(argc, argv);
+ if(args.getHelp()){
+ args.usage();
+ }else{
+ try{
+ Connection connection(args.getTrace());
+ connection.open(args.getHost(), args.getPort());
+ Channel channel(args.getTransactional(), args.getPrefetch());
+ connection.openChannel(&channel);
+
+ //declare queue (relying on default binding):
+ Queue response("response");
+ channel.declareQueue(response);
+
+ //set up listener
+ Publisher publisher(&channel, "topic_control", args.getTransactional());
+ std::string tag("mytag");
+ channel.consume(response, tag, &publisher, args.getAckMode());
+ channel.start();
+
+ int batchSize(args.getBatches());
+ int64_t max(0);
+ int64_t min(0);
+ int64_t sum(0);
+ for(int i = 0; i < batchSize; i++){
+ if(i > 0 && args.getDelay()) sleep(args.getDelay());
+ Time time = publisher.publish(
+ args.getMessages(), args.getSubscribers(), args.getSize());
+ if(!max || time > max) max = time;
+ if(!min || time < min) min = time;
+ sum += time;
+ std::cout << "Completed " << (i+1) << " of " << batchSize
+ << " in " << time/TIME_MSEC << "ms" << std::endl;
+ }
+ publisher.terminate();
+ int64_t avg = sum / batchSize;
+ if(batchSize > 1){
+ std::cout << batchSize << " batches completed. avg=" << avg <<
+ ", max=" << max << ", min=" << min << std::endl;
+ }
+ channel.close();
+ connection.close();
+ }catch(qpid::QpidError error){
+ std::cout << error.what() << std::endl;
+ }
+ }
+}
+
+Publisher::Publisher(Channel* _channel, const std::string& _controlTopic, bool tx) :
+ channel(_channel), controlTopic(_controlTopic), transactional(tx){}
+
+void Publisher::received(Message& ){
+ //count responses and when all are received end the current batch
+ Monitor::ScopedLock l(monitor);
+ if(--count == 0){
+ monitor.notify();
+ }
+}
+
+void Publisher::waitForCompletion(int msgs){
+ count = msgs;
+ monitor.wait();
+}
+
+int64_t Publisher::publish(int msgs, int listeners, int size){
+ Message msg;
+ msg.setData(generateData(size));
+ Time start = now();
+ {
+ Monitor::ScopedLock l(monitor);
+ for(int i = 0; i < msgs; i++){
+ channel->publish(msg, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic);
+ }
+ //send report request
+ Message reportRequest;
+ reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
+ channel->publish(reportRequest, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic);
+ if(transactional){
+ channel->commit();
+ }
+
+ waitForCompletion(listeners);
+ }
+
+ Time finish = now();
+ return finish - start;
+}
+
+string Publisher::generateData(int size){
+ string data;
+ for(int i = 0; i < size; i++){
+ data += ('A' + (i / 26));
+ }
+ return data;
+}
+
+void Publisher::terminate(){
+ //send termination request
+ Message terminationRequest;
+ terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST");
+ channel->publish(terminationRequest, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic);
+ if(transactional){
+ channel->commit();
+ }
+}
+
+void Args::parse(int argc, char** argv){
+ for(int i = 1; i < argc; i++){
+ string name(argv[i]);
+ if("-help" == name){
+ help = true;
+ break;
+ }else if("-host" == name){
+ host = argv[++i];
+ }else if("-port" == name){
+ port = atoi(argv[++i]);
+ }else if("-messages" == name){
+ messages = atoi(argv[++i]);
+ }else if("-subscribers" == name){
+ subscribers = atoi(argv[++i]);
+ }else if("-ack_mode" == name){
+ ackMode = atoi(argv[++i]);
+ }else if("-transactional" == name){
+ transactional = true;
+ }else if("-prefetch" == name){
+ prefetch = atoi(argv[++i]);
+ }else if("-batches" == name){
+ batches = atoi(argv[++i]);
+ }else if("-delay" == name){
+ delay = atoi(argv[++i]);
+ }else if("-size" == name){
+ size = atoi(argv[++i]);
+ }else if("-trace" == name){
+ trace = true;
+ }else{
+ std::cout << "Warning: unrecognised option " << name << std::endl;
+ }
+ }
+}
+
+void Args::usage(){
+ std::cout << "Options:" << std::endl;
+ std::cout << " -help" << std::endl;
+ std::cout << " Prints this usage message" << std::endl;
+ std::cout << " -host <host>" << std::endl;
+ std::cout << " Specifies host to connect to (default is localhost)" << std::endl;
+ std::cout << " -port <port>" << std::endl;
+ std::cout << " Specifies port to conect to (default is 5762)" << std::endl;
+ std::cout << " -messages <count>" << std::endl;
+ std::cout << " Specifies how many messages to send" << std::endl;
+ std::cout << " -subscribers <count>" << std::endl;
+ std::cout << " Specifies how many subscribers to expect reports from" << std::endl;
+ std::cout << " -ack_mode <mode>" << std::endl;
+ std::cout << " Sets the acknowledgement mode" << std::endl;
+ std::cout << " 0=NO_ACK (default), 1=AUTO_ACK, 2=LAZY_ACK" << std::endl;
+ std::cout << " -transactional" << std::endl;
+ std::cout << " Indicates the client should use transactions" << std::endl;
+ std::cout << " -prefetch <count>" << std::endl;
+ std::cout << " Specifies the prefetch count (default is 1000)" << std::endl;
+ std::cout << " -batches <count>" << std::endl;
+ std::cout << " Specifies how many batches to run" << std::endl;
+ std::cout << " -delay <seconds>" << std::endl;
+ std::cout << " Causes a delay between each batch" << std::endl;
+ std::cout << " -size <bytes>" << std::endl;
+ std::cout << " Sets the size of the published messages (default is 256 bytes)" << std::endl;
+ std::cout << " -trace" << std::endl;
+ std::cout << " Indicates that the frames sent and received should be logged" << std::endl;
+}