summaryrefslogtreecommitdiff
path: root/cpp/src/tests/qpid_recv.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/qpid_recv.cpp')
-rw-r--r--cpp/src/tests/qpid_recv.cpp56
1 files changed, 32 insertions, 24 deletions
diff --git a/cpp/src/tests/qpid_recv.cpp b/cpp/src/tests/qpid_recv.cpp
index 30ab285b34..c45d76f91f 100644
--- a/cpp/src/tests/qpid_recv.cpp
+++ b/cpp/src/tests/qpid_recv.cpp
@@ -48,12 +48,15 @@ struct Options : public qpid::Options
bool help;
std::string url;
std::string address;
+ std::string connectionOptions;
int64_t timeout;
bool forever;
uint messages;
bool ignoreDuplicates;
uint capacity;
uint ackFrequency;
+ uint tx;
+ uint rollbackFrequency;
bool printHeaders;
qpid::log::Options log;
@@ -67,18 +70,23 @@ struct Options : public qpid::Options
ignoreDuplicates(false),
capacity(0),
ackFrequency(1),
+ tx(0),
+ rollbackFrequency(0),
printHeaders(false),
log(argv0)
{
addOptions()
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
("address,a", qpid::optValue(address, "ADDRESS"), "address to receive from")
+ ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "timeout in seconds to wait before exiting")
("forever,f", qpid::optValue(forever), "ignore timeout and wait forever")
("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")
("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)")
- ("credit-window", qpid::optValue(capacity, "N"), "Credit window (0 implies infinite window)")
+ ("capacity", qpid::optValue(capacity, "N"), "Credit window (0 implies infinite window)")
("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)")
+ ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
+ ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
("print-headers", qpid::optValue(printHeaders), "If specified print out all message headers as well as content")
("help", qpid::optValue(help), "print this usage statement");
add(log);
@@ -111,25 +119,6 @@ struct Options : public qpid::Options
}
};
-struct Args : public qpid::TestOptions
-{
- string address;
- uint messages;
- bool ignoreDuplicates;
- uint capacity;
- uint ackFrequency;
-
- Args() : address("test-queue"), messages(0), ignoreDuplicates(false), capacity(0), ackFrequency(1)
- {
- addOptions()
- ("address", qpid::optValue(address, "ADDRESS"), "Address from which to request messages")
- ("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")
- ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)")
- ("prefetch", qpid::optValue(capacity, "N"), "Number of messages that can be prefetched (0 implies no prefetch)")
- ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)");
- }
-};
-
const string EOS("eos");
class SequenceTracker
@@ -159,12 +148,17 @@ int main(int argc, char ** argv)
Options opts;
if (opts.parse(argc, argv)) {
try {
- Connection connection = Connection::open(opts.url);
- Session session = connection.newSession();
+ Variant::Map connectionOptions;
+ if (opts.connectionOptions.size()) {
+ parseOptionString(opts.connectionOptions, connectionOptions);
+ }
+ Connection connection = Connection::open(opts.url, connectionOptions);
+ Session session = connection.newSession(opts.tx > 0);
Receiver receiver = session.createReceiver(opts.address);
receiver.setCapacity(opts.capacity);
Message msg;
uint count = 0;
+ uint txCount = 0;
SequenceTracker sequenceTracker;
Duration timeout = opts.getTimeout();
bool done = false;
@@ -189,12 +183,26 @@ int main(int argc, char ** argv)
if (opts.messages && count >= opts.messages) done = true;
}
}
- if (opts.ackFrequency && (count % opts.ackFrequency == 0)) {
+ if (opts.tx && (count % opts.tx == 0)) {
+ if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) {
+ session.rollback();
+ } else {
+ session.commit();
+ }
+ } else if (opts.ackFrequency && (count % opts.ackFrequency == 0)) {
session.acknowledge();
}
//opts.rejectFrequency??
}
- session.acknowledge();
+ if (opts.tx) {
+ if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) {
+ session.rollback();
+ } else {
+ session.commit();
+ }
+ } else {
+ session.acknowledge();
+ }
session.close();
connection.close();
return 0;