diff options
Diffstat (limited to 'cpp/src/tests/qpid_stream.cpp')
-rw-r--r-- | cpp/src/tests/qpid_stream.cpp | 31 |
1 files changed, 26 insertions, 5 deletions
diff --git a/cpp/src/tests/qpid_stream.cpp b/cpp/src/tests/qpid_stream.cpp index ca21fa248b..ef0aea52e4 100644 --- a/cpp/src/tests/qpid_stream.cpp +++ b/cpp/src/tests/qpid_stream.cpp @@ -40,16 +40,33 @@ struct Args : public qpid::Options { std::string url; std::string address; + uint size; uint rate; bool durable; - - Args() : url("amqp:tcp:127.0.0.1:5672"), address("test-queue"), rate(1000), durable(false) + uint receiverCapacity; + uint senderCapacity; + uint ackFrequency; + + Args() : + url("amqp:tcp:127.0.0.1:5672"), + address("test-queue"), + size(512), + rate(1000), + durable(false), + receiverCapacity(0), + senderCapacity(0), + ackFrequency(1) { addOptions() ("url", qpid::optValue(url, "URL"), "Url to connect to.") ("address", qpid::optValue(address, "ADDRESS"), "Address to stream messages through.") + ("size", qpid::optValue(size, "bytes"), "Message size in bytes (content only, not headers).") ("rate", qpid::optValue(rate, "msgs/sec"), "Rate at which to stream messages.") - ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable."); + ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.") + ("sender-capacity", qpid::optValue(senderCapacity, "N"), "Credit window (0 implies infinite window)") + ("receiver-capacity", qpid::optValue(receiverCapacity, "N"), "Credit window (0 implies infinite window)") + ("ack-frequency", qpid::optValue(ackFrequency, "N"), + "Ack frequency (0 implies none of the messages will get accepted)"); } }; @@ -93,7 +110,8 @@ struct Publish : Client void doWork(Session& session) { Sender sender = session.createSender(opts.address); - Message msg; + if (opts.senderCapacity) sender.setCapacity(opts.senderCapacity); + Message msg(std::string(opts.size, 'X')); uint64_t interval = qpid::sys::TIME_SEC / opts.rate; uint64_t sent = 0, missedRate = 0; qpid::sys::AbsTime start = qpid::sys::now(); @@ -123,9 +141,12 @@ struct Consume : Client double maxLatency = 0; double totalLatency = 0; Receiver receiver = session.createReceiver(opts.address); + if (opts.receiverCapacity) receiver.setCapacity(opts.receiverCapacity); while (receiver.fetch(msg)) { - session.acknowledge();//TODO: add batching option ++received; + if (opts.ackFrequency && (received % opts.ackFrequency == 0)) { + session.acknowledge(); + } //calculate latency uint64_t receivedAt = timestamp(qpid::sys::now()); uint64_t sentAt = msg.getHeaders()[TIMESTAMP].asUint64(); |