summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.cpp105
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.h13
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py70
-rw-r--r--qpid/cpp/src/tests/msg_group_test.cpp178
-rwxr-xr-xqpid/cpp/src/tests/run_msg_group_tests6
6 files changed, 323 insertions, 50 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index a7a1370801..14861ec3df 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -702,6 +702,7 @@ Manageable::status_t Broker::queryQueue( const std::string& name,
QPID_LOG(error, "Query failed: queue not found, name=" << name);
return Manageable::STATUS_UNKNOWN_OBJECT;
}
+ q->query( results );
return Manageable::STATUS_OK;;
}
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
index 4fc142f553..8047cd639d 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -305,3 +305,108 @@ boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q,
}
return empty;
}
+
+/** Cluster replication:
+
+ state map format:
+
+ { "group-state": [ {"name": <group-name>,
+ "owner": <consumer-name>-or-empty,
+ "acquired-ct": <acquired count>,
+ "positions": [Seqnumbers, ... ]},
+ {...}
+ ]
+ }
+*/
+
+namespace {
+ const std::string GROUP_NAME("name");
+ const std::string GROUP_OWNER("owner");
+ const std::string GROUP_ACQUIRED_CT("acquired-ct");
+ const std::string GROUP_POSITIONS("positions");
+ const std::string GROUP_STATE("group-state");
+}
+
+
+/** Runs on UPDATER to snapshot current state */
+void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const
+{
+ using namespace qpid::framing;
+ state.clear();
+ framing::Array groupState(TYPE_CODE_MAP);
+ for (GroupMap::const_iterator g = messageGroups.begin();
+ g != messageGroups.end(); ++g) {
+
+ framing::FieldTable group;
+ group.setString(GROUP_NAME, g->first);
+ group.setString(GROUP_OWNER, g->second.owner);
+ group.setInt(GROUP_ACQUIRED_CT, g->second.acquired);
+ framing::Array positions(TYPE_CODE_UINT32);
+ for (GroupState::PositionFifo::const_iterator p = g->second.members.begin();
+ p != g->second.members.end(); ++p)
+ positions.push_back(framing::Array::ValuePtr(new IntegerValue( *p )));
+ group.setArray(GROUP_POSITIONS, positions);
+ groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group)));
+ }
+ state.setArray(GROUP_STATE, groupState);
+
+ QPID_LOG(debug, "Queue \"" << queue->getName() << "\": replicating message group state, key=" << groupIdHeader);
+}
+
+
+/** called on UPDATEE to set state from snapshot */
+void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
+{
+ using namespace qpid::framing;
+ messageGroups.clear();
+ consumers.clear();
+ freeGroups.clear();
+
+ framing::Array groupState(TYPE_CODE_MAP);
+
+ bool ok = state.getArray(GROUP_STATE, groupState);
+ if (!ok) {
+ QPID_LOG(error, "Unable to find message group state information for queue \"" <<
+ queue->getName() << "\": cluster inconsistency error!");
+ return;
+ }
+
+ for (framing::Array::const_iterator g = groupState.begin();
+ g != groupState.end(); ++g) {
+ framing::FieldTable group;
+ ok = framing::getEncodedValue<FieldTable>(*g, group);
+ if (!ok) {
+ QPID_LOG(error, "Invalid message group state information for queue \"" <<
+ queue->getName() << "\": table encoding error!");
+ return;
+ }
+ MessageGroupManager::GroupState state;
+ if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || !group.isSet(GROUP_ACQUIRED_CT)) {
+ QPID_LOG(error, "Invalid message group state information for queue \"" <<
+ queue->getName() << "\": fields missing error!");
+ return;
+ }
+ state.group = group.getAsString(GROUP_NAME);
+ state.owner = group.getAsString(GROUP_OWNER);
+ state.acquired = group.getAsInt(GROUP_ACQUIRED_CT);
+ framing::Array positions(TYPE_CODE_UINT32);
+ ok = group.getArray(GROUP_POSITIONS, positions);
+ if (!ok) {
+ QPID_LOG(error, "Invalid message group state information for queue \"" <<
+ queue->getName() << "\": position encoding error!");
+ return;
+ }
+
+ for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p)
+ state.members.push_back((*p)->getIntegerValue<uint32_t, 4>());
+ messageGroups[state.group] = state;
+ if (state.owned())
+ consumers[state.owner]++;
+ else {
+ assert(state.members.size());
+ freeGroups[state.members.front()] = &messageGroups[state.group];
+ }
+ }
+
+ QPID_LOG(debug, "Queue \"" << queue->getName() << "\": message group state replicated, key =" << groupIdHeader)
+}
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
index 0a1551f3ba..a9b73ade5d 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
@@ -24,7 +24,7 @@
/* for managing message grouping on Queues */
-#include "qpid/broker/QueueObserver.h"
+#include "qpid/broker/StatefulQueueObserver.h"
#include "qpid/broker/MessageAllocator.h"
@@ -34,7 +34,7 @@ namespace broker {
class QueueObserver;
class MessageAllocator;
-class MessageGroupManager : public QueueObserver, public MessageAllocator
+class MessageGroupManager : public StatefulQueueObserver, public MessageAllocator
{
const std::string groupIdHeader; // msg header holding group identifier
const unsigned int timestamp; // mark messages with timestamp if set
@@ -55,8 +55,9 @@ class MessageGroupManager : public QueueObserver, public MessageAllocator
typedef std::map<std::string, uint32_t> Consumers; // count of owned groups
typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
+ // note: update getState()/setState() when changing this object's state implementation
GroupMap messageGroups; // index: group name
- GroupFifo freeGroups; // ordered by oldest free msg
+ GroupFifo freeGroups; // ordered by oldest free msg
Consumers consumers; // index: consumer name
static const std::string qpidMessageGroupKey;
@@ -95,13 +96,17 @@ class MessageGroupManager : public QueueObserver, public MessageAllocator
static boost::shared_ptr<MessageGroupManager> create( Queue *q, const qpid::framing::FieldTable& settings );
MessageGroupManager(const std::string& header, Queue *q, unsigned int _timestamp=0 )
- : QueueObserver(), MessageAllocator(q), groupIdHeader( header ), timestamp(_timestamp) {}
+ : StatefulQueueObserver(std::string("MessageGroupManager:") + header), MessageAllocator(q),
+ groupIdHeader( header ), timestamp(_timestamp) {}
void enqueued( const QueuedMessage& qm );
void acquired( const QueuedMessage& qm );
void requeued( const QueuedMessage& qm );
void dequeued( const QueuedMessage& qm );
void consumerAdded( const Consumer& );
void consumerRemoved( const Consumer& );
+ void getState(qpid::framing::FieldTable& state ) const;
+ void setState(const qpid::framing::FieldTable&);
+
bool nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next,
const sys::Mutex::ScopedLock&);
// uses default nextBrowsableMessage()
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 807e9508c3..13f4f716da 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -1035,6 +1035,76 @@ class LongTests(BrokerTest):
receiver.connection.detach()
logger.setLevel(log_level)
+ def test_msg_group_failover(self):
+ """Test fail-over during continuous send-receive of grouped messages.
+ """
+
+ class GroupedTrafficGenerator(Thread):
+ def __init__(self, url, queue, group_key):
+ Thread.__init__(self)
+ self.url = url
+ self.queue = queue
+ self.group_key = group_key
+ self.status = -1
+
+ def run(self):
+ # generate traffic for approx 10 seconds (2011msgs / 200 per-sec)
+ cmd = ["msg_group_test",
+ "--broker=%s" % self.url,
+ "--address=%s" % self.queue,
+ "--connection-options={%s}" % (Cluster.CONNECTION_OPTIONS),
+ "--group-key=%s" % self.group_key,
+ "--receivers=2",
+ "--senders=3",
+ "--messages=2011",
+ "--send-rate=200",
+ "--capacity=11",
+ "--ack-frequency=23",
+ "--allow-duplicates",
+ "--group-size=37",
+ "--randomize-group-size",
+ "--interleave=13"]
+ # "--trace"]
+ self.generator = Popen( cmd );
+ self.status = self.generator.wait()
+ return self.status
+
+ def results(self):
+ self.join(timeout=30) # 3x assumed duration
+ if self.isAlive(): return -1
+ return self.status
+
+ # Original cluster will all be killed so expect exit with failure
+ cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["-t"])
+ for b in cluster: b.ready() # Wait for brokers to be ready
+
+ # create a queue with rather draconian flow control settings
+ ssn0 = cluster[0].connect().session()
+ s0 = ssn0.sender("test-group-q; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.group_header_key':'group-id'}}}}")
+
+
+ # Kill original brokers, start new ones for the duration.
+ endtime = time.time() + self.duration();
+ i = 0
+ while time.time() < endtime:
+ traffic = GroupedTrafficGenerator( cluster[i].host_port(),
+ "test-group-q", "group-id" )
+ traffic.start()
+ time.sleep(1)
+
+ for x in range(2):
+ for b in cluster[i:]: b.ready() # Check if any broker crashed.
+ cluster[i].kill()
+ i += 1
+ b = cluster.start(expect=EXPECT_EXIT_FAIL)
+ time.sleep(1)
+
+ # wait for traffic to finish, verify success
+ self.assertEqual(0, traffic.results())
+
+ for i in range(i, len(cluster)): cluster[i].kill()
+
+
class StoreTests(BrokerTest):
"""
Cluster tests that can only be run if there is a store available.
diff --git a/qpid/cpp/src/tests/msg_group_test.cpp b/qpid/cpp/src/tests/msg_group_test.cpp
index 3a458ad569..6b9d09b89a 100644
--- a/qpid/cpp/src/tests/msg_group_test.cpp
+++ b/qpid/cpp/src/tests/msg_group_test.cpp
@@ -67,6 +67,9 @@ struct Options : public qpid::Options
bool randomizeSize;
bool stickyConsumer;
uint timeout;
+ uint interleave;
+ std::string prefix;
+ uint sendRate;
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
@@ -86,11 +89,13 @@ struct Options : public qpid::Options
allowDuplicates(false),
randomizeSize(false),
stickyConsumer(false),
- timeout(10)
+ timeout(10),
+ interleave(1),
+ sendRate(0)
{
addOptions()
("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)")
- ("address,a", qpid::optValue(address, "ADDRESS"), "address to receive from")
+ ("address,a", qpid::optValue(address, "ADDRESS"), "address to send and receive from")
("allow-duplicates", qpid::optValue(allowDuplicates), "Ignore the delivery of duplicated messages")
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)")
@@ -98,10 +103,13 @@ struct Options : public qpid::Options
("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.")
("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover")
("group-key", qpid::optValue(groupKey, "KEY"), "Key of the message header containing the group identifier.")
+ ("group-prefix", qpid::optValue(prefix, "STRING"), "Add 'prefix' to the start of all generated group identifiers.")
("group-size", qpid::optValue(groupSize, "N"), "Number of messages per a group.")
+ ("interleave", qpid::optValue(interleave, "N"), "Simultaineously interleave messages from N different groups.")
("messages,m", qpid::optValue(messages, "N"), "Number of messages to send per each sender.")
("receivers,r", qpid::optValue(receivers, "N"), "Number of message consumers.")
("randomize-group-size", qpid::optValue(randomizeSize), "Randomize the number of messages per group to [1...group-size].")
+ ("send-rate", qpid::optValue(sendRate,"N"), "Send at rate of N messages/second. 0 means send as fast as possible.")
("senders,s", qpid::optValue(senders, "N"), "Number of message producers.")
("sticky-consumers", qpid::optValue(stickyConsumer), "If set, verify that all messages in a group are consumed by the same client [TBD].")
("timeout", qpid::optValue(timeout, "N"), "Fail with a stall error should all consumers remain idle for timeout seconds.")
@@ -181,12 +189,12 @@ public:
// now verify
SequenceMap::iterator s = sequenceMap.find(groupId);
if (s == sequenceMap.end()) {
- sequenceMap[groupId] = 1;
- totalMsgsConsumed++;
QPID_LOG(debug, "Client " << client << " thinks this is the first message from group " << groupId << ":" << sequence);
- return sequence == 0;
- }
- if (sequence < s->second) {
+ // if duplication allowed, it is possible that the last msg(s) of an old sequence are redelivered on reconnect.
+ // in this case, set the sequence from the first msg.
+ sequenceMap[groupId] = (allowDuplicates) ? sequence : 0;
+ s = sequenceMap.find(groupId);
+ } else if (sequence < s->second) {
duplicateMsgs++;
QPID_LOG(debug, "Client " << client << " thinks this message is a duplicate! " << groupId << ":" << sequence);
return allowDuplicates;
@@ -222,7 +230,9 @@ public:
bool allMsgsConsumed() // true when done processing msgs
{
qpid::sys::Mutex::ScopedLock l(lock);
- return totalMsgsConsumed == totalMsgs;
+ return (totalMsgsPublished >= totalMsgs) &&
+ (totalMsgsConsumed >= totalMsgsPublished) &&
+ sequenceMap.size() == 0;
}
uint getConsumedTotal()
@@ -274,10 +284,85 @@ namespace {
}
};
- static Randomizer randomize;
+ static Randomizer randomizer;
}
+// tag each generated message with a group identifer
+//
+class GroupGenerator {
+
+ const std::string groupPrefix;
+ const uint groupSize;
+ const bool randomizeSize;
+ const uint interleave;
+
+ uint groupSuffix;
+ uint total;
+
+ struct GroupState {
+ std::string id;
+ const uint size;
+ uint count;
+ GroupState( const std::string& i, const uint s )
+ : id(i), size(s), count(0) {}
+ };
+ typedef std::list<GroupState> GroupList;
+ GroupList groups;
+ GroupList::iterator current;
+
+ // add a new group identifier to the list
+ void newGroup() {
+ std::ostringstream groupId(groupPrefix, ios_base::out|ios_base::ate);
+ groupId << std::string(":") << groupSuffix++;
+ uint size = (randomizeSize) ? randomizer(groupSize) : groupSize;
+ QPID_LOG(trace, "New group: GROUPID=[" << groupId.str() << "] size=" << size << " this=" << this);
+ GroupState group( groupId.str(), size );
+ groups.push_back( group );
+ }
+
+public:
+ GroupGenerator( const std::string& prefix,
+ const uint t,
+ const uint size,
+ const bool randomize,
+ const uint i)
+ : groupPrefix(prefix), groupSize(size),
+ randomizeSize(randomize), interleave(i), groupSuffix(0), total(t)
+ {
+ QPID_LOG(trace, "New group generator: PREFIX=[" << prefix << "] total=" << total << " size=" << size << " rand=" << randomize << " interleave=" << interleave << " this=" << this);
+ for (uint i = 0; i < 1 || i < interleave; ++i) {
+ newGroup();
+ }
+ current = groups.begin();
+ }
+
+ bool genGroup(std::string& groupId, uint& seq, bool& eos)
+ {
+ if (!total) return false;
+ --total;
+ if (current == groups.end())
+ current = groups.begin();
+ groupId = current->id;
+ seq = current->count++;
+ if (current->count == current->size) {
+ QPID_LOG(trace, "Last msg for " << current->id << ", " << current->count << " this=" << this);
+ eos = true;
+ if (total >= interleave) { // need a new group to replace this one
+ newGroup();
+ groups.erase(current++);
+ } else ++current;
+ } else {
+ ++current;
+ eos = total < interleave; // mark eos on the last message of each group
+ }
+ QPID_LOG(trace, "SENDING GROUPID=[" << groupId << "] seq=" << seq << " eos=" << eos << " this=" << this);
+ return true;
+ }
+};
+
+
+
class Client : public qpid::sys::Runnable
{
public:
@@ -291,6 +376,7 @@ public:
qpid::sys::Thread& getThread() { return thread; }
const std::string getErrorMsg() { return error.str(); }
void stop() {stopped = true;}
+ const std::string& getName() { return name; }
protected:
const std::string name;
@@ -323,16 +409,15 @@ public:
Message msg;
uint count = 0;
- while (!stopped && !checker.allMsgsConsumed()) {
-
+ while (!stopped) {
if (receiver.fetch(msg, Duration::SECOND)) { // msg retrieved
-
qpid::types::Variant::Map& properties = msg.getProperties();
-
std::string groupId = properties[opts.groupKey];
uint groupSeq = properties[SN];
bool eof = properties[EOS];
+ QPID_LOG(trace, "RECVING GROUPID=[" << groupId << "] seq=" << groupSeq << " eos=" << eof << " name=" << name);
+
qpid::sys::usleep(10);
if (!checker.checkSequence( groupId, groupSeq, name )) {
@@ -355,7 +440,8 @@ public:
}
// Clear out message properties & content for next iteration.
msg = Message(); // TODO aconway 2010-12-01: should be done by fetch
- }
+ } else if (checker.allMsgsConsumed()) // timed out, nothing else to do?
+ break;
}
session.acknowledge();
session.close();
@@ -367,6 +453,7 @@ public:
connection.close();
}
clientDone();
+ QPID_LOG(trace, "Consuming client " << name << " completed.");
}
};
@@ -375,9 +462,13 @@ public:
class Producer : public Client
{
GroupChecker& checker;
+ GroupGenerator generator;
public:
- Producer(const std::string& n, const Options& o, GroupChecker& c) : Client(n, o), checker(c) {};
+ Producer(const std::string& n, const Options& o, GroupChecker& c)
+ : Client(n, o), checker(c),
+ generator( n, o.messages, o.groupSize, o.randomizeSize, o.interleave )
+ {};
virtual ~Producer() {};
void run()
@@ -392,32 +483,29 @@ public:
if (opts.capacity) sender.setCapacity(opts.capacity);
Message msg;
msg.setDurable(opts.durable);
+ std::string groupId;
+ uint seq;
+ bool eos;
uint sent = 0;
- uint groupSeq = 0;
- uint groupSize = opts.groupSize;
- ostringstream group;
- group << name << ":" << sent;
- std::string groupId(group.str());
- while (!stopped && sent < opts.messages) {
- ++sent;
+ qpid::sys::AbsTime start = qpid::sys::now();
+ int64_t interval = 0;
+ if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate;
+
+ while (!stopped && generator.genGroup(groupId, seq, eos)) {
msg.getProperties()[opts.groupKey] = groupId;
- msg.getProperties()[SN] = groupSeq++;
- msg.getProperties()[EOS] = false;
- checker.sendingSequence( groupId, groupSeq-1, (groupSeq == groupSize), name );
- if (groupSeq == groupSize) {
- msg.getProperties()[EOS] = true;
- // generate new group
- ostringstream nextGroupId;
- nextGroupId << name << ":" << sent;
- groupId = nextGroupId.str();
- groupSeq = 0;
- if (opts.randomizeSize) {
- groupSize = randomize(opts.groupSize);
- }
- }
+ msg.getProperties()[SN] = seq;
+ msg.getProperties()[EOS] = eos;
+ checker.sendingSequence( groupId, seq, eos, name );
+
sender.send(msg);
- qpid::sys::usleep(10);
+ ++sent;
+
+ if (opts.sendRate) {
+ qpid::sys::AbsTime waitTill(start, sent*interval);
+ int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill);
+ if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
+ }
}
session.sync();
session.close();
@@ -429,6 +517,7 @@ public:
connection.close();
}
clientDone();
+ QPID_LOG(trace, "Producing client " << name << " completed.");
}
};
@@ -453,13 +542,13 @@ int main(int argc, char ** argv)
// fire off the producers && consumers
for (size_t j = 0; j < opts.senders; ++j) {
ostringstream name;
- name << "P_" << j;
+ name << opts.prefix << "P_" << j;
clients.push_back(Client::shared_ptr(new Producer( name.str(), opts, state )));
clients.back()->getThread() = qpid::sys::Thread(*clients.back());
}
for (size_t j = 0; j < opts.receivers; ++j) {
ostringstream name;
- name << "C_" << j;
+ name << opts.prefix << "C_" << j;
clients.push_back(Client::shared_ptr(new Consumer( name.str(), opts, state )));
clients.back()->getThread() = qpid::sys::Thread(*clients.back());
}
@@ -476,8 +565,9 @@ int main(int argc, char ** argv)
done = true;
for (std::vector<Client::shared_ptr>::iterator i = clients.begin();
i != clients.end(); ++i) {
+ QPID_LOG(debug, "Client " << (*i)->getName() << " state=" << (*i)->getState());
if ((*i)->getState() == Client::FAILURE) {
- std::cerr << argv[0] << ": test failed with client error: " << (*i)->getErrorMsg() << std::endl;
+ QPID_LOG(error, argv[0] << ": test failed with client error: " << (*i)->getErrorMsg());
clientFailed = true;
done = true;
break; // exit test.
@@ -505,7 +595,7 @@ int main(int argc, char ** argv)
if (clientFailed) {
status = 1;
} else if (stalledTime >= opts.timeout) {
- std::cerr << argv[0] << ": test failed due to stalled consumer." << std::endl;
+ QPID_LOG(error, argv[0] << ": test failed due to stalled consumer." );
status = 2;
}
@@ -517,10 +607,12 @@ int main(int argc, char ** argv)
}
if (opts.printReport && !status) state.print(std::cout);
- }
+ } else status = 4;
} catch(const std::exception& error) {
- std::cerr << argv[0] << ": " << error.what() << std::endl;
+ QPID_LOG(error, argv[0] << ": " << error.what());
status = 3;
}
+ QPID_LOG(trace, "TEST DONE [" << status << "]");
+
return status;
}
diff --git a/qpid/cpp/src/tests/run_msg_group_tests b/qpid/cpp/src/tests/run_msg_group_tests
index cae614ccff..ee4f4bef77 100755
--- a/qpid/cpp/src/tests/run_msg_group_tests
+++ b/qpid/cpp/src/tests/run_msg_group_tests
@@ -44,15 +44,15 @@ run_test() {
declare -i i=0
declare -a tests
tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --argument=qpid.group_header_key=${GROUP_KEY}"
- "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size --interleave 3"
"msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 7 --randomize-group-size"
"qpid-config -a $BROKER_URL add queue ${QUEUE_NAME}-two --argument=qpid.group_header_key=${GROUP_KEY}"
"msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 3 --randomize-group-size"
- "msg_group_test -b $BROKER_URL -a ${QUEUE_NAME}-two --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size"
+ "msg_group_test -b $BROKER_URL -a ${QUEUE_NAME}-two --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size --interleave 5"
"msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59 --group-size 5 --receivers 2 --senders 3 --capacity 1 --ack-frequency 3 --randomize-group-size"
"qpid-config -a $BROKER_URL del queue ${QUEUE_NAME}-two --force"
"msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59 --group-size 3 --receivers 2 --senders 3 --capacity 1 --ack-frequency 1 --randomize-group-size"
- "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 47 --ack-frequency 79"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 211 --group-size 13 --receivers 2 --senders 3 --capacity 47 --ack-frequency 79 --interleave 53"
"qpid-config -a $BROKER_URL del queue $QUEUE_NAME --force")
while [ -n "${tests[i]}" ]; do