summaryrefslogtreecommitdiff
path: root/cpp/src/tests/qpid_stream.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/qpid_stream.cpp')
-rw-r--r--cpp/src/tests/qpid_stream.cpp31
1 files changed, 26 insertions, 5 deletions
diff --git a/cpp/src/tests/qpid_stream.cpp b/cpp/src/tests/qpid_stream.cpp
index ca21fa248b..ef0aea52e4 100644
--- a/cpp/src/tests/qpid_stream.cpp
+++ b/cpp/src/tests/qpid_stream.cpp
@@ -40,16 +40,33 @@ struct Args : public qpid::Options
{
std::string url;
std::string address;
+ uint size;
uint rate;
bool durable;
-
- Args() : url("amqp:tcp:127.0.0.1:5672"), address("test-queue"), rate(1000), durable(false)
+ uint receiverCapacity;
+ uint senderCapacity;
+ uint ackFrequency;
+
+ Args() :
+ url("amqp:tcp:127.0.0.1:5672"),
+ address("test-queue"),
+ size(512),
+ rate(1000),
+ durable(false),
+ receiverCapacity(0),
+ senderCapacity(0),
+ ackFrequency(1)
{
addOptions()
("url", qpid::optValue(url, "URL"), "Url to connect to.")
("address", qpid::optValue(address, "ADDRESS"), "Address to stream messages through.")
+ ("size", qpid::optValue(size, "bytes"), "Message size in bytes (content only, not headers).")
("rate", qpid::optValue(rate, "msgs/sec"), "Rate at which to stream messages.")
- ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.");
+ ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.")
+ ("sender-capacity", qpid::optValue(senderCapacity, "N"), "Credit window (0 implies infinite window)")
+ ("receiver-capacity", qpid::optValue(receiverCapacity, "N"), "Credit window (0 implies infinite window)")
+ ("ack-frequency", qpid::optValue(ackFrequency, "N"),
+ "Ack frequency (0 implies none of the messages will get accepted)");
}
};
@@ -93,7 +110,8 @@ struct Publish : Client
void doWork(Session& session)
{
Sender sender = session.createSender(opts.address);
- Message msg;
+ if (opts.senderCapacity) sender.setCapacity(opts.senderCapacity);
+ Message msg(std::string(opts.size, 'X'));
uint64_t interval = qpid::sys::TIME_SEC / opts.rate;
uint64_t sent = 0, missedRate = 0;
qpid::sys::AbsTime start = qpid::sys::now();
@@ -123,9 +141,12 @@ struct Consume : Client
double maxLatency = 0;
double totalLatency = 0;
Receiver receiver = session.createReceiver(opts.address);
+ if (opts.receiverCapacity) receiver.setCapacity(opts.receiverCapacity);
while (receiver.fetch(msg)) {
- session.acknowledge();//TODO: add batching option
++received;
+ if (opts.ackFrequency && (received % opts.ackFrequency == 0)) {
+ session.acknowledge();
+ }
//calculate latency
uint64_t receivedAt = timestamp(qpid::sys::now());
uint64_t sentAt = msg.getHeaders()[TIMESTAMP].asUint64();