diff options
author | Gordon Sim <gsim@apache.org> | 2010-03-11 18:23:46 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2010-03-11 18:23:46 +0000 |
commit | b00e749a170b09db7e24161baa809c11fa65531f (patch) | |
tree | 1356c7ecbc3555892d9be10ec9d86455335259fb /cpp/src/tests | |
parent | 88086e0099c0fb67ac3a01c5f8793c0634b946a0 (diff) | |
download | qpid-python-b00e749a170b09db7e24161baa809c11fa65531f.tar.gz |
QPID-2382: Created separate utility class for handling updates from failover exchange; cleaned up reconnection options
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@921971 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r-- | cpp/src/tests/qpid_recv.cpp | 8 | ||||
-rw-r--r-- | cpp/src/tests/qpid_send.cpp | 14 | ||||
-rw-r--r-- | cpp/src/tests/qpid_stream.cpp | 31 |
3 files changed, 40 insertions, 13 deletions
diff --git a/cpp/src/tests/qpid_recv.cpp b/cpp/src/tests/qpid_recv.cpp index 9e4e202053..e4cc6a7ac8 100644 --- a/cpp/src/tests/qpid_recv.cpp +++ b/cpp/src/tests/qpid_recv.cpp @@ -27,12 +27,14 @@ #include <qpid/Options.h> #include <qpid/log/Logger.h> #include <qpid/log/Options.h> +#include <qpid/client/amqp0_10/FailoverUpdates.h> #include "TestOptions.h" #include <iostream> - +#include <memory> using namespace qpid::messaging; +using qpid::client::amqp0_10::FailoverUpdates; using namespace std; @@ -54,6 +56,7 @@ struct Options : public qpid::Options uint tx; uint rollbackFrequency; bool printHeaders; + bool failoverUpdates; qpid::log::Options log; Options(const std::string& argv0=std::string()) @@ -69,6 +72,7 @@ struct Options : public qpid::Options tx(0), rollbackFrequency(0), printHeaders(false), + failoverUpdates(false), log(argv0) { addOptions() @@ -84,6 +88,7 @@ struct Options : public qpid::Options ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)") ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)") ("print-headers", qpid::optValue(printHeaders), "If specified print out all message headers as well as content") + ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover") ("help", qpid::optValue(help), "print this usage statement"); add(log); } @@ -146,6 +151,7 @@ int main(int argc, char ** argv) try { Connection connection(opts.connectionOptions); connection.open(opts.url); + std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); Session session = connection.newSession(opts.tx > 0); Receiver receiver = session.createReceiver(opts.address); receiver.setCapacity(opts.capacity); diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp index 57c348ab9c..50e6c4371a 100644 --- a/cpp/src/tests/qpid_send.cpp +++ b/cpp/src/tests/qpid_send.cpp @@ -25,16 +25,15 @@ #include <qpid/messaging/Message.h> #include <qpid/messaging/Sender.h> #include <qpid/messaging/Session.h> +#include <qpid/client/amqp0_10/FailoverUpdates.h> #include "TestOptions.h" #include <fstream> #include <iostream> +#include <memory> using namespace qpid::messaging; -using qpid::framing::Uuid; -using qpid::sys::AbsTime; -using qpid::sys::now; -using qpid::sys::TIME_INFINITE; +using qpid::client::amqp0_10::FailoverUpdates; typedef std::vector<std::string> string_vector; @@ -49,7 +48,6 @@ struct Options : public qpid::Options std::string url; std::string connectionOptions; std::string address; - int64_t timeout; uint count; std::string id; std::string replyto; @@ -64,13 +62,13 @@ struct Options : public qpid::Options uint tx; uint rollbackFrequency; uint capacity; + bool failoverUpdates; qpid::log::Options log; Options(const std::string& argv0=std::string()) : qpid::Options("Options"), help(false), url("amqp:tcp:127.0.0.1"), - timeout(TIME_INFINITE), count(1), sendEos(0), durable(false), @@ -78,13 +76,13 @@ struct Options : public qpid::Options tx(0), rollbackFrequency(0), capacity(0), + failoverUpdates(false), log(argv0) { addOptions() ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") ("address,a", qpid::optValue(address, "ADDRESS"), "address to drain from") ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection") - ("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "exit after the specified time") ("count,c", qpid::optValue(count, "COUNT"), "stop after count messages have been sent, zero disables") ("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one") ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address") @@ -99,6 +97,7 @@ struct Options : public qpid::Options ("capacity", qpid::optValue(capacity, "N"), "size of the senders outgoing message queue") ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)") ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)") + ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover") ("help", qpid::optValue(help), "print this usage statement"); add(log); } @@ -185,6 +184,7 @@ int main(int argc, char ** argv) try { Connection connection(opts.connectionOptions); connection.open(opts.url); + std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); Session session = connection.newSession(opts.tx > 0); Sender sender = session.createSender(opts.address); if (opts.capacity) sender.setCapacity(opts.capacity); diff --git a/cpp/src/tests/qpid_stream.cpp b/cpp/src/tests/qpid_stream.cpp index ca21fa248b..ef0aea52e4 100644 --- a/cpp/src/tests/qpid_stream.cpp +++ b/cpp/src/tests/qpid_stream.cpp @@ -40,16 +40,33 @@ struct Args : public qpid::Options { std::string url; std::string address; + uint size; uint rate; bool durable; - - Args() : url("amqp:tcp:127.0.0.1:5672"), address("test-queue"), rate(1000), durable(false) + uint receiverCapacity; + uint senderCapacity; + uint ackFrequency; + + Args() : + url("amqp:tcp:127.0.0.1:5672"), + address("test-queue"), + size(512), + rate(1000), + durable(false), + receiverCapacity(0), + senderCapacity(0), + ackFrequency(1) { addOptions() ("url", qpid::optValue(url, "URL"), "Url to connect to.") ("address", qpid::optValue(address, "ADDRESS"), "Address to stream messages through.") + ("size", qpid::optValue(size, "bytes"), "Message size in bytes (content only, not headers).") ("rate", qpid::optValue(rate, "msgs/sec"), "Rate at which to stream messages.") - ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable."); + ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.") + ("sender-capacity", qpid::optValue(senderCapacity, "N"), "Credit window (0 implies infinite window)") + ("receiver-capacity", qpid::optValue(receiverCapacity, "N"), "Credit window (0 implies infinite window)") + ("ack-frequency", qpid::optValue(ackFrequency, "N"), + "Ack frequency (0 implies none of the messages will get accepted)"); } }; @@ -93,7 +110,8 @@ struct Publish : Client void doWork(Session& session) { Sender sender = session.createSender(opts.address); - Message msg; + if (opts.senderCapacity) sender.setCapacity(opts.senderCapacity); + Message msg(std::string(opts.size, 'X')); uint64_t interval = qpid::sys::TIME_SEC / opts.rate; uint64_t sent = 0, missedRate = 0; qpid::sys::AbsTime start = qpid::sys::now(); @@ -123,9 +141,12 @@ struct Consume : Client double maxLatency = 0; double totalLatency = 0; Receiver receiver = session.createReceiver(opts.address); + if (opts.receiverCapacity) receiver.setCapacity(opts.receiverCapacity); while (receiver.fetch(msg)) { - session.acknowledge();//TODO: add batching option ++received; + if (opts.ackFrequency && (received % opts.ackFrequency == 0)) { + session.acknowledge(); + } //calculate latency uint64_t receivedAt = timestamp(qpid::sys::now()); uint64_t sentAt = msg.getHeaders()[TIMESTAMP].asUint64(); |