diff options
author | Gordon Sim <gsim@apache.org> | 2010-03-11 18:23:46 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2010-03-11 18:23:46 +0000 |
commit | b00e749a170b09db7e24161baa809c11fa65531f (patch) | |
tree | 1356c7ecbc3555892d9be10ec9d86455335259fb /cpp/src/tests/qpid_stream.cpp | |
parent | 88086e0099c0fb67ac3a01c5f8793c0634b946a0 (diff) | |
download | qpid-python-b00e749a170b09db7e24161baa809c11fa65531f.tar.gz |
QPID-2382: Created separate utility class for handling updates from failover exchange; cleaned up reconnection options
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@921971 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/qpid_stream.cpp')
-rw-r--r-- | cpp/src/tests/qpid_stream.cpp | 31 |
1 files changed, 26 insertions, 5 deletions
diff --git a/cpp/src/tests/qpid_stream.cpp b/cpp/src/tests/qpid_stream.cpp index ca21fa248b..ef0aea52e4 100644 --- a/cpp/src/tests/qpid_stream.cpp +++ b/cpp/src/tests/qpid_stream.cpp @@ -40,16 +40,33 @@ struct Args : public qpid::Options { std::string url; std::string address; + uint size; uint rate; bool durable; - - Args() : url("amqp:tcp:127.0.0.1:5672"), address("test-queue"), rate(1000), durable(false) + uint receiverCapacity; + uint senderCapacity; + uint ackFrequency; + + Args() : + url("amqp:tcp:127.0.0.1:5672"), + address("test-queue"), + size(512), + rate(1000), + durable(false), + receiverCapacity(0), + senderCapacity(0), + ackFrequency(1) { addOptions() ("url", qpid::optValue(url, "URL"), "Url to connect to.") ("address", qpid::optValue(address, "ADDRESS"), "Address to stream messages through.") + ("size", qpid::optValue(size, "bytes"), "Message size in bytes (content only, not headers).") ("rate", qpid::optValue(rate, "msgs/sec"), "Rate at which to stream messages.") - ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable."); + ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.") + ("sender-capacity", qpid::optValue(senderCapacity, "N"), "Credit window (0 implies infinite window)") + ("receiver-capacity", qpid::optValue(receiverCapacity, "N"), "Credit window (0 implies infinite window)") + ("ack-frequency", qpid::optValue(ackFrequency, "N"), + "Ack frequency (0 implies none of the messages will get accepted)"); } }; @@ -93,7 +110,8 @@ struct Publish : Client void doWork(Session& session) { Sender sender = session.createSender(opts.address); - Message msg; + if (opts.senderCapacity) sender.setCapacity(opts.senderCapacity); + Message msg(std::string(opts.size, 'X')); uint64_t interval = qpid::sys::TIME_SEC / opts.rate; uint64_t sent = 0, missedRate = 0; qpid::sys::AbsTime start = qpid::sys::now(); @@ -123,9 +141,12 @@ struct Consume : Client double maxLatency = 0; double totalLatency = 0; Receiver receiver = session.createReceiver(opts.address); + if (opts.receiverCapacity) receiver.setCapacity(opts.receiverCapacity); while (receiver.fetch(msg)) { - session.acknowledge();//TODO: add batching option ++received; + if (opts.ackFrequency && (received % opts.ackFrequency == 0)) { + session.acknowledge(); + } //calculate latency uint64_t receivedAt = timestamp(qpid::sys::now()); uint64_t sentAt = msg.getHeaders()[TIMESTAMP].asUint64(); |