From 9cf58ef6be38185c9a9d5325fb2dd522aa774529 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 14 Jun 2010 13:44:00 +0000 Subject: Rename tests qpid_* to qpid-* for consistency. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@954471 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/tests/CMakeLists.txt | 12 +- cpp/src/tests/Makefile.am | 29 ++-- cpp/src/tests/qpid-cpp-benchmark | 145 +++++++++++++++++ cpp/src/tests/qpid-ping.cpp | 76 +++++++++ cpp/src/tests/qpid-receive.cpp | 237 +++++++++++++++++++++++++++ cpp/src/tests/qpid-send.cpp | 337 +++++++++++++++++++++++++++++++++++++++ cpp/src/tests/qpid-stream.cpp | 193 ++++++++++++++++++++++ cpp/src/tests/qpid_cpp_benchmark | 145 ----------------- cpp/src/tests/qpid_ping.cpp | 76 --------- cpp/src/tests/qpid_receive.cpp | 237 --------------------------- cpp/src/tests/qpid_send.cpp | 337 --------------------------------------- cpp/src/tests/qpid_stream.cpp | 193 ---------------------- cpp/src/tests/ssl_test | 14 +- 13 files changed, 1013 insertions(+), 1018 deletions(-) create mode 100755 cpp/src/tests/qpid-cpp-benchmark create mode 100644 cpp/src/tests/qpid-ping.cpp create mode 100644 cpp/src/tests/qpid-receive.cpp create mode 100644 cpp/src/tests/qpid-send.cpp create mode 100644 cpp/src/tests/qpid-stream.cpp delete mode 100755 cpp/src/tests/qpid_cpp_benchmark delete mode 100644 cpp/src/tests/qpid_ping.cpp delete mode 100644 cpp/src/tests/qpid_receive.cpp delete mode 100644 cpp/src/tests/qpid_send.cpp delete mode 100644 cpp/src/tests/qpid_stream.cpp (limited to 'cpp/src') diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt index 9b7e22ab51..09865caca5 100644 --- a/cpp/src/tests/CMakeLists.txt +++ b/cpp/src/tests/CMakeLists.txt @@ -264,13 +264,13 @@ target_link_libraries (sender qpidmessaging) #sender_SOURCES=sender.cpp TestOptions.h ConnectionOptions.h remember_location(sender) -add_executable (qpid_receive qpid_receive.cpp Statistics.cpp ${platform_test_additions}) -target_link_libraries (qpid_receive qpidmessaging) -remember_location(qpid_receive) +add_executable (qpid-receive qpid-receive.cpp Statistics.cpp ${platform_test_additions}) +target_link_libraries (qpid-receive qpidmessaging) +remember_location(qpid-receive) -add_executable (qpid_send qpid_send.cpp Statistics.cpp ${platform_test_additions}) -target_link_libraries (qpid_send qpidmessaging) -remember_location(qpid_send) +add_executable (qpid-send qpid-send.cpp Statistics.cpp ${platform_test_additions}) +target_link_libraries (qpid-send qpidmessaging) +remember_location(qpid-send) # qpid-perftest and qpid-latency-test are generally useful so install them install (TARGETS qpid-perftest qpid-latency-test RUNTIME diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index bb9628df52..d800583385 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -127,14 +127,6 @@ if HAVE_XML unit_test_SOURCES+= XmlClientSessionTest.cpp endif - -# Disabled till we move to amqp_0_10 codec. -# amqp_0_10/serialize.cpp allSegmentTypes.h \ -# amqp_0_10/ProxyTemplate.cpp \ -# amqp_0_10/apply.cpp \ -# amqp_0_10/Map.cpp \ -# amqp_0_10/handlers.cpp - TESTLIBFLAGS = -module -rpath $(abs_builddir) check_LTLIBRARIES += libshlibtest.la @@ -153,6 +145,9 @@ include ssl.mk endif # Test programs that are installed and therefore built as part of make, not make check + +qpidtest_SCRIPTS += qpid-cpp-benchmark + qpidtest_PROGRAMS += receiver receiver_SOURCES = \ receiver.cpp \ @@ -168,18 +163,18 @@ sender_SOURCES = \ Statistics.cpp sender_LDADD = $(lib_messaging) -qpidtest_PROGRAMS += qpid_receive +qpidtest_PROGRAMS += qpid-receive qpid_receive_SOURCES = \ - qpid_receive.cpp \ + qpid-receive.cpp \ TestOptions.h \ ConnectionOptions.h \ Statistics.h \ Statistics.cpp qpid_receive_LDADD = $(lib_messaging) -qpidtest_PROGRAMS += qpid_send +qpidtest_PROGRAMS += qpid-send qpid_send_SOURCES = \ - qpid_send.cpp \ + qpid-send.cpp \ TestOptions.h \ ConnectionOptions.h \ Statistics.h \ @@ -216,9 +211,9 @@ qpid_topic_publisher_INCLUDES=$(PUBLIC_INCLUDES) qpid_topic_publisher_SOURCES=qpid-topic-publisher.cpp TestOptions.h ConnectionOptions.h qpid_topic_publisher_LDADD=$(lib_client) -qpidtest_PROGRAMS+=qpid_ping +qpidtest_PROGRAMS+=qpid-ping qpid_ping_INCLUDES=$(PUBLIC_INCLUDES) -qpid_ping_SOURCES=qpid_ping.cpp test_tools.h TestOptions.h ConnectionOptions.h +qpid_ping_SOURCES=qpid-ping.cpp test_tools.h TestOptions.h ConnectionOptions.h qpid_ping_LDADD=$(lib_client) # @@ -299,10 +294,10 @@ check_PROGRAMS+=qrsh qrsh_SOURCES=qrsh.cpp qrsh_LDADD=$(lib_client) -check_PROGRAMS+=qpid_stream +check_PROGRAMS+=qpid-stream qpid_stream_INCLUDES=$(PUBLIC_INCLUDES) -qpid_stream_SOURCES=qpid_stream.cpp -qpid_stream_LDADD=$(lib_messaging) +qpid_stream_SOURCES=qpid-stream.cpp +qpid_stream_LDADD=$(lib_messaging) TESTS_ENVIRONMENT = \ VALGRIND=$(VALGRIND) \ diff --git a/cpp/src/tests/qpid-cpp-benchmark b/cpp/src/tests/qpid-cpp-benchmark new file mode 100755 index 0000000000..b8a74a2b90 --- /dev/null +++ b/cpp/src/tests/qpid-cpp-benchmark @@ -0,0 +1,145 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, time, qpid.messaging +from threading import Thread +from subprocess import Popen, PIPE, STDOUT + +op = optparse.OptionParser(usage="usage: %prog [options]", + description="simple performance benchmarks") +op.add_option("-b", "--broker", default="127.0.0.1", + help="url of broker to connect to") +op.add_option("-q", "--queues", default=1, type="int", metavar="N", + help="create N queues (default %default)") +op.add_option("-s", "--senders", default=1, type="int", metavar="N", + help="start N senders per queue (default %default)") +op.add_option("-r", "--receivers", default=1, type="int", metavar="N", + help="start N receivers per queue (default %default)") +op.add_option("-m", "--messages", default=100000, type="int", metavar="N", + help="send N messages per sender (default %default)") +op.add_option("--queue-name", default="benchmark", + help="base name for queues (default %default)") +op.add_option("--send-rate", default=0, metavar="R", + help="send rate limited to R messages/second, 0 means no limit (default %default)") +op.add_option("--content-size", default=1024, type="int", metavar="BYTES", + help="message size in bytes (default %default)") +op.add_option("--ack-frequency", default=0, metavar="N", type="int", + help="receiver ack's every N messages, 0 means unconfirmed") +op.add_option("--no-report-header", dest="report_header", default=True, + action="store_false", help="don't print header on report") +op.add_option("--repeat", default=1, metavar="N", help="repeat N times", type="int") +op.add_option("--send-option", default=[], action="append", type="str", + help="Additional option for sending addresses") +op.add_option("--receive-option", default=[], action="append", type="str", + help="Additional option for receiving addresses") +op.add_option("--no-timestamp", dest="timestamp", default=True, + action="store_false", help="don't add a timestamp, no latency results") + +def start_receive(queue, opts, ready_queue): + address="%s;{%s}"%(queue,",".join(["create:always"]+opts.receive_option)) + return Popen(["qpid-receive", + "-b", opts.broker, + "-a", address, + "--forever", + "--print-content=no", + "--report-total", + "--ack-frequency", str(opts.ack_frequency), + "--ready-address", ready_queue, + "--report-header=no", + ], + stdout=PIPE, stderr=STDOUT) + +def start_send(queue, opts): + address="%s;{%s}"%(queue,",".join(opts.send_option)) + return Popen(["qpid-send", + "-b", opts.broker, + "-a", address, + "--messages", str(opts.messages), + "--send-eos", str(opts.receivers), + "--content-size", str(opts.content_size), + "--send-rate", str(opts.send_rate), + "--report-total", + "--report-header=no", + "--timestamp=%s"%(opts.timestamp and "yes" or "no"), + "--sequence=no", + ], + stdout=PIPE, stderr=STDOUT) + +def wait_for_output(p): + out,err=p.communicate() + if p.returncode != 0: raise Exception("ERROR:\n%s"%(out)) + return out + +def delete_queues(queues, broker): + c = qpid.messaging.Connection(broker) + c.open() + for q in queues: + try: s = c.session().sender("%s;{delete:always}"%(q)) + except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue" + c.close() + +def print_output(senders, receivers, want_header): + send_stats = sum([wait_for_output(p).split("\n")[:-1] for p in senders],[]) + recv_stats = sum([wait_for_output(p).split("\n")[:-1] for p in receivers],[]) + def empty_if_none(s): + if s: return s + else: return "" + stats = map(lambda s,r: empty_if_none(s)+"\t\t"+empty_if_none(r), + send_stats, recv_stats) + if want_header: print "send-tp\t\trecv-tp\tl-min\tl-max\tl-avg" + for s in stats: print s; + +class ReadyReceiver: + """A receiver for ready messages""" + def __init__(self, queue, broker): + delete_queues([queue], broker) + self.connection = qpid.messaging.Connection(broker) + self.connection.open() + self.receiver = self.connection.session().receiver( + "%s;{create:always,delete:always}"%(queue)) + self.timeout=2 + + def wait(self, receivers): + try: + for i in xrange(len(receivers)): self.receiver.fetch(self.timeout) + self.connection.close() + except qpid.messaging.Empty: + for r in receivers: + if (r.poll()): raise "Receiver error: %s"%(wait_for_output(r)) + raise "Timed out waiting for receivers to be ready" + +def main(): + opts, args = op.parse_args() + send_out = "" + receive_out = "" + ready_queue="%s-ready"%(opts.queue_name) + queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)] + for i in xrange(opts.repeat): + delete_queues(queues, opts.broker) + ready_receiver = ReadyReceiver(ready_queue, opts.broker) + receivers = [start_receive(q, opts, ready_queue) + for q in queues for j in xrange(opts.receivers)] + ready_receiver.wait(receivers) # Wait for receivers to be ready. + senders = [start_send(q, opts) for q in queues for j in xrange(opts.senders)] + print_output(senders, receivers, opts.report_header and i == 0) + delete_queues(queues, opts.broker) + +if __name__ == "__main__": main() + diff --git a/cpp/src/tests/qpid-ping.cpp b/cpp/src/tests/qpid-ping.cpp new file mode 100644 index 0000000000..0cb4afa0ee --- /dev/null +++ b/cpp/src/tests/qpid-ping.cpp @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + + */ + +#include "TestOptions.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/Connection.h" +#include "qpid/client/AsyncSession.h" +#include "qpid/sys/Time.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Monitor.h" +#include "qpid/framing/Uuid.h" +#include +#include + +using namespace std; +using namespace qpid::sys; +using namespace qpid::framing; +using namespace qpid::client; +using namespace qpid; + +struct PingOptions : public qpid::TestOptions { + int timeout; // Timeout in seconds. + bool quiet; // No output + PingOptions() : timeout(1), quiet(false) { + addOptions() + ("timeout,t", optValue(timeout, "SECONDS"), "Max time to wait.") + ("quiet,q", optValue(quiet), "Don't print anything to stderr/stdout."); + } +}; + +int main(int argc, char** argv) { + try { + PingOptions opts; + opts.parse(argc, argv); + opts.con.heartbeat = (opts.timeout+1)/2; + Connection connection; + opts.open(connection); + if (!opts.quiet) cout << "Opened connection." << endl; + AsyncSession s = connection.newSession(); + string qname(Uuid(true).str()); + s.queueDeclare(arg::queue=qname,arg::autoDelete=true,arg::exclusive=true); + s.messageTransfer(arg::content=Message("hello", qname)); + if (!opts.quiet) cout << "Sent message." << endl; + SubscriptionManager subs(s); + subs.get(qname); + if (!opts.quiet) cout << "Received message." << endl; + s.sync(); + s.close(); + connection.close(); + if (!opts.quiet) cout << "Success." << endl; + return 0; + } catch (const exception& e) { + cerr << "Error: " << e.what() << endl; + return 1; + } +} diff --git a/cpp/src/tests/qpid-receive.cpp b/cpp/src/tests/qpid-receive.cpp new file mode 100644 index 0000000000..a0394ccd21 --- /dev/null +++ b/cpp/src/tests/qpid-receive.cpp @@ -0,0 +1,237 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "TestOptions.h" +#include "Statistics.h" + +#include +#include + +using namespace qpid::messaging; +using namespace qpid::types; +using namespace std; + +namespace qpid { +namespace tests { + +struct Options : public qpid::Options +{ + bool help; + std::string url; + std::string address; + std::string connectionOptions; + int64_t timeout; + bool forever; + uint messages; + bool ignoreDuplicates; + uint capacity; + uint ackFrequency; + uint tx; + uint rollbackFrequency; + bool printContent; + bool printHeaders; + bool failoverUpdates; + qpid::log::Options log; + bool reportTotal; + uint reportEvery; + bool reportHeader; + string readyAddress; + + Options(const std::string& argv0=std::string()) + : qpid::Options("Options"), + help(false), + url("amqp:tcp:127.0.0.1"), + timeout(0), + forever(false), + messages(0), + ignoreDuplicates(false), + capacity(1000), + ackFrequency(100), + tx(0), + rollbackFrequency(0), + printContent(true), + printHeaders(false), + failoverUpdates(false), + log(argv0), + reportTotal(false), + reportEvery(0), + reportHeader(true) + { + addOptions() + ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") + ("address,a", qpid::optValue(address, "ADDRESS"), "address to receive from") + ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection") + ("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "timeout in seconds to wait before exiting") + ("forever,f", qpid::optValue(forever), "ignore timeout and wait forever") + ("messages,m", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely") + ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)") + ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)") + ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)") + ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)") + ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)") + ("print-content", qpid::optValue(printContent, "yes|no"), "print out message content") + ("print-headers", qpid::optValue(printHeaders, "yes|no"), "print out message headers") + ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover") + ("report-total", qpid::optValue(reportTotal), "Report total throughput and latency statistics") + ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput and latency statistics every N messages.") + ("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.") ("ready-address", qpid::optValue(readyAddress, "ADDRESS"), + "send a message to this address when ready to receive") + ("help", qpid::optValue(help), "print this usage statement"); + add(log); + } + + Duration getTimeout() + { + if (forever) return Duration::FOREVER; + else return Duration::SECOND*timeout; + + } + bool parse(int argc, char** argv) + { + try { + qpid::Options::parse(argc, argv); + if (address.empty()) throw qpid::Exception("Address must be specified!"); + qpid::log::Logger::instance().configure(log); + if (help) { + std::ostringstream msg; + std::cout << msg << *this << std::endl << std::endl + << "Drains messages from the specified address" << std::endl; + return false; + } else { + return true; + } + } catch (const std::exception& e) { + std::cerr << *this << std::endl << std::endl << e.what() << std::endl; + return false; + } + } +}; + +const string EOS("eos"); +const string SN("sn"); + +class SequenceTracker +{ + uint lastSn; + public: + SequenceTracker() : lastSn(0) {} + + bool isDuplicate(Message& message) + { + uint sn = message.getProperties()[SN]; + if (lastSn < sn) { + lastSn = sn; + return false; + } else { + return true; + } + } +}; + +}} // namespace qpid::tests + +using namespace qpid::tests; + +int main(int argc, char ** argv) +{ + Connection connection; + try { + Options opts; + if (opts.parse(argc, argv)) { + connection = Connection(opts.url, opts.connectionOptions); + connection.open(); + std::auto_ptr updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); + Session session = opts.tx ? connection.createTransactionalSession() : connection.createSession(); + Receiver receiver = session.createReceiver(opts.address); + receiver.setCapacity(opts.capacity); + Message msg; + uint count = 0; + uint txCount = 0; + SequenceTracker sequenceTracker; + Duration timeout = opts.getTimeout(); + bool done = false; + Reporter reporter(std::cout, opts.reportEvery, opts.reportHeader); + if (!opts.readyAddress.empty()) + session.createSender(opts.readyAddress).send(msg); + while (!done && receiver.fetch(msg, timeout)) { + reporter.message(msg); + if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) { + if (msg.getContent() == EOS) { + done = true; + } else { + ++count; + if (opts.printHeaders) { + if (msg.getSubject().size()) std::cout << "Subject: " << msg.getSubject() << std::endl; + if (msg.getReplyTo()) std::cout << "ReplyTo: " << msg.getReplyTo() << std::endl; + if (msg.getCorrelationId().size()) std::cout << "CorrelationId: " << msg.getCorrelationId() << std::endl; + if (msg.getUserId().size()) std::cout << "UserId: " << msg.getUserId() << std::endl; + if (msg.getTtl().getMilliseconds()) std::cout << "TTL: " << msg.getTtl().getMilliseconds() << std::endl; + if (msg.getDurable()) std::cout << "Durable: true" << std::endl; + if (msg.getRedelivered()) std::cout << "Redelivered: true" << std::endl; + std::cout << "Properties: " << msg.getProperties() << std::endl; + std::cout << std::endl; + } + if (opts.printContent) + std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages + if (opts.messages && count >= opts.messages) done = true; + } + } + if (opts.tx && (count % opts.tx == 0)) { + if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) { + session.rollback(); + } else { + session.commit(); + } + } else if (opts.ackFrequency && (count % opts.ackFrequency == 0)) { + session.acknowledge(); + } + //opts.rejectFrequency?? + } + if (opts.reportTotal) reporter.report(); + if (opts.tx) { + if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) { + session.rollback(); + } else { + session.commit(); + } + } else { + session.acknowledge(); + } + session.close(); + connection.close(); + return 0; + } + } catch(const std::exception& error) { + std::cerr << "Failure: " << error.what() << std::endl; + connection.close(); + return 1; + } +} diff --git a/cpp/src/tests/qpid-send.cpp b/cpp/src/tests/qpid-send.cpp new file mode 100644 index 0000000000..7e96cc1a09 --- /dev/null +++ b/cpp/src/tests/qpid-send.cpp @@ -0,0 +1,337 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include "TestOptions.h" +#include "Statistics.h" + +#include +#include +#include + +using namespace std; +using namespace qpid::messaging; +using namespace qpid::types; + +typedef std::vector string_vector; + +namespace qpid { +namespace tests { + +struct Options : public qpid::Options +{ + bool help; + std::string url; + std::string connectionOptions; + std::string address; + uint messages; + std::string id; + std::string replyto; + uint sendEos; + bool durable; + uint ttl; + std::string userid; + std::string correlationid; + string_vector properties; + string_vector entries; + std::string contentString; + uint contentSize; + bool contentStdin; + uint tx; + uint rollbackFrequency; + uint capacity; + bool failoverUpdates; + qpid::log::Options log; + bool reportTotal; + uint reportEvery; + bool reportHeader; + uint sendRate; + bool sequence; + bool timestamp; + + Options(const std::string& argv0=std::string()) + : qpid::Options("Options"), + help(false), + url("amqp:tcp:127.0.0.1"), + messages(1), + sendEos(0), + durable(false), + ttl(0), + contentString(), + contentSize(0), + contentStdin(false), + tx(0), + rollbackFrequency(0), + capacity(1000), + failoverUpdates(false), + log(argv0), + reportTotal(false), + reportEvery(0), + reportHeader(true), + sendRate(0), + sequence(true), + timestamp(true) + { + addOptions() + ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") + ("address,a", qpid::optValue(address, "ADDRESS"), "address to drain from") + ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection") + ("messages,m", qpid::optValue(messages, "N"), "stop after N messages have been sent, 0 means no limit") + ("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one") + ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address") + ("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input") + ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.") + ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds") + ("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property") + ("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message") + ("user-id", qpid::optValue(userid, "USERID"), "userid for message") + ("content-string", qpid::optValue(contentString, "CONTENT"), "use CONTENT as message content") + ("content-size", qpid::optValue(contentSize, "N"), "create an N-byte message content") + ("content-map,M", qpid::optValue(entries, "NAME=VALUE"), "specify entry for map content") + ("content-stdin", qpid::optValue(contentStdin), "read message content from stdin, one line per message") + ("capacity", qpid::optValue(capacity, "N"), "size of the senders outgoing message queue") + ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)") + ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)") + ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover") + ("report-total", qpid::optValue(reportTotal), "Report total throughput statistics") + ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput statistics every N messages") + ("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.") + ("send-rate", qpid::optValue(sendRate,"N"), "Send at rate of N messages/second. 0 means send as fast as possible.") + ("sequence", qpid::optValue(sequence, "yes|no"), "Add a sequence number messages property (required for duplicate/lost message detection)") + ("timestamp", qpid::optValue(timestamp, "yes|no"), "Add a time stamp messages property (required for latency measurement)") + ("help", qpid::optValue(help), "print this usage statement"); + add(log); + } + + bool parse(int argc, char** argv) + { + try { + qpid::Options::parse(argc, argv); + if (address.empty()) throw qpid::Exception("Address must be specified!"); + qpid::log::Logger::instance().configure(log); + if (help) { + std::ostringstream msg; + std::cout << msg << *this << std::endl << std::endl + << "Drains messages from the specified address" << std::endl; + return false; + } else { + return true; + } + } catch (const std::exception& e) { + std::cerr << *this << std::endl << std::endl << e.what() << std::endl; + return false; + } + } + + static bool nameval(const std::string& in, std::string& name, std::string& value) + { + std::string::size_type i = in.find("="); + if (i == std::string::npos) { + name = in; + return false; + } else { + name = in.substr(0, i); + if (i+1 < in.size()) { + value = in.substr(i+1); + return true; + } else { + return false; + } + } + } + + static void setProperty(Message& message, const std::string& property) + { + std::string name; + std::string value; + if (nameval(property, name, value)) { + message.getProperties()[name] = value; + } else { + message.getProperties()[name] = Variant(); + } + } + + void setProperties(Message& message) const + { + for (string_vector::const_iterator i = properties.begin(); i != properties.end(); ++i) { + setProperty(message, *i); + } + } + + void setEntries(Variant::Map& content) const + { + for (string_vector::const_iterator i = entries.begin(); i != entries.end(); ++i) { + std::string name; + std::string value; + if (nameval(*i, name, value)) { + content[name] = value; + } else { + content[name] = Variant(); + } + } + } +}; + +const string EOS("eos"); +const string SN("sn"); +const string TS("ts"); + +}} // namespace qpid::tests + +using namespace qpid::tests; + +class ContentGenerator { + public: + virtual ~ContentGenerator() {} + virtual bool setContent(Message& msg) = 0; +}; + +class GetlineContentGenerator : public ContentGenerator { + public: + virtual bool setContent(Message& msg) { + string content; + bool got = getline(std::cin, content); + if (got) msg.setContent(content); + return got; + } +}; + +class FixedContentGenerator : public ContentGenerator { + public: + FixedContentGenerator(const string& s) : content(s) {} + virtual bool setContent(Message& msg) { + msg.setContent(content); + return true; + } + private: + std::string content; +}; + +class MapContentGenerator : public ContentGenerator { + public: + MapContentGenerator(const Options& opt) : opts(opt) {} + virtual bool setContent(Message& msg) { + Variant::Map map; + opts.setEntries(map); + encode(map, msg); + return true; + } + private: + const Options& opts; +}; + +int main(int argc, char ** argv) +{ + Connection connection; + Options opts; + try { + if (opts.parse(argc, argv)) { + connection = Connection(opts.url, opts.connectionOptions); + connection.open(); + std::auto_ptr updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); + Session session = opts.tx ? connection.createTransactionalSession() : connection.createSession(); + Sender sender = session.createSender(opts.address); + if (opts.capacity) sender.setCapacity(opts.capacity); + Message msg; + msg.setDurable(opts.durable); + if (opts.ttl) { + msg.setTtl(Duration(opts.ttl)); + } + if (!opts.replyto.empty()) msg.setReplyTo(Address(opts.replyto)); + if (!opts.userid.empty()) msg.setUserId(opts.userid); + if (!opts.correlationid.empty()) msg.setCorrelationId(opts.correlationid); + opts.setProperties(msg); + uint sent = 0; + uint txCount = 0; + Reporter reporter(std::cout, opts.reportEvery, opts.reportHeader); + + std::auto_ptr contentGen; + if (opts.contentStdin) { + opts.messages = 0; // Don't limit # messages sent. + contentGen.reset(new GetlineContentGenerator); + } + else if (opts.entries.size() > 0) + contentGen.reset(new MapContentGenerator(opts)); + else if (opts.contentSize > 0) + contentGen.reset(new FixedContentGenerator(string(opts.contentSize, 'X'))); + else + contentGen.reset(new FixedContentGenerator(opts.contentString)); + + qpid::sys::AbsTime start = qpid::sys::now(); + int64_t interval = 0; + if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate; + + while (contentGen->setContent(msg)) { + ++sent; + if (opts.sequence) + msg.getProperties()[SN] = sent; + if (opts.timestamp) + msg.getProperties()[TS] = int64_t( + qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); + sender.send(msg); + reporter.message(msg); + if (opts.tx && (sent % opts.tx == 0)) { + if (opts.rollbackFrequency && + (++txCount % opts.rollbackFrequency == 0)) + session.rollback(); + else + session.commit(); + } + if (opts.messages && sent >= opts.messages) break; + if (opts.sendRate) { + qpid::sys::AbsTime waitTill(start, sent*interval); + int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill); + if (delay > 0) + qpid::sys::usleep(delay/qpid::sys::TIME_USEC); + } + } + if (opts.reportTotal) reporter.report(); + for (uint i = opts.sendEos; i > 0; --i) { + if (opts.sequence) + msg.getProperties()[SN] = ++sent; + msg.setContent(EOS);//TODO: add in ability to send digest or similar + sender.send(msg); + } + if (opts.tx) { + if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) { + session.rollback(); + } else { + session.commit(); + } + } + session.sync(); + session.close(); + connection.close(); + return 0; + } + } catch(const std::exception& error) { + std::cout << "Failed: " << error.what() << std::endl; + connection.close(); + return 1; + } +} diff --git a/cpp/src/tests/qpid-stream.cpp b/cpp/src/tests/qpid-stream.cpp new file mode 100644 index 0000000000..f02a484750 --- /dev/null +++ b/cpp/src/tests/qpid-stream.cpp @@ -0,0 +1,193 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace qpid::messaging; +using namespace qpid::types; + +namespace qpid { +namespace tests { + +struct Args : public qpid::Options +{ + std::string url; + std::string address; + uint size; + uint rate; + bool durable; + uint receiverCapacity; + uint senderCapacity; + uint ackFrequency; + + Args() : + url("amqp:tcp:127.0.0.1:5672"), + address("test-queue"), + size(512), + rate(1000), + durable(false), + receiverCapacity(0), + senderCapacity(0), + ackFrequency(1) + { + addOptions() + ("url", qpid::optValue(url, "URL"), "Url to connect to.") + ("address", qpid::optValue(address, "ADDRESS"), "Address to stream messages through.") + ("size", qpid::optValue(size, "bytes"), "Message size in bytes (content only, not headers).") + ("rate", qpid::optValue(rate, "msgs/sec"), "Rate at which to stream messages.") + ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.") + ("sender-capacity", qpid::optValue(senderCapacity, "N"), "Credit window (0 implies infinite window)") + ("receiver-capacity", qpid::optValue(receiverCapacity, "N"), "Credit window (0 implies infinite window)") + ("ack-frequency", qpid::optValue(ackFrequency, "N"), + "Ack frequency (0 implies none of the messages will get accepted)"); + } +}; + +Args opts; + +const std::string TS = "ts"; + +uint64_t timestamp(const qpid::sys::AbsTime& time) +{ + qpid::sys::Duration t(qpid::sys::EPOCH, time); + return t; +} + +struct Client : qpid::sys::Runnable +{ + virtual ~Client() {} + virtual void doWork(Session&) = 0; + + void run() + { + Connection connection(opts.url); + try { + connection.open(); + Session session = connection.createSession(); + doWork(session); + session.close(); + connection.close(); + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + connection.close(); + } + } + + qpid::sys::Thread thread; + + void start() { thread = qpid::sys::Thread(this); } + void join() { thread.join(); } +}; + +struct Publish : Client +{ + void doWork(Session& session) + { + Sender sender = session.createSender(opts.address); + if (opts.senderCapacity) sender.setCapacity(opts.senderCapacity); + Message msg(std::string(opts.size, 'X')); + uint64_t interval = qpid::sys::TIME_SEC / opts.rate; + uint64_t sent = 0, missedRate = 0; + qpid::sys::AbsTime start = qpid::sys::now(); + while (true) { + qpid::sys::AbsTime sentAt = qpid::sys::now(); + msg.getProperties()[TS] = timestamp(sentAt); + sender.send(msg); + ++sent; + qpid::sys::AbsTime waitTill(start, sent*interval); + qpid::sys::Duration delay(sentAt, waitTill); + if (delay < 0) { + ++missedRate; + } else { + qpid::sys::usleep(delay / qpid::sys::TIME_USEC); + } + } + } +}; + +struct Consume : Client +{ + void doWork(Session& session) + { + Message msg; + uint64_t received = 0; + double minLatency = std::numeric_limits::max(); + double maxLatency = 0; + double totalLatency = 0; + Receiver receiver = session.createReceiver(opts.address); + if (opts.receiverCapacity) receiver.setCapacity(opts.receiverCapacity); + while (receiver.fetch(msg)) { + ++received; + if (opts.ackFrequency && (received % opts.ackFrequency == 0)) { + session.acknowledge(); + } + //calculate latency + uint64_t receivedAt = timestamp(qpid::sys::now()); + uint64_t sentAt = msg.getProperties()[TS].asUint64(); + double latency = ((double) (receivedAt - sentAt)) / qpid::sys::TIME_MSEC; + + //update avg, min & max + minLatency = std::min(minLatency, latency); + maxLatency = std::max(maxLatency, latency); + totalLatency += latency; + + if (received % opts.rate == 0) { + std::cout << "count=" << received + << ", avg=" << (totalLatency/received) + << ", min=" << minLatency + << ", max=" << maxLatency << std::endl; + } + } + } +}; + +}} // namespace qpid::tests + +using namespace qpid::tests; + +int main(int argc, char** argv) +{ + try { + opts.parse(argc, argv); + Publish publish; + Consume consume; + publish.start(); + consume.start(); + consume.join(); + publish.join(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/cpp/src/tests/qpid_cpp_benchmark b/cpp/src/tests/qpid_cpp_benchmark deleted file mode 100755 index 7f9e5ce393..0000000000 --- a/cpp/src/tests/qpid_cpp_benchmark +++ /dev/null @@ -1,145 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import optparse, time, qpid.messaging -from threading import Thread -from subprocess import Popen, PIPE, STDOUT - -op = optparse.OptionParser(usage="usage: %prog [options]", - description="simple performance benchmarks") -op.add_option("-b", "--broker", default="127.0.0.1", - help="url of broker to connect to") -op.add_option("-q", "--queues", default=1, type="int", metavar="N", - help="create N queues (default %default)") -op.add_option("-s", "--senders", default=1, type="int", metavar="N", - help="start N senders per queue (default %default)") -op.add_option("-r", "--receivers", default=1, type="int", metavar="N", - help="start N receivers per queue (default %default)") -op.add_option("-m", "--messages", default=100000, type="int", metavar="N", - help="send N messages per sender (default %default)") -op.add_option("--queue-name", default="benchmark", - help="base name for queues (default %default)") -op.add_option("--send-rate", default=0, metavar="R", - help="send rate limited to R messages/second, 0 means no limit (default %default)") -op.add_option("--content-size", default=1024, type="int", metavar="BYTES", - help="message size in bytes (default %default)") -op.add_option("--ack-frequency", default=0, metavar="N", type="int", - help="receiver ack's every N messages, 0 means unconfirmed") -op.add_option("--no-report-header", dest="report_header", default=True, - action="store_false", help="don't print header on report") -op.add_option("--repeat", default=1, metavar="N", help="repeat N times", type="int") -op.add_option("--send-option", default=[], action="append", type="str", - help="Additional option for sending addresses") -op.add_option("--receive-option", default=[], action="append", type="str", - help="Additional option for receiving addresses") -op.add_option("--no-timestamp", dest="timestamp", default=True, - action="store_false", help="don't add a timestamp, no latency results") - -def start_receive(queue, opts, ready_queue): - address="%s;{%s}"%(queue,",".join(["create:always"]+opts.receive_option)) - return Popen(["qpid_receive", - "-b", opts.broker, - "-a", address, - "--forever", - "--print-content=no", - "--report-total", - "--ack-frequency", str(opts.ack_frequency), - "--ready-address", ready_queue, - "--report-header=no", - ], - stdout=PIPE, stderr=STDOUT) - -def start_send(queue, opts): - address="%s;{%s}"%(queue,",".join(opts.send_option)) - return Popen(["qpid_send", - "-b", opts.broker, - "-a", address, - "--messages", str(opts.messages), - "--send-eos", str(opts.receivers), - "--content-size", str(opts.content_size), - "--send-rate", str(opts.send_rate), - "--report-total", - "--report-header=no", - "--timestamp=%s"%(opts.timestamp and "yes" or "no"), - "--sequence=no", - ], - stdout=PIPE, stderr=STDOUT) - -def wait_for_output(p): - out,err=p.communicate() - if p.returncode != 0: raise Exception("ERROR:\n%s"%(out)) - return out - -def delete_queues(queues, broker): - c = qpid.messaging.Connection(broker) - c.open() - for q in queues: - try: s = c.session().sender("%s;{delete:always}"%(q)) - except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue" - c.close() - -def print_output(senders, receivers, want_header): - send_stats = sum([wait_for_output(p).split("\n")[:-1] for p in senders],[]) - recv_stats = sum([wait_for_output(p).split("\n")[:-1] for p in receivers],[]) - def empty_if_none(s): - if s: return s - else: return "" - stats = map(lambda s,r: empty_if_none(s)+"\t\t"+empty_if_none(r), - send_stats, recv_stats) - if want_header: print "send-tp\t\trecv-tp\tl-min\tl-max\tl-avg" - for s in stats: print s; - -class ReadyReceiver: - """A receiver for ready messages""" - def __init__(self, queue, broker): - delete_queues([queue], broker) - self.connection = qpid.messaging.Connection(broker) - self.connection.open() - self.receiver = self.connection.session().receiver( - "%s;{create:always,delete:always}"%(queue)) - self.timeout=2 - - def wait(self, receivers): - try: - for i in xrange(len(receivers)): self.receiver.fetch(self.timeout) - self.connection.close() - except qpid.messaging.Empty: - for r in receivers: - if (r.poll()): raise "Receiver error: %s"%(wait_for_output(r)) - raise "Timed out waiting for receivers to be ready" - -def main(): - opts, args = op.parse_args() - send_out = "" - receive_out = "" - ready_queue="%s-ready"%(opts.queue_name) - queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)] - for i in xrange(opts.repeat): - delete_queues(queues, opts.broker) - ready_receiver = ReadyReceiver(ready_queue, opts.broker) - receivers = [start_receive(q, opts, ready_queue) - for q in queues for j in xrange(opts.receivers)] - ready_receiver.wait(receivers) # Wait for receivers to be ready. - senders = [start_send(q, opts) for q in queues for j in xrange(opts.senders)] - print_output(senders, receivers, opts.report_header and i == 0) - delete_queues(queues, opts.broker) - -if __name__ == "__main__": main() - diff --git a/cpp/src/tests/qpid_ping.cpp b/cpp/src/tests/qpid_ping.cpp deleted file mode 100644 index 0cb4afa0ee..0000000000 --- a/cpp/src/tests/qpid_ping.cpp +++ /dev/null @@ -1,76 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - - */ - -#include "TestOptions.h" -#include "qpid/client/SubscriptionManager.h" -#include "qpid/client/Connection.h" -#include "qpid/client/AsyncSession.h" -#include "qpid/sys/Time.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/Runnable.h" -#include "qpid/sys/Monitor.h" -#include "qpid/framing/Uuid.h" -#include -#include - -using namespace std; -using namespace qpid::sys; -using namespace qpid::framing; -using namespace qpid::client; -using namespace qpid; - -struct PingOptions : public qpid::TestOptions { - int timeout; // Timeout in seconds. - bool quiet; // No output - PingOptions() : timeout(1), quiet(false) { - addOptions() - ("timeout,t", optValue(timeout, "SECONDS"), "Max time to wait.") - ("quiet,q", optValue(quiet), "Don't print anything to stderr/stdout."); - } -}; - -int main(int argc, char** argv) { - try { - PingOptions opts; - opts.parse(argc, argv); - opts.con.heartbeat = (opts.timeout+1)/2; - Connection connection; - opts.open(connection); - if (!opts.quiet) cout << "Opened connection." << endl; - AsyncSession s = connection.newSession(); - string qname(Uuid(true).str()); - s.queueDeclare(arg::queue=qname,arg::autoDelete=true,arg::exclusive=true); - s.messageTransfer(arg::content=Message("hello", qname)); - if (!opts.quiet) cout << "Sent message." << endl; - SubscriptionManager subs(s); - subs.get(qname); - if (!opts.quiet) cout << "Received message." << endl; - s.sync(); - s.close(); - connection.close(); - if (!opts.quiet) cout << "Success." << endl; - return 0; - } catch (const exception& e) { - cerr << "Error: " << e.what() << endl; - return 1; - } -} diff --git a/cpp/src/tests/qpid_receive.cpp b/cpp/src/tests/qpid_receive.cpp deleted file mode 100644 index 294a60b8cc..0000000000 --- a/cpp/src/tests/qpid_receive.cpp +++ /dev/null @@ -1,237 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "TestOptions.h" -#include "Statistics.h" - -#include -#include - -using namespace qpid::messaging; -using namespace qpid::types; -using namespace std; - -namespace qpid { -namespace tests { - -struct Options : public qpid::Options -{ - bool help; - std::string url; - std::string address; - std::string connectionOptions; - int64_t timeout; - bool forever; - uint messages; - bool ignoreDuplicates; - uint capacity; - uint ackFrequency; - uint tx; - uint rollbackFrequency; - bool printContent; - bool printHeaders; - bool failoverUpdates; - qpid::log::Options log; - bool reportTotal; - uint reportEvery; - bool reportHeader; - string readyAddress; - - Options(const std::string& argv0=std::string()) - : qpid::Options("Options"), - help(false), - url("amqp:tcp:127.0.0.1"), - timeout(0), - forever(false), - messages(0), - ignoreDuplicates(false), - capacity(1000), - ackFrequency(100), - tx(0), - rollbackFrequency(0), - printContent(true), - printHeaders(false), - failoverUpdates(false), - log(argv0), - reportTotal(false), - reportEvery(0), - reportHeader(true) - { - addOptions() - ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") - ("address,a", qpid::optValue(address, "ADDRESS"), "address to receive from") - ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection") - ("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "timeout in seconds to wait before exiting") - ("forever,f", qpid::optValue(forever), "ignore timeout and wait forever") - ("messages,m", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely") - ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)") - ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)") - ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)") - ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)") - ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)") - ("print-content", qpid::optValue(printContent, "yes|no"), "print out message content") - ("print-headers", qpid::optValue(printHeaders, "yes|no"), "print out message headers") - ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover") - ("report-total", qpid::optValue(reportTotal), "Report total throughput and latency statistics") - ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput and latency statistics every N messages.") - ("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.") ("ready-address", qpid::optValue(readyAddress, "ADDRESS"), - "send a message to this address when ready to receive") - ("help", qpid::optValue(help), "print this usage statement"); - add(log); - } - - Duration getTimeout() - { - if (forever) return Duration::FOREVER; - else return Duration::SECOND*timeout; - - } - bool parse(int argc, char** argv) - { - try { - qpid::Options::parse(argc, argv); - if (address.empty()) throw qpid::Exception("Address must be specified!"); - qpid::log::Logger::instance().configure(log); - if (help) { - std::ostringstream msg; - std::cout << msg << *this << std::endl << std::endl - << "Drains messages from the specified address" << std::endl; - return false; - } else { - return true; - } - } catch (const std::exception& e) { - std::cerr << *this << std::endl << std::endl << e.what() << std::endl; - return false; - } - } -}; - -const string EOS("eos"); -const string SN("sn"); - -class SequenceTracker -{ - uint lastSn; - public: - SequenceTracker() : lastSn(0) {} - - bool isDuplicate(Message& message) - { - uint sn = message.getProperties()[SN]; - if (lastSn < sn) { - lastSn = sn; - return false; - } else { - return true; - } - } -}; - -}} // namespace qpid::tests - -using namespace qpid::tests; - -int main(int argc, char ** argv) -{ - Connection connection; - try { - Options opts; - if (opts.parse(argc, argv)) { - connection = Connection(opts.url, opts.connectionOptions); - connection.open(); - std::auto_ptr updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); - Session session = opts.tx ? connection.createTransactionalSession() : connection.createSession(); - Receiver receiver = session.createReceiver(opts.address); - receiver.setCapacity(opts.capacity); - Message msg; - uint count = 0; - uint txCount = 0; - SequenceTracker sequenceTracker; - Duration timeout = opts.getTimeout(); - bool done = false; - Reporter reporter(std::cout, opts.reportEvery, opts.reportHeader); - if (!opts.readyAddress.empty()) - session.createSender(opts.readyAddress).send(msg); - while (!done && receiver.fetch(msg, timeout)) { - reporter.message(msg); - if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) { - if (msg.getContent() == EOS) { - done = true; - } else { - ++count; - if (opts.printHeaders) { - if (msg.getSubject().size()) std::cout << "Subject: " << msg.getSubject() << std::endl; - if (msg.getReplyTo()) std::cout << "ReplyTo: " << msg.getReplyTo() << std::endl; - if (msg.getCorrelationId().size()) std::cout << "CorrelationId: " << msg.getCorrelationId() << std::endl; - if (msg.getUserId().size()) std::cout << "UserId: " << msg.getUserId() << std::endl; - if (msg.getTtl().getMilliseconds()) std::cout << "TTL: " << msg.getTtl().getMilliseconds() << std::endl; - if (msg.getDurable()) std::cout << "Durable: true" << std::endl; - if (msg.getRedelivered()) std::cout << "Redelivered: true" << std::endl; - std::cout << "Properties: " << msg.getProperties() << std::endl; - std::cout << std::endl; - } - if (opts.printContent) - std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages - if (opts.messages && count >= opts.messages) done = true; - } - } - if (opts.tx && (count % opts.tx == 0)) { - if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) { - session.rollback(); - } else { - session.commit(); - } - } else if (opts.ackFrequency && (count % opts.ackFrequency == 0)) { - session.acknowledge(); - } - //opts.rejectFrequency?? - } - if (opts.reportTotal) reporter.report(); - if (opts.tx) { - if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) { - session.rollback(); - } else { - session.commit(); - } - } else { - session.acknowledge(); - } - session.close(); - connection.close(); - return 0; - } - } catch(const std::exception& error) { - std::cerr << "Failure: " << error.what() << std::endl; - connection.close(); - return 1; - } -} diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp deleted file mode 100644 index 98d7cd60aa..0000000000 --- a/cpp/src/tests/qpid_send.cpp +++ /dev/null @@ -1,337 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include "TestOptions.h" -#include "Statistics.h" - -#include -#include -#include - -using namespace std; -using namespace qpid::messaging; -using namespace qpid::types; - -typedef std::vector string_vector; - -namespace qpid { -namespace tests { - -struct Options : public qpid::Options -{ - bool help; - std::string url; - std::string connectionOptions; - std::string address; - uint messages; - std::string id; - std::string replyto; - uint sendEos; - bool durable; - uint ttl; - std::string userid; - std::string correlationid; - string_vector properties; - string_vector entries; - std::string contentString; - uint contentSize; - bool contentStdin; - uint tx; - uint rollbackFrequency; - uint capacity; - bool failoverUpdates; - qpid::log::Options log; - bool reportTotal; - uint reportEvery; - bool reportHeader; - uint sendRate; - bool sequence; - bool timestamp; - - Options(const std::string& argv0=std::string()) - : qpid::Options("Options"), - help(false), - url("amqp:tcp:127.0.0.1"), - messages(1), - sendEos(0), - durable(false), - ttl(0), - contentString(), - contentSize(0), - contentStdin(false), - tx(0), - rollbackFrequency(0), - capacity(1000), - failoverUpdates(false), - log(argv0), - reportTotal(false), - reportEvery(0), - reportHeader(true), - sendRate(0), - sequence(true), - timestamp(true) - { - addOptions() - ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") - ("address,a", qpid::optValue(address, "ADDRESS"), "address to drain from") - ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection") - ("messages,m", qpid::optValue(messages, "N"), "stop after N messages have been sent, 0 means no limit") - ("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one") - ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address") - ("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input") - ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.") - ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds") - ("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property") - ("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message") - ("user-id", qpid::optValue(userid, "USERID"), "userid for message") - ("content-string", qpid::optValue(contentString, "CONTENT"), "use CONTENT as message content") - ("content-size", qpid::optValue(contentSize, "N"), "create an N-byte message content") - ("content-map,M", qpid::optValue(entries, "NAME=VALUE"), "specify entry for map content") - ("content-stdin", qpid::optValue(contentStdin), "read message content from stdin, one line per message") - ("capacity", qpid::optValue(capacity, "N"), "size of the senders outgoing message queue") - ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)") - ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)") - ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover") - ("report-total", qpid::optValue(reportTotal), "Report total throughput statistics") - ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput statistics every N messages") - ("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.") - ("send-rate", qpid::optValue(sendRate,"N"), "Send at rate of N messages/second. 0 means send as fast as possible.") - ("sequence", qpid::optValue(sequence, "yes|no"), "Add a sequence number messages property (required for duplicate/lost message detection)") - ("timestamp", qpid::optValue(timestamp, "yes|no"), "Add a time stamp messages property (required for latency measurement)") - ("help", qpid::optValue(help), "print this usage statement"); - add(log); - } - - bool parse(int argc, char** argv) - { - try { - qpid::Options::parse(argc, argv); - if (address.empty()) throw qpid::Exception("Address must be specified!"); - qpid::log::Logger::instance().configure(log); - if (help) { - std::ostringstream msg; - std::cout << msg << *this << std::endl << std::endl - << "Drains messages from the specified address" << std::endl; - return false; - } else { - return true; - } - } catch (const std::exception& e) { - std::cerr << *this << std::endl << std::endl << e.what() << std::endl; - return false; - } - } - - static bool nameval(const std::string& in, std::string& name, std::string& value) - { - std::string::size_type i = in.find("="); - if (i == std::string::npos) { - name = in; - return false; - } else { - name = in.substr(0, i); - if (i+1 < in.size()) { - value = in.substr(i+1); - return true; - } else { - return false; - } - } - } - - static void setProperty(Message& message, const std::string& property) - { - std::string name; - std::string value; - if (nameval(property, name, value)) { - message.getProperties()[name] = value; - } else { - message.getProperties()[name] = Variant(); - } - } - - void setProperties(Message& message) const - { - for (string_vector::const_iterator i = properties.begin(); i != properties.end(); ++i) { - setProperty(message, *i); - } - } - - void setEntries(Variant::Map& content) const - { - for (string_vector::const_iterator i = entries.begin(); i != entries.end(); ++i) { - std::string name; - std::string value; - if (nameval(*i, name, value)) { - content[name] = value; - } else { - content[name] = Variant(); - } - } - } -}; - -const string EOS("eos"); -const string SN("sn"); -const string TS("ts"); - -}} // namespace qpid::tests - -using namespace qpid::tests; - -class ContentGenerator { - public: - virtual ~ContentGenerator() {} - virtual bool setContent(Message& msg) = 0; -}; - -class GetlineContentGenerator : public ContentGenerator { - public: - virtual bool setContent(Message& msg) { - string content; - bool got = getline(std::cin, content); - if (got) msg.setContent(content); - return got; - } -}; - -class FixedContentGenerator : public ContentGenerator { - public: - FixedContentGenerator(const string& s) : content(s) {} - virtual bool setContent(Message& msg) { - msg.setContent(content); - return true; - } - private: - std::string content; -}; - -class MapContentGenerator : public ContentGenerator { - public: - MapContentGenerator(const Options& opt) : opts(opt) {} - virtual bool setContent(Message& msg) { - Variant::Map map; - opts.setEntries(map); - encode(map, msg); - return true; - } - private: - const Options& opts; -}; - -int main(int argc, char ** argv) -{ - Connection connection; - Options opts; - try { - if (opts.parse(argc, argv)) { - connection = Connection(opts.url, opts.connectionOptions); - connection.open(); - std::auto_ptr updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); - Session session = opts.tx ? connection.createTransactionalSession() : connection.createSession(); - Sender sender = session.createSender(opts.address); - if (opts.capacity) sender.setCapacity(opts.capacity); - Message msg; - msg.setDurable(opts.durable); - if (opts.ttl) { - msg.setTtl(Duration(opts.ttl)); - } - if (!opts.replyto.empty()) msg.setReplyTo(Address(opts.replyto)); - if (!opts.userid.empty()) msg.setUserId(opts.userid); - if (!opts.correlationid.empty()) msg.setCorrelationId(opts.correlationid); - opts.setProperties(msg); - uint sent = 0; - uint txCount = 0; - Reporter reporter(std::cout, opts.reportEvery, opts.reportHeader); - - std::auto_ptr contentGen; - if (opts.contentStdin) { - opts.messages = 0; // Don't limit # messages sent. - contentGen.reset(new GetlineContentGenerator); - } - else if (opts.entries.size() > 0) - contentGen.reset(new MapContentGenerator(opts)); - else if (opts.contentSize > 0) - contentGen.reset(new FixedContentGenerator(string(opts.contentSize, 'X'))); - else - contentGen.reset(new FixedContentGenerator(opts.contentString)); - - qpid::sys::AbsTime start = qpid::sys::now(); - int64_t interval = 0; - if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate; - - while (contentGen->setContent(msg)) { - ++sent; - if (opts.sequence) - msg.getProperties()[SN] = sent; - if (opts.timestamp) - msg.getProperties()[TS] = int64_t( - qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); - sender.send(msg); - reporter.message(msg); - if (opts.tx && (sent % opts.tx == 0)) { - if (opts.rollbackFrequency && - (++txCount % opts.rollbackFrequency == 0)) - session.rollback(); - else - session.commit(); - } - if (opts.messages && sent >= opts.messages) break; - if (opts.sendRate) { - qpid::sys::AbsTime waitTill(start, sent*interval); - int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill); - if (delay > 0) - qpid::sys::usleep(delay/qpid::sys::TIME_USEC); - } - } - if (opts.reportTotal) reporter.report(); - for (uint i = opts.sendEos; i > 0; --i) { - if (opts.sequence) - msg.getProperties()[SN] = ++sent; - msg.setContent(EOS);//TODO: add in ability to send digest or similar - sender.send(msg); - } - if (opts.tx) { - if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) { - session.rollback(); - } else { - session.commit(); - } - } - session.sync(); - session.close(); - connection.close(); - return 0; - } - } catch(const std::exception& error) { - std::cout << "Failed: " << error.what() << std::endl; - connection.close(); - return 1; - } -} diff --git a/cpp/src/tests/qpid_stream.cpp b/cpp/src/tests/qpid_stream.cpp deleted file mode 100644 index 2ccf27bdb9..0000000000 --- a/cpp/src/tests/qpid_stream.cpp +++ /dev/null @@ -1,193 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -using namespace qpid::messaging; -using namespace qpid::types; - -namespace qpid { -namespace tests { - -struct Args : public qpid::Options -{ - std::string url; - std::string address; - uint size; - uint rate; - bool durable; - uint receiverCapacity; - uint senderCapacity; - uint ackFrequency; - - Args() : - url("amqp:tcp:127.0.0.1:5672"), - address("test-queue"), - size(512), - rate(1000), - durable(false), - receiverCapacity(0), - senderCapacity(0), - ackFrequency(1) - { - addOptions() - ("url", qpid::optValue(url, "URL"), "Url to connect to.") - ("address", qpid::optValue(address, "ADDRESS"), "Address to stream messages through.") - ("size", qpid::optValue(size, "bytes"), "Message size in bytes (content only, not headers).") - ("rate", qpid::optValue(rate, "msgs/sec"), "Rate at which to stream messages.") - ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.") - ("sender-capacity", qpid::optValue(senderCapacity, "N"), "Credit window (0 implies infinite window)") - ("receiver-capacity", qpid::optValue(receiverCapacity, "N"), "Credit window (0 implies infinite window)") - ("ack-frequency", qpid::optValue(ackFrequency, "N"), - "Ack frequency (0 implies none of the messages will get accepted)"); - } -}; - -Args opts; - -const std::string TS = "ts"; - -uint64_t timestamp(const qpid::sys::AbsTime& time) -{ - qpid::sys::Duration t(qpid::sys::EPOCH, time); - return t; -} - -struct Client : qpid::sys::Runnable -{ - virtual ~Client() {} - virtual void doWork(Session&) = 0; - - void run() - { - Connection connection(opts.url); - try { - connection.open(); - Session session = connection.createSession(); - doWork(session); - session.close(); - connection.close(); - } catch(const std::exception& error) { - std::cout << error.what() << std::endl; - connection.close(); - } - } - - qpid::sys::Thread thread; - - void start() { thread = qpid::sys::Thread(this); } - void join() { thread.join(); } -}; - -struct Publish : Client -{ - void doWork(Session& session) - { - Sender sender = session.createSender(opts.address); - if (opts.senderCapacity) sender.setCapacity(opts.senderCapacity); - Message msg(std::string(opts.size, 'X')); - uint64_t interval = qpid::sys::TIME_SEC / opts.rate; - uint64_t sent = 0, missedRate = 0; - qpid::sys::AbsTime start = qpid::sys::now(); - while (true) { - qpid::sys::AbsTime sentAt = qpid::sys::now(); - msg.getProperties()[TS] = timestamp(sentAt); - sender.send(msg); - ++sent; - qpid::sys::AbsTime waitTill(start, sent*interval); - qpid::sys::Duration delay(sentAt, waitTill); - if (delay < 0) { - ++missedRate; - } else { - qpid::sys::usleep(delay / qpid::sys::TIME_USEC); - } - } - } -}; - -struct Consume : Client -{ - void doWork(Session& session) - { - Message msg; - uint64_t received = 0; - double minLatency = std::numeric_limits::max(); - double maxLatency = 0; - double totalLatency = 0; - Receiver receiver = session.createReceiver(opts.address); - if (opts.receiverCapacity) receiver.setCapacity(opts.receiverCapacity); - while (receiver.fetch(msg)) { - ++received; - if (opts.ackFrequency && (received % opts.ackFrequency == 0)) { - session.acknowledge(); - } - //calculate latency - uint64_t receivedAt = timestamp(qpid::sys::now()); - uint64_t sentAt = msg.getProperties()[TS].asUint64(); - double latency = ((double) (receivedAt - sentAt)) / qpid::sys::TIME_MSEC; - - //update avg, min & max - minLatency = std::min(minLatency, latency); - maxLatency = std::max(maxLatency, latency); - totalLatency += latency; - - if (received % opts.rate == 0) { - std::cout << "count=" << received - << ", avg=" << (totalLatency/received) - << ", min=" << minLatency - << ", max=" << maxLatency << std::endl; - } - } - } -}; - -}} // namespace qpid::tests - -using namespace qpid::tests; - -int main(int argc, char** argv) -{ - try { - opts.parse(argc, argv); - Publish publish; - Consume consume; - publish.start(); - consume.start(); - consume.join(); - publish.join(); - return 0; - } catch(const std::exception& error) { - std::cout << error.what() << std::endl; - } - return 1; -} - - diff --git a/cpp/src/tests/ssl_test b/cpp/src/tests/ssl_test index a03341ec5b..35c0033ce8 100755 --- a/cpp/src/tests/ssl_test +++ b/cpp/src/tests/ssl_test @@ -77,8 +77,8 @@ export QPID_SSL_CERT_PASSWORD_FILE=${CERT_PW_FILE} ## Test connection with a URL URL=amqp:ssl:$TEST_HOSTNAME:$PORT -./qpid_send -b $URL --content-string=hello -a "foo;{create:always}" -MSG=`./qpid_receive -b $URL -a "foo;{create:always}" --messages 1` +./qpid-send -b $URL --content-string=hello -a "foo;{create:always}" +MSG=`./qpid-receive -b $URL -a "foo;{create:always}" --messages 1` test "$MSG" = "hello" || { echo "receive failed '$MSG' != 'hello'"; exit 1; } test -z $CLUSTER_LIB && exit 0 # Exit if cluster not supported. @@ -95,7 +95,7 @@ pick_port() { ssl_cluster_broker() { # $1 = port ../qpidd $COMMON_OPTS --load-module $CLUSTER_LIB --cluster-name ssl_test.$HOSTNAME.$$ --cluster-url amqp:ssl:$TEST_HOSTNAME:$1 --port 0 --ssl-port $1 --transport ssl > /dev/null # Wait for broker to be ready - qpid_ping -Pssl -b $TEST_HOSTNAME -qp $1 || { echo "Cannot connect to broker on $1"; exit 1; } + qpid-ping -Pssl -b $TEST_HOSTNAME -qp $1 || { echo "Cannot connect to broker on $1"; exit 1; } echo "Running SSL cluster broker on port $1" } @@ -103,11 +103,11 @@ PORT1=`pick_port`; ssl_cluster_broker $PORT1 PORT2=`pick_port`; ssl_cluster_broker $PORT2 # Pipe receive output to uniq to remove duplicates -./qpid_receive --connection-options "{reconnect-timeout:5}" --failover-updates -b amqp:ssl:$TEST_HOSTNAME:$PORT1 -a "foo;{create:always}" -f | uniq > ssl_test_receive.tmp & -./qpid_send -b amqp:ssl:$TEST_HOSTNAME:$PORT2 --content-string=one -a "foo;{create:always}" +./qpid-receive --connection-options "{reconnect-timeout:5}" --failover-updates -b amqp:ssl:$TEST_HOSTNAME:$PORT1 -a "foo;{create:always}" -f | uniq > ssl_test_receive.tmp & +./qpid-send -b amqp:ssl:$TEST_HOSTNAME:$PORT2 --content-string=one -a "foo;{create:always}" ../qpidd --no-module-dir -qp $PORT1 # Kill broker 1 receiver should fail-over. -./qpid_send -b amqp:ssl:$TEST_HOSTNAME:$PORT2 --content-string=two -a "foo;{create:always}" --send-eos 1 -wait # Wait for qpid_receive +./qpid-send -b amqp:ssl:$TEST_HOSTNAME:$PORT2 --content-string=two -a "foo;{create:always}" --send-eos 1 +wait # Wait for qpid-receive { echo one; echo two; } > ssl_test_receive.cmp diff ssl_test_receive.tmp ssl_test_receive.cmp || { echo "Failover failed"; exit 1; } rm -f ssl_test_receive.* -- cgit v1.2.1