summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2010-03-11 18:23:46 +0000
committerGordon Sim <gsim@apache.org>2010-03-11 18:23:46 +0000
commitb00e749a170b09db7e24161baa809c11fa65531f (patch)
tree1356c7ecbc3555892d9be10ec9d86455335259fb /cpp/src/tests
parent88086e0099c0fb67ac3a01c5f8793c0634b946a0 (diff)
downloadqpid-python-b00e749a170b09db7e24161baa809c11fa65531f.tar.gz
QPID-2382: Created separate utility class for handling updates from failover exchange; cleaned up reconnection options
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@921971 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/qpid_recv.cpp8
-rw-r--r--cpp/src/tests/qpid_send.cpp14
-rw-r--r--cpp/src/tests/qpid_stream.cpp31
3 files changed, 40 insertions, 13 deletions
diff --git a/cpp/src/tests/qpid_recv.cpp b/cpp/src/tests/qpid_recv.cpp
index 9e4e202053..e4cc6a7ac8 100644
--- a/cpp/src/tests/qpid_recv.cpp
+++ b/cpp/src/tests/qpid_recv.cpp
@@ -27,12 +27,14 @@
#include <qpid/Options.h>
#include <qpid/log/Logger.h>
#include <qpid/log/Options.h>
+#include <qpid/client/amqp0_10/FailoverUpdates.h>
#include "TestOptions.h"
#include <iostream>
-
+#include <memory>
using namespace qpid::messaging;
+using qpid::client::amqp0_10::FailoverUpdates;
using namespace std;
@@ -54,6 +56,7 @@ struct Options : public qpid::Options
uint tx;
uint rollbackFrequency;
bool printHeaders;
+ bool failoverUpdates;
qpid::log::Options log;
Options(const std::string& argv0=std::string())
@@ -69,6 +72,7 @@ struct Options : public qpid::Options
tx(0),
rollbackFrequency(0),
printHeaders(false),
+ failoverUpdates(false),
log(argv0)
{
addOptions()
@@ -84,6 +88,7 @@ struct Options : public qpid::Options
("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
("print-headers", qpid::optValue(printHeaders), "If specified print out all message headers as well as content")
+ ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -146,6 +151,7 @@ int main(int argc, char ** argv)
try {
Connection connection(opts.connectionOptions);
connection.open(opts.url);
+ std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
Session session = connection.newSession(opts.tx > 0);
Receiver receiver = session.createReceiver(opts.address);
receiver.setCapacity(opts.capacity);
diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp
index 57c348ab9c..50e6c4371a 100644
--- a/cpp/src/tests/qpid_send.cpp
+++ b/cpp/src/tests/qpid_send.cpp
@@ -25,16 +25,15 @@
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
+#include <qpid/client/amqp0_10/FailoverUpdates.h>
#include "TestOptions.h"
#include <fstream>
#include <iostream>
+#include <memory>
using namespace qpid::messaging;
-using qpid::framing::Uuid;
-using qpid::sys::AbsTime;
-using qpid::sys::now;
-using qpid::sys::TIME_INFINITE;
+using qpid::client::amqp0_10::FailoverUpdates;
typedef std::vector<std::string> string_vector;
@@ -49,7 +48,6 @@ struct Options : public qpid::Options
std::string url;
std::string connectionOptions;
std::string address;
- int64_t timeout;
uint count;
std::string id;
std::string replyto;
@@ -64,13 +62,13 @@ struct Options : public qpid::Options
uint tx;
uint rollbackFrequency;
uint capacity;
+ bool failoverUpdates;
qpid::log::Options log;
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
help(false),
url("amqp:tcp:127.0.0.1"),
- timeout(TIME_INFINITE),
count(1),
sendEos(0),
durable(false),
@@ -78,13 +76,13 @@ struct Options : public qpid::Options
tx(0),
rollbackFrequency(0),
capacity(0),
+ failoverUpdates(false),
log(argv0)
{
addOptions()
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
("address,a", qpid::optValue(address, "ADDRESS"), "address to drain from")
("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
- ("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "exit after the specified time")
("count,c", qpid::optValue(count, "COUNT"), "stop after count messages have been sent, zero disables")
("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one")
("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address")
@@ -99,6 +97,7 @@ struct Options : public qpid::Options
("capacity", qpid::optValue(capacity, "N"), "size of the senders outgoing message queue")
("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
+ ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -185,6 +184,7 @@ int main(int argc, char ** argv)
try {
Connection connection(opts.connectionOptions);
connection.open(opts.url);
+ std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
Session session = connection.newSession(opts.tx > 0);
Sender sender = session.createSender(opts.address);
if (opts.capacity) sender.setCapacity(opts.capacity);
diff --git a/cpp/src/tests/qpid_stream.cpp b/cpp/src/tests/qpid_stream.cpp
index ca21fa248b..ef0aea52e4 100644
--- a/cpp/src/tests/qpid_stream.cpp
+++ b/cpp/src/tests/qpid_stream.cpp
@@ -40,16 +40,33 @@ struct Args : public qpid::Options
{
std::string url;
std::string address;
+ uint size;
uint rate;
bool durable;
-
- Args() : url("amqp:tcp:127.0.0.1:5672"), address("test-queue"), rate(1000), durable(false)
+ uint receiverCapacity;
+ uint senderCapacity;
+ uint ackFrequency;
+
+ Args() :
+ url("amqp:tcp:127.0.0.1:5672"),
+ address("test-queue"),
+ size(512),
+ rate(1000),
+ durable(false),
+ receiverCapacity(0),
+ senderCapacity(0),
+ ackFrequency(1)
{
addOptions()
("url", qpid::optValue(url, "URL"), "Url to connect to.")
("address", qpid::optValue(address, "ADDRESS"), "Address to stream messages through.")
+ ("size", qpid::optValue(size, "bytes"), "Message size in bytes (content only, not headers).")
("rate", qpid::optValue(rate, "msgs/sec"), "Rate at which to stream messages.")
- ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.");
+ ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.")
+ ("sender-capacity", qpid::optValue(senderCapacity, "N"), "Credit window (0 implies infinite window)")
+ ("receiver-capacity", qpid::optValue(receiverCapacity, "N"), "Credit window (0 implies infinite window)")
+ ("ack-frequency", qpid::optValue(ackFrequency, "N"),
+ "Ack frequency (0 implies none of the messages will get accepted)");
}
};
@@ -93,7 +110,8 @@ struct Publish : Client
void doWork(Session& session)
{
Sender sender = session.createSender(opts.address);
- Message msg;
+ if (opts.senderCapacity) sender.setCapacity(opts.senderCapacity);
+ Message msg(std::string(opts.size, 'X'));
uint64_t interval = qpid::sys::TIME_SEC / opts.rate;
uint64_t sent = 0, missedRate = 0;
qpid::sys::AbsTime start = qpid::sys::now();
@@ -123,9 +141,12 @@ struct Consume : Client
double maxLatency = 0;
double totalLatency = 0;
Receiver receiver = session.createReceiver(opts.address);
+ if (opts.receiverCapacity) receiver.setCapacity(opts.receiverCapacity);
while (receiver.fetch(msg)) {
- session.acknowledge();//TODO: add batching option
++received;
+ if (opts.ackFrequency && (received % opts.ackFrequency == 0)) {
+ session.acknowledge();
+ }
//calculate latency
uint64_t receivedAt = timestamp(qpid::sys::now());
uint64_t sentAt = msg.getHeaders()[TIMESTAMP].asUint64();