diff options
Diffstat (limited to 'cpp/src/tests/qpid_send.cpp')
-rw-r--r-- | cpp/src/tests/qpid_send.cpp | 31 |
1 files changed, 29 insertions, 2 deletions
diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp index 42921a9da1..c58d5fa10b 100644 --- a/cpp/src/tests/qpid_send.cpp +++ b/cpp/src/tests/qpid_send.cpp @@ -47,6 +47,7 @@ struct Options : public qpid::Options { bool help; std::string url; + std::string connectionOptions; std::string address; int64_t timeout; uint count; @@ -60,6 +61,8 @@ struct Options : public qpid::Options string_vector properties; string_vector entries; std::string content; + uint tx; + uint rollbackFrequency; qpid::log::Options log; Options(const std::string& argv0=std::string()) @@ -71,11 +74,14 @@ struct Options : public qpid::Options sendEos(0), durable(false), ttl(0), + tx(0), + rollbackFrequency(0), log(argv0) { addOptions() ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") ("address,a", qpid::optValue(address, "ADDRESS"), "address to drain from") + ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection") ("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "exit after the specified time") ("count,c", qpid::optValue(count, "COUNT"), "stop after count messages have been sent, zero disables") ("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one") @@ -88,6 +94,8 @@ struct Options : public qpid::Options ("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message") ("user-id", qpid::optValue(userid, "USERID"), "userid for message") ("content", qpid::optValue(content, "CONTENT"), "specify textual content") + ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)") + ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)") ("help", qpid::optValue(help), "print this usage statement"); add(log); } @@ -172,8 +180,12 @@ int main(int argc, char ** argv) Options opts; if (opts.parse(argc, argv)) { try { - Connection connection = Connection::open(opts.url); - Session session = connection.newSession(); + Variant::Map connectionOptions; + if (opts.connectionOptions.size()) { + parseOptionString(opts.connectionOptions, connectionOptions); + } + Connection connection = Connection::open(opts.url, connectionOptions); + Session session = connection.newSession(opts.tx > 0); Sender sender = session.createSender(opts.address); Message msg; msg.setDurable(opts.durable); @@ -186,16 +198,31 @@ int main(int argc, char ** argv) opts.setProperties(msg); std::string content; uint sent = 0; + uint txCount = 0; while (getline(std::cin, content)) { msg.setContent(content); msg.getHeaders()["sn"] = ++sent; sender.send(msg); + if (opts.tx && (sent % opts.tx == 0)) { + if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) { + session.rollback(); + } else { + session.commit(); + } + } } for (uint i = opts.sendEos; i > 0; --i) { msg.getHeaders()["sn"] = ++sent; msg.setContent(EOS);//TODO: add in ability to send digest or similar sender.send(msg); } + if (opts.tx) { + if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) { + session.rollback(); + } else { + session.commit(); + } + } session.sync(); session.close(); connection.close(); |