summaryrefslogtreecommitdiff
path: root/cpp/src/tests/qpid_send.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/qpid_send.cpp')
-rw-r--r--cpp/src/tests/qpid_send.cpp31
1 files changed, 29 insertions, 2 deletions
diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp
index 42921a9da1..c58d5fa10b 100644
--- a/cpp/src/tests/qpid_send.cpp
+++ b/cpp/src/tests/qpid_send.cpp
@@ -47,6 +47,7 @@ struct Options : public qpid::Options
{
bool help;
std::string url;
+ std::string connectionOptions;
std::string address;
int64_t timeout;
uint count;
@@ -60,6 +61,8 @@ struct Options : public qpid::Options
string_vector properties;
string_vector entries;
std::string content;
+ uint tx;
+ uint rollbackFrequency;
qpid::log::Options log;
Options(const std::string& argv0=std::string())
@@ -71,11 +74,14 @@ struct Options : public qpid::Options
sendEos(0),
durable(false),
ttl(0),
+ tx(0),
+ rollbackFrequency(0),
log(argv0)
{
addOptions()
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
("address,a", qpid::optValue(address, "ADDRESS"), "address to drain from")
+ ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "exit after the specified time")
("count,c", qpid::optValue(count, "COUNT"), "stop after count messages have been sent, zero disables")
("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one")
@@ -88,6 +94,8 @@ struct Options : public qpid::Options
("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message")
("user-id", qpid::optValue(userid, "USERID"), "userid for message")
("content", qpid::optValue(content, "CONTENT"), "specify textual content")
+ ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
+ ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -172,8 +180,12 @@ int main(int argc, char ** argv)
Options opts;
if (opts.parse(argc, argv)) {
try {
- Connection connection = Connection::open(opts.url);
- Session session = connection.newSession();
+ Variant::Map connectionOptions;
+ if (opts.connectionOptions.size()) {
+ parseOptionString(opts.connectionOptions, connectionOptions);
+ }
+ Connection connection = Connection::open(opts.url, connectionOptions);
+ Session session = connection.newSession(opts.tx > 0);
Sender sender = session.createSender(opts.address);
Message msg;
msg.setDurable(opts.durable);
@@ -186,16 +198,31 @@ int main(int argc, char ** argv)
opts.setProperties(msg);
std::string content;
uint sent = 0;
+ uint txCount = 0;
while (getline(std::cin, content)) {
msg.setContent(content);
msg.getHeaders()["sn"] = ++sent;
sender.send(msg);
+ if (opts.tx && (sent % opts.tx == 0)) {
+ if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) {
+ session.rollback();
+ } else {
+ session.commit();
+ }
+ }
}
for (uint i = opts.sendEos; i > 0; --i) {
msg.getHeaders()["sn"] = ++sent;
msg.setContent(EOS);//TODO: add in ability to send digest or similar
sender.send(msg);
}
+ if (opts.tx) {
+ if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) {
+ session.rollback();
+ } else {
+ session.commit();
+ }
+ }
session.sync();
session.close();
connection.close();