diff options
Diffstat (limited to 'qpid/cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageGroupManager.cpp | 105 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageGroupManager.h | 13 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 70 | ||||
-rw-r--r-- | qpid/cpp/src/tests/msg_group_test.cpp | 178 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/run_msg_group_tests | 6 |
6 files changed, 323 insertions, 50 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index a7a1370801..14861ec3df 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -702,6 +702,7 @@ Manageable::status_t Broker::queryQueue( const std::string& name, QPID_LOG(error, "Query failed: queue not found, name=" << name); return Manageable::STATUS_UNKNOWN_OBJECT; } + q->query( results ); return Manageable::STATUS_OK;; } diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp index 4fc142f553..8047cd639d 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -305,3 +305,108 @@ boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q, } return empty; } + +/** Cluster replication: + + state map format: + + { "group-state": [ {"name": <group-name>, + "owner": <consumer-name>-or-empty, + "acquired-ct": <acquired count>, + "positions": [Seqnumbers, ... ]}, + {...} + ] + } +*/ + +namespace { + const std::string GROUP_NAME("name"); + const std::string GROUP_OWNER("owner"); + const std::string GROUP_ACQUIRED_CT("acquired-ct"); + const std::string GROUP_POSITIONS("positions"); + const std::string GROUP_STATE("group-state"); +} + + +/** Runs on UPDATER to snapshot current state */ +void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const +{ + using namespace qpid::framing; + state.clear(); + framing::Array groupState(TYPE_CODE_MAP); + for (GroupMap::const_iterator g = messageGroups.begin(); + g != messageGroups.end(); ++g) { + + framing::FieldTable group; + group.setString(GROUP_NAME, g->first); + group.setString(GROUP_OWNER, g->second.owner); + group.setInt(GROUP_ACQUIRED_CT, g->second.acquired); + framing::Array positions(TYPE_CODE_UINT32); + for (GroupState::PositionFifo::const_iterator p = g->second.members.begin(); + p != g->second.members.end(); ++p) + positions.push_back(framing::Array::ValuePtr(new IntegerValue( *p ))); + group.setArray(GROUP_POSITIONS, positions); + groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group))); + } + state.setArray(GROUP_STATE, groupState); + + QPID_LOG(debug, "Queue \"" << queue->getName() << "\": replicating message group state, key=" << groupIdHeader); +} + + +/** called on UPDATEE to set state from snapshot */ +void MessageGroupManager::setState(const qpid::framing::FieldTable& state) +{ + using namespace qpid::framing; + messageGroups.clear(); + consumers.clear(); + freeGroups.clear(); + + framing::Array groupState(TYPE_CODE_MAP); + + bool ok = state.getArray(GROUP_STATE, groupState); + if (!ok) { + QPID_LOG(error, "Unable to find message group state information for queue \"" << + queue->getName() << "\": cluster inconsistency error!"); + return; + } + + for (framing::Array::const_iterator g = groupState.begin(); + g != groupState.end(); ++g) { + framing::FieldTable group; + ok = framing::getEncodedValue<FieldTable>(*g, group); + if (!ok) { + QPID_LOG(error, "Invalid message group state information for queue \"" << + queue->getName() << "\": table encoding error!"); + return; + } + MessageGroupManager::GroupState state; + if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || !group.isSet(GROUP_ACQUIRED_CT)) { + QPID_LOG(error, "Invalid message group state information for queue \"" << + queue->getName() << "\": fields missing error!"); + return; + } + state.group = group.getAsString(GROUP_NAME); + state.owner = group.getAsString(GROUP_OWNER); + state.acquired = group.getAsInt(GROUP_ACQUIRED_CT); + framing::Array positions(TYPE_CODE_UINT32); + ok = group.getArray(GROUP_POSITIONS, positions); + if (!ok) { + QPID_LOG(error, "Invalid message group state information for queue \"" << + queue->getName() << "\": position encoding error!"); + return; + } + + for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) + state.members.push_back((*p)->getIntegerValue<uint32_t, 4>()); + messageGroups[state.group] = state; + if (state.owned()) + consumers[state.owner]++; + else { + assert(state.members.size()); + freeGroups[state.members.front()] = &messageGroups[state.group]; + } + } + + QPID_LOG(debug, "Queue \"" << queue->getName() << "\": message group state replicated, key =" << groupIdHeader) +} diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h index 0a1551f3ba..a9b73ade5d 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h @@ -24,7 +24,7 @@ /* for managing message grouping on Queues */ -#include "qpid/broker/QueueObserver.h" +#include "qpid/broker/StatefulQueueObserver.h" #include "qpid/broker/MessageAllocator.h" @@ -34,7 +34,7 @@ namespace broker { class QueueObserver; class MessageAllocator; -class MessageGroupManager : public QueueObserver, public MessageAllocator +class MessageGroupManager : public StatefulQueueObserver, public MessageAllocator { const std::string groupIdHeader; // msg header holding group identifier const unsigned int timestamp; // mark messages with timestamp if set @@ -55,8 +55,9 @@ class MessageGroupManager : public QueueObserver, public MessageAllocator typedef std::map<std::string, uint32_t> Consumers; // count of owned groups typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo; + // note: update getState()/setState() when changing this object's state implementation GroupMap messageGroups; // index: group name - GroupFifo freeGroups; // ordered by oldest free msg + GroupFifo freeGroups; // ordered by oldest free msg Consumers consumers; // index: consumer name static const std::string qpidMessageGroupKey; @@ -95,13 +96,17 @@ class MessageGroupManager : public QueueObserver, public MessageAllocator static boost::shared_ptr<MessageGroupManager> create( Queue *q, const qpid::framing::FieldTable& settings ); MessageGroupManager(const std::string& header, Queue *q, unsigned int _timestamp=0 ) - : QueueObserver(), MessageAllocator(q), groupIdHeader( header ), timestamp(_timestamp) {} + : StatefulQueueObserver(std::string("MessageGroupManager:") + header), MessageAllocator(q), + groupIdHeader( header ), timestamp(_timestamp) {} void enqueued( const QueuedMessage& qm ); void acquired( const QueuedMessage& qm ); void requeued( const QueuedMessage& qm ); void dequeued( const QueuedMessage& qm ); void consumerAdded( const Consumer& ); void consumerRemoved( const Consumer& ); + void getState(qpid::framing::FieldTable& state ) const; + void setState(const qpid::framing::FieldTable&); + bool nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next, const sys::Mutex::ScopedLock&); // uses default nextBrowsableMessage() diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 807e9508c3..13f4f716da 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -1035,6 +1035,76 @@ class LongTests(BrokerTest): receiver.connection.detach() logger.setLevel(log_level) + def test_msg_group_failover(self): + """Test fail-over during continuous send-receive of grouped messages. + """ + + class GroupedTrafficGenerator(Thread): + def __init__(self, url, queue, group_key): + Thread.__init__(self) + self.url = url + self.queue = queue + self.group_key = group_key + self.status = -1 + + def run(self): + # generate traffic for approx 10 seconds (2011msgs / 200 per-sec) + cmd = ["msg_group_test", + "--broker=%s" % self.url, + "--address=%s" % self.queue, + "--connection-options={%s}" % (Cluster.CONNECTION_OPTIONS), + "--group-key=%s" % self.group_key, + "--receivers=2", + "--senders=3", + "--messages=2011", + "--send-rate=200", + "--capacity=11", + "--ack-frequency=23", + "--allow-duplicates", + "--group-size=37", + "--randomize-group-size", + "--interleave=13"] + # "--trace"] + self.generator = Popen( cmd ); + self.status = self.generator.wait() + return self.status + + def results(self): + self.join(timeout=30) # 3x assumed duration + if self.isAlive(): return -1 + return self.status + + # Original cluster will all be killed so expect exit with failure + cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["-t"]) + for b in cluster: b.ready() # Wait for brokers to be ready + + # create a queue with rather draconian flow control settings + ssn0 = cluster[0].connect().session() + s0 = ssn0.sender("test-group-q; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.group_header_key':'group-id'}}}}") + + + # Kill original brokers, start new ones for the duration. + endtime = time.time() + self.duration(); + i = 0 + while time.time() < endtime: + traffic = GroupedTrafficGenerator( cluster[i].host_port(), + "test-group-q", "group-id" ) + traffic.start() + time.sleep(1) + + for x in range(2): + for b in cluster[i:]: b.ready() # Check if any broker crashed. + cluster[i].kill() + i += 1 + b = cluster.start(expect=EXPECT_EXIT_FAIL) + time.sleep(1) + + # wait for traffic to finish, verify success + self.assertEqual(0, traffic.results()) + + for i in range(i, len(cluster)): cluster[i].kill() + + class StoreTests(BrokerTest): """ Cluster tests that can only be run if there is a store available. diff --git a/qpid/cpp/src/tests/msg_group_test.cpp b/qpid/cpp/src/tests/msg_group_test.cpp index 3a458ad569..6b9d09b89a 100644 --- a/qpid/cpp/src/tests/msg_group_test.cpp +++ b/qpid/cpp/src/tests/msg_group_test.cpp @@ -67,6 +67,9 @@ struct Options : public qpid::Options bool randomizeSize; bool stickyConsumer; uint timeout; + uint interleave; + std::string prefix; + uint sendRate; Options(const std::string& argv0=std::string()) : qpid::Options("Options"), @@ -86,11 +89,13 @@ struct Options : public qpid::Options allowDuplicates(false), randomizeSize(false), stickyConsumer(false), - timeout(10) + timeout(10), + interleave(1), + sendRate(0) { addOptions() ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)") - ("address,a", qpid::optValue(address, "ADDRESS"), "address to receive from") + ("address,a", qpid::optValue(address, "ADDRESS"), "address to send and receive from") ("allow-duplicates", qpid::optValue(allowDuplicates), "Ignore the delivery of duplicated messages") ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)") @@ -98,10 +103,13 @@ struct Options : public qpid::Options ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.") ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover") ("group-key", qpid::optValue(groupKey, "KEY"), "Key of the message header containing the group identifier.") + ("group-prefix", qpid::optValue(prefix, "STRING"), "Add 'prefix' to the start of all generated group identifiers.") ("group-size", qpid::optValue(groupSize, "N"), "Number of messages per a group.") + ("interleave", qpid::optValue(interleave, "N"), "Simultaineously interleave messages from N different groups.") ("messages,m", qpid::optValue(messages, "N"), "Number of messages to send per each sender.") ("receivers,r", qpid::optValue(receivers, "N"), "Number of message consumers.") ("randomize-group-size", qpid::optValue(randomizeSize), "Randomize the number of messages per group to [1...group-size].") + ("send-rate", qpid::optValue(sendRate,"N"), "Send at rate of N messages/second. 0 means send as fast as possible.") ("senders,s", qpid::optValue(senders, "N"), "Number of message producers.") ("sticky-consumers", qpid::optValue(stickyConsumer), "If set, verify that all messages in a group are consumed by the same client [TBD].") ("timeout", qpid::optValue(timeout, "N"), "Fail with a stall error should all consumers remain idle for timeout seconds.") @@ -181,12 +189,12 @@ public: // now verify SequenceMap::iterator s = sequenceMap.find(groupId); if (s == sequenceMap.end()) { - sequenceMap[groupId] = 1; - totalMsgsConsumed++; QPID_LOG(debug, "Client " << client << " thinks this is the first message from group " << groupId << ":" << sequence); - return sequence == 0; - } - if (sequence < s->second) { + // if duplication allowed, it is possible that the last msg(s) of an old sequence are redelivered on reconnect. + // in this case, set the sequence from the first msg. + sequenceMap[groupId] = (allowDuplicates) ? sequence : 0; + s = sequenceMap.find(groupId); + } else if (sequence < s->second) { duplicateMsgs++; QPID_LOG(debug, "Client " << client << " thinks this message is a duplicate! " << groupId << ":" << sequence); return allowDuplicates; @@ -222,7 +230,9 @@ public: bool allMsgsConsumed() // true when done processing msgs { qpid::sys::Mutex::ScopedLock l(lock); - return totalMsgsConsumed == totalMsgs; + return (totalMsgsPublished >= totalMsgs) && + (totalMsgsConsumed >= totalMsgsPublished) && + sequenceMap.size() == 0; } uint getConsumedTotal() @@ -274,10 +284,85 @@ namespace { } }; - static Randomizer randomize; + static Randomizer randomizer; } +// tag each generated message with a group identifer +// +class GroupGenerator { + + const std::string groupPrefix; + const uint groupSize; + const bool randomizeSize; + const uint interleave; + + uint groupSuffix; + uint total; + + struct GroupState { + std::string id; + const uint size; + uint count; + GroupState( const std::string& i, const uint s ) + : id(i), size(s), count(0) {} + }; + typedef std::list<GroupState> GroupList; + GroupList groups; + GroupList::iterator current; + + // add a new group identifier to the list + void newGroup() { + std::ostringstream groupId(groupPrefix, ios_base::out|ios_base::ate); + groupId << std::string(":") << groupSuffix++; + uint size = (randomizeSize) ? randomizer(groupSize) : groupSize; + QPID_LOG(trace, "New group: GROUPID=[" << groupId.str() << "] size=" << size << " this=" << this); + GroupState group( groupId.str(), size ); + groups.push_back( group ); + } + +public: + GroupGenerator( const std::string& prefix, + const uint t, + const uint size, + const bool randomize, + const uint i) + : groupPrefix(prefix), groupSize(size), + randomizeSize(randomize), interleave(i), groupSuffix(0), total(t) + { + QPID_LOG(trace, "New group generator: PREFIX=[" << prefix << "] total=" << total << " size=" << size << " rand=" << randomize << " interleave=" << interleave << " this=" << this); + for (uint i = 0; i < 1 || i < interleave; ++i) { + newGroup(); + } + current = groups.begin(); + } + + bool genGroup(std::string& groupId, uint& seq, bool& eos) + { + if (!total) return false; + --total; + if (current == groups.end()) + current = groups.begin(); + groupId = current->id; + seq = current->count++; + if (current->count == current->size) { + QPID_LOG(trace, "Last msg for " << current->id << ", " << current->count << " this=" << this); + eos = true; + if (total >= interleave) { // need a new group to replace this one + newGroup(); + groups.erase(current++); + } else ++current; + } else { + ++current; + eos = total < interleave; // mark eos on the last message of each group + } + QPID_LOG(trace, "SENDING GROUPID=[" << groupId << "] seq=" << seq << " eos=" << eos << " this=" << this); + return true; + } +}; + + + class Client : public qpid::sys::Runnable { public: @@ -291,6 +376,7 @@ public: qpid::sys::Thread& getThread() { return thread; } const std::string getErrorMsg() { return error.str(); } void stop() {stopped = true;} + const std::string& getName() { return name; } protected: const std::string name; @@ -323,16 +409,15 @@ public: Message msg; uint count = 0; - while (!stopped && !checker.allMsgsConsumed()) { - + while (!stopped) { if (receiver.fetch(msg, Duration::SECOND)) { // msg retrieved - qpid::types::Variant::Map& properties = msg.getProperties(); - std::string groupId = properties[opts.groupKey]; uint groupSeq = properties[SN]; bool eof = properties[EOS]; + QPID_LOG(trace, "RECVING GROUPID=[" << groupId << "] seq=" << groupSeq << " eos=" << eof << " name=" << name); + qpid::sys::usleep(10); if (!checker.checkSequence( groupId, groupSeq, name )) { @@ -355,7 +440,8 @@ public: } // Clear out message properties & content for next iteration. msg = Message(); // TODO aconway 2010-12-01: should be done by fetch - } + } else if (checker.allMsgsConsumed()) // timed out, nothing else to do? + break; } session.acknowledge(); session.close(); @@ -367,6 +453,7 @@ public: connection.close(); } clientDone(); + QPID_LOG(trace, "Consuming client " << name << " completed."); } }; @@ -375,9 +462,13 @@ public: class Producer : public Client { GroupChecker& checker; + GroupGenerator generator; public: - Producer(const std::string& n, const Options& o, GroupChecker& c) : Client(n, o), checker(c) {}; + Producer(const std::string& n, const Options& o, GroupChecker& c) + : Client(n, o), checker(c), + generator( n, o.messages, o.groupSize, o.randomizeSize, o.interleave ) + {}; virtual ~Producer() {}; void run() @@ -392,32 +483,29 @@ public: if (opts.capacity) sender.setCapacity(opts.capacity); Message msg; msg.setDurable(opts.durable); + std::string groupId; + uint seq; + bool eos; uint sent = 0; - uint groupSeq = 0; - uint groupSize = opts.groupSize; - ostringstream group; - group << name << ":" << sent; - std::string groupId(group.str()); - while (!stopped && sent < opts.messages) { - ++sent; + qpid::sys::AbsTime start = qpid::sys::now(); + int64_t interval = 0; + if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate; + + while (!stopped && generator.genGroup(groupId, seq, eos)) { msg.getProperties()[opts.groupKey] = groupId; - msg.getProperties()[SN] = groupSeq++; - msg.getProperties()[EOS] = false; - checker.sendingSequence( groupId, groupSeq-1, (groupSeq == groupSize), name ); - if (groupSeq == groupSize) { - msg.getProperties()[EOS] = true; - // generate new group - ostringstream nextGroupId; - nextGroupId << name << ":" << sent; - groupId = nextGroupId.str(); - groupSeq = 0; - if (opts.randomizeSize) { - groupSize = randomize(opts.groupSize); - } - } + msg.getProperties()[SN] = seq; + msg.getProperties()[EOS] = eos; + checker.sendingSequence( groupId, seq, eos, name ); + sender.send(msg); - qpid::sys::usleep(10); + ++sent; + + if (opts.sendRate) { + qpid::sys::AbsTime waitTill(start, sent*interval); + int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill); + if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC); + } } session.sync(); session.close(); @@ -429,6 +517,7 @@ public: connection.close(); } clientDone(); + QPID_LOG(trace, "Producing client " << name << " completed."); } }; @@ -453,13 +542,13 @@ int main(int argc, char ** argv) // fire off the producers && consumers for (size_t j = 0; j < opts.senders; ++j) { ostringstream name; - name << "P_" << j; + name << opts.prefix << "P_" << j; clients.push_back(Client::shared_ptr(new Producer( name.str(), opts, state ))); clients.back()->getThread() = qpid::sys::Thread(*clients.back()); } for (size_t j = 0; j < opts.receivers; ++j) { ostringstream name; - name << "C_" << j; + name << opts.prefix << "C_" << j; clients.push_back(Client::shared_ptr(new Consumer( name.str(), opts, state ))); clients.back()->getThread() = qpid::sys::Thread(*clients.back()); } @@ -476,8 +565,9 @@ int main(int argc, char ** argv) done = true; for (std::vector<Client::shared_ptr>::iterator i = clients.begin(); i != clients.end(); ++i) { + QPID_LOG(debug, "Client " << (*i)->getName() << " state=" << (*i)->getState()); if ((*i)->getState() == Client::FAILURE) { - std::cerr << argv[0] << ": test failed with client error: " << (*i)->getErrorMsg() << std::endl; + QPID_LOG(error, argv[0] << ": test failed with client error: " << (*i)->getErrorMsg()); clientFailed = true; done = true; break; // exit test. @@ -505,7 +595,7 @@ int main(int argc, char ** argv) if (clientFailed) { status = 1; } else if (stalledTime >= opts.timeout) { - std::cerr << argv[0] << ": test failed due to stalled consumer." << std::endl; + QPID_LOG(error, argv[0] << ": test failed due to stalled consumer." ); status = 2; } @@ -517,10 +607,12 @@ int main(int argc, char ** argv) } if (opts.printReport && !status) state.print(std::cout); - } + } else status = 4; } catch(const std::exception& error) { - std::cerr << argv[0] << ": " << error.what() << std::endl; + QPID_LOG(error, argv[0] << ": " << error.what()); status = 3; } + QPID_LOG(trace, "TEST DONE [" << status << "]"); + return status; } diff --git a/qpid/cpp/src/tests/run_msg_group_tests b/qpid/cpp/src/tests/run_msg_group_tests index cae614ccff..ee4f4bef77 100755 --- a/qpid/cpp/src/tests/run_msg_group_tests +++ b/qpid/cpp/src/tests/run_msg_group_tests @@ -44,15 +44,15 @@ run_test() { declare -i i=0 declare -a tests tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --argument=qpid.group_header_key=${GROUP_KEY}" - "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size --interleave 3" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 7 --randomize-group-size" "qpid-config -a $BROKER_URL add queue ${QUEUE_NAME}-two --argument=qpid.group_header_key=${GROUP_KEY}" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 3 --randomize-group-size" - "msg_group_test -b $BROKER_URL -a ${QUEUE_NAME}-two --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size" + "msg_group_test -b $BROKER_URL -a ${QUEUE_NAME}-two --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size --interleave 5" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59 --group-size 5 --receivers 2 --senders 3 --capacity 1 --ack-frequency 3 --randomize-group-size" "qpid-config -a $BROKER_URL del queue ${QUEUE_NAME}-two --force" "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59 --group-size 3 --receivers 2 --senders 3 --capacity 1 --ack-frequency 1 --randomize-group-size" - "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 47 --ack-frequency 79" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 211 --group-size 13 --receivers 2 --senders 3 --capacity 47 --ack-frequency 79 --interleave 53" "qpid-config -a $BROKER_URL del queue $QUEUE_NAME --force") while [ -n "${tests[i]}" ]; do |