diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-10-04 17:41:03 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-10-04 17:41:03 +0000 |
commit | d59cf7064cf14338a2973e9b931ee8c8884b9983 (patch) | |
tree | a79e459769dd514eb49e2daeefca8bad3fd0143b | |
parent | 476f673216adabd0eb9cdef6b760c923c6faad2a (diff) | |
download | qpid-python-d59cf7064cf14338a2973e9b931ee8c8884b9983.tar.gz |
QPID-3346: add client and mgmt unit tests, fix bugs uncovered.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1178873 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageGroupManager.cpp | 78 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageGroupManager.h | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 2 | ||||
-rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py | 1 | ||||
-rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py | 557 |
5 files changed, 616 insertions, 40 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp index c26185cd3e..f576a866fc 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -90,8 +90,6 @@ void MessageGroupManager::requeued( const QueuedMessage& qm ) { // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage // issue: const-ness?? - // @todo KAG BUG - how to ensure requeue happens in the correct order? - // @todo KAG BUG - if requeue is not in correct order - what do we do? throw? std::string group( getGroupId(qm) ); GroupMap::iterator gs = messageGroups.find( group ); assert( gs != messageGroups.end() ); @@ -117,10 +115,21 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm ) assert( gs != messageGroups.end() ); GroupState& state( gs->second ); assert( state.members.size() != 0 ); + assert( state.acquired != 0 ); + state.acquired -= 1; // likely to be at or near begin() if dequeued in order - { - GroupState::PositionFifo::iterator pos = state.members.begin(); + bool reFreeNeeded = false; + if (state.members.front() == qm.position) { + if (!state.owned()) { + // will be on the freeGroups list if mgmt is dequeueing rather than a consumer! + // if on freelist, it is indexed by first member, which is about to be removed! + unFree(state); + reFreeNeeded = true; + } + state.members.pop_front(); + } else { + GroupState::PositionFifo::iterator pos = state.members.begin() + 1; GroupState::PositionFifo::iterator end = state.members.end(); while (pos != end) { if (*pos == qm.position) { @@ -131,35 +140,37 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm ) } } - assert( state.acquired != 0 ); - state.acquired -= 1; uint32_t total = state.members.size(); if (total == 0) { - if (!state.owned()) { // unlikely, but need to remove from the free list before erase - unFree( state ); - } QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << gs->first); messageGroups.erase( gs ); - } else { - if (state.acquired == 0 && state.owned()) { - QPID_LOG( trace, "group queue " << qName << - ": consumer name=" << state.owner << " released group id=" << gs->first); - disown(state); - } + } else if (state.acquired == 0 && state.owned()) { + QPID_LOG( trace, "group queue " << qName << + ": consumer name=" << state.owner << " released group id=" << gs->first); + disown(state); + } else if (reFreeNeeded) { + disown(state); } QPID_LOG( trace, "group queue " << qName << ": dequeued message from group id=" << group << " total=" << total ); } -void MessageGroupManager::consumerAdded( const Consumer& c ) +void MessageGroupManager::consumerAdded( const Consumer& /*c*/ ) { - assert(consumers.find(c.getName()) == consumers.end()); - consumers[c.getName()] = 0; // no groups owned yet - QPID_LOG( trace, "group queue " << qName << ": added consumer, name=" << c.getName() ); +#if 0 + // allow a re-subscribing consumer + if (consumers.find(c.getName()) == consumers.end()) { + consumers[c.getName()] = 0; // no groups owned yet + QPID_LOG( trace, "group queue " << qName << ": added consumer, name=" << c.getName() ); + } else { + QPID_LOG( trace, "group queue " << qName << ": consumer re-subscribed, name=" << c.getName() ); + } +#endif } -void MessageGroupManager::consumerRemoved( const Consumer& c ) +void MessageGroupManager::consumerRemoved( const Consumer& /*c*/ ) { +#if 0 const std::string& name(c.getName()); Consumers::iterator consumer = consumers.find(name); assert(consumer != consumers.end()); @@ -170,14 +181,22 @@ void MessageGroupManager::consumerRemoved( const Consumer& c ) GroupState& state( gs->second ); if (state.owner == name) { - --count; - disown(state); - QPID_LOG( trace, "group queue " << qName << - ": consumer name=" << name << " released group id=" << gs->first); + if (state.acquired == 0) { + --count; + disown(state); + QPID_LOG( trace, "group queue " << qName << + ": consumer name=" << name << " released group id=" << gs->first); + } } } - consumers.erase( consumer ); - QPID_LOG( trace, "group queue " << qName << ": removed consumer name=" << name ); + if (count == 0) { + consumers.erase( consumer ); + QPID_LOG( trace, "group queue " << qName << ": removed consumer name=" << name ); + } else { + // don't release groups with outstanding acquired msgs - consumer may re-subscribe! + QPID_LOG( trace, "group queue " << qName << ": consumer name=" << name << " unsubscribed with outstanding messages."); + } +#endif } @@ -196,9 +215,11 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued return false; // shouldn't happen - should find nextFree } } else { // no free groups available +#if 0 if (consumers[c->getName()] == 0) { // and none currently owned return false; // so nothing available to consume } +#endif if (!messages.next( c->position, next )) return false; } @@ -356,7 +377,7 @@ void MessageGroupManager::setState(const qpid::framing::FieldTable& state) { using namespace qpid::framing; messageGroups.clear(); - consumers.clear(); + //consumers.clear(); freeGroups.clear(); framing::Array groupState(TYPE_CODE_MAP); @@ -398,7 +419,8 @@ void MessageGroupManager::setState(const qpid::framing::FieldTable& state) state.members.push_back((*p)->getIntegerValue<uint32_t, 4>()); messageGroups[state.group] = state; if (state.owned()) - consumers[state.owner]++; + //consumers[state.owner]++; + ; else { assert(state.members.size()); freeGroups[state.members.front()] = &messageGroups[state.group]; diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h index e55374f104..6ec8ffe801 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h @@ -42,7 +42,7 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageAllocato const std::string qName; // name of parent queue (for logs) struct GroupState { - typedef std::list<framing::SequenceNumber> PositionFifo; + typedef std::deque<framing::SequenceNumber> PositionFifo; std::string group; // group identifier std::string owner; // consumer with outstanding acquired messages @@ -54,13 +54,13 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageAllocato bool owned() const {return !owner.empty();} }; typedef std::map<std::string, struct GroupState> GroupMap; - typedef std::map<std::string, uint32_t> Consumers; // count of owned groups + //typedef std::map<std::string, uint32_t> Consumers; // count of owned groups typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo; // note: update getState()/setState() when changing this object's state implementation GroupMap messageGroups; // index: group name GroupFifo freeGroups; // ordered by oldest free msg - Consumers consumers; // index: consumer name + //Consumers consumers; // index: consumer name static const std::string qpidMessageGroupKey; static const std::string qpidMessageGroupTimestamp; @@ -76,21 +76,17 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageAllocato void own( GroupState& state, const std::string& owner ) { state.owner = owner; - consumers[state.owner]++; + //consumers[state.owner]++; unFree( state ); } void disown( GroupState& state ) { - assert(consumers[state.owner]); - consumers[state.owner]--; + //assert(consumers[state.owner]); + //consumers[state.owner]--; state.owner.clear(); assert(state.members.size()); -#ifdef NDEBUG + assert(freeGroups.find(state.members.front()) == freeGroups.end()); freeGroups[state.members.front()] = &state; -#else - bool unique = freeGroups.insert(GroupFifo::value_type(state.members.front(), &state)).second; - (void) unique; assert(unique); -#endif } public: diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 56df6cb233..2f0735d41c 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -558,7 +558,7 @@ namespace { MessageFilter* MessageFilter::create( const ::qpid::types::Variant::Map *filter ) { using namespace qpid::types; - if (filter) { + if (filter && !filter->empty()) { Variant::Map::const_iterator i = filter->find(MessageFilter::typeKey); if (i != filter->end()) { diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py b/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py index 921786af22..7b779df5f4 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py @@ -33,3 +33,4 @@ from lvq import * from priority import * from threshold import * from extensions import * +from msg_groups import * diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py index 11b2caad08..e49a997a54 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py @@ -19,6 +19,7 @@ from qpid.messaging import * from qpid.tests.messaging import Base +import qmf.console from time import sleep # @@ -360,8 +361,564 @@ class MultiConsumerMsgGroupTests(Base): except Empty: pass + def test_transaction(self): + """ Verify behavior when using transactions. + """ + snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + + " node: {x-declare: {arguments:" + + " {'qpid.group_header_key':'THE-GROUP'}}}}") + + groups = ["A","A","B","B","A","B"] + messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] + index = 0 + for m in messages: + m.content['index'] = index + index += 1 + snd.send(m) + + s1 = self.conn.session(transactional=True) + c1 = s1.receiver("msg-group-q", options={"capacity":1}) + s2 = self.conn.session(transactional=True) + c2 = s2.receiver("msg-group-q", options={"capacity":1}) + + # C1 gets group A + m1 = c1.fetch(0) + assert m1.properties['THE-GROUP'] == 'A' + assert m1.content['index'] == 0 + + # C2 gets group B + m2 = c2.fetch(0) + assert m2.properties['THE-GROUP'] == 'B' + assert m2.content['index'] == 2 + + s1.acknowledge(m1) # A-0 consumed, A group freed + s2.acknowledge(m2) # B-2 consumed, B group freed + + s1.commit() + s2.rollback() # release B-2 and group B + + ## Q: ["A1","B2","B3","A4","B5"] + + # C2 should be able to get the next A + m3 = c2.fetch(0) + assert m3.properties['THE-GROUP'] == 'A' + assert m3.content['index'] == 1 + + # C1 should be able to get B-2 + m4 = c1.fetch(0) + assert m4.properties['THE-GROUP'] == 'B' + assert m4.content['index'] == 2 + + s2.acknowledge(m3) # C2 consumes A-1 + s1.acknowledge(m4) # C1 consumes B-2 + s1.commit() # C1 consume B-2 occurs, free group B + + ## Q: [["A1",]"B3","A4","B5"] + + # A-1 is still considered owned by C2, since the commit has yet to + # occur, so the next available to C1 would be B-3 + m5 = c1.fetch(0) # B-3 + assert m5.properties['THE-GROUP'] == 'B' + assert m5.content['index'] == 3 + + # and C2 should find A-4 available, since it owns the A group + m6 = c2.fetch(0) # A-4 + assert m6.properties['THE-GROUP'] == 'A' + assert m6.content['index'] == 4 + + s2.acknowledge(m6) # C2 consumes A-4 + + # uh-oh, A-1 and A-4 released, along with A group + s2.rollback() + + ## Q: ["A1",["B3"],"A4","B5"] + m7 = c1.fetch(0) # A-1 is found + assert m7.properties['THE-GROUP'] == 'A' + assert m7.content['index'] == 1 + + ## Q: [["A1"],["B3"],"A4","B5"] + # since C1 "owns" both A and B group, C2 should find nothing available + try: + m8 = c2.fetch(0) + assert False # should not get here + except Empty: + pass + + # C1 next gets A4 + m9 = c1.fetch(0) + assert m9.properties['THE-GROUP'] == 'A' + assert m9.content['index'] == 4 + + s1.acknowledge() + + ## Q: [["A1"],["B3"],["A4"],"B5"] + # even though C1 acknowledges A1,B3, and A4, B5 is still considered + # owned as the commit has yet to take place + try: + m10 = c2.fetch(0) + assert False # should not get here + except Empty: + pass + + # now A1,B3,A4 dequeued, B5 should be free + s1.commit() + + ## Q: ["B5"] + m11 = c2.fetch(0) + assert m11.properties['THE-GROUP'] == 'B' + assert m11.content['index'] == 5 + + s2.acknowledge() + s2.commit() + + def test_query(self): + """ Verify the queue query method against message groups + """ + snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + + " node: {x-declare: {arguments:" + + " {'qpid.group_header_key':'THE-GROUP'}}}}") + + groups = ["A","B","C","A","B","C","A"] + messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] + index = 0 + for m in messages: + m.content['index'] = index + index += 1 + snd.send(m) + + s1 = self.setup_session() + c1 = s1.receiver("msg-group-q", options={"capacity":1}) + s2 = self.setup_session() + c2 = s2.receiver("msg-group-q", options={"capacity":1}) + + m1 = c1.fetch(0) + m2 = c2.fetch(0) + + # at this point, group A should be owned by C1, group B by C2, and + # group C should be available + + # now setup a QMF session, so we can call methods + self.qmf_session = qmf.console.Session() + self.qmf_broker = self.qmf_session.addBroker(str(self.broker)) + brokers = self.qmf_session.getObjects(_class="broker") + assert len(brokers) == 1 + broker = brokers[0] + + # verify the query method call's group information + rc = broker.query("queue", "msg-group-q") + assert rc.status == 0 + assert rc.text == "OK" + results = rc.outArgs['results'] + assert 'qpid.message_group_queue' in results + q_info = results['qpid.message_group_queue'] + assert 'group_header_key' in q_info and q_info['group_header_key'] == "THE-GROUP" + assert 'group_state' in q_info and len(q_info['group_state']) == 3 + for g_info in q_info['group_state']: + assert 'group_id' in g_info + if g_info['group_id'] == "A": + assert g_info['msg_count'] == 3 + assert g_info['consumer'] != "" + elif g_info['group_id'] == "B": + assert g_info['msg_count'] == 2 + assert g_info['consumer'] != "" + elif g_info['group_id'] == "C": + assert g_info['msg_count'] == 2 + assert g_info['consumer'] == "" + else: + assert(False) # should never get here + self.qmf_session.delBroker(self.qmf_broker) + + def test_purge_free(self): + """ Verify we can purge a queue of all messages of a given "unowned" + group. + """ + snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + + " node: {x-declare: {arguments:" + + " {'qpid.group_header_key':'THE-GROUP'}}}}") + + groups = ["A","B","A","B","C","A"] + messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] + index = 0 + for m in messages: + m.content['index'] = index + index += 1 + snd.send(m) + + # now setup a QMF session, so we can call methods + self.qmf_session = qmf.console.Session() + self.qmf_broker = self.qmf_session.addBroker(str(self.broker)) + queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0] + assert queue + msg_filter = { 'filter_type' : 'header_match_str', + 'filter_params' : { 'header_key' : "THE-GROUP", + 'header_value' : "B" }} + assert queue.msgDepth == 6 + rc = queue.purge(0, msg_filter) + assert rc.status == 0 + queue.update() + assert queue.msgDepth == 4 + + # verify all B's removed.... + s2 = self.setup_session() + b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1}) + count = 0 + try: + while True: + m2 = b1.fetch(0) + assert m2.properties['THE-GROUP'] != 'B' + count += 1 + except Empty: + pass + assert count == 4 + + self.qmf_session.delBroker(self.qmf_broker) + + def test_purge_acquired(self): + """ Verify we can purge messages from an acquired group. + """ + snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + + " node: {x-declare: {arguments:" + + " {'qpid.group_header_key':'THE-GROUP'}}}}") + + groups = ["A","B","A","B","C","A"] + messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] + index = 0 + for m in messages: + m.content['index'] = index + index += 1 + snd.send(m) + + # acquire group "A" + s1 = self.setup_session() + c1 = s1.receiver("msg-group-q", options={"capacity":1}) + m1 = c1.fetch(0) + assert m1.properties['THE-GROUP'] == 'A' + assert m1.content['index'] == 0 + + # now setup a QMF session, so we can purge group A + self.qmf_session = qmf.console.Session() + self.qmf_broker = self.qmf_session.addBroker(str(self.broker)) + queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0] + assert queue + msg_filter = { 'filter_type' : 'header_match_str', + 'filter_params' : { 'header_key' : "THE-GROUP", + 'header_value' : "A" }} + assert queue.msgDepth == 6 + rc = queue.purge(0, msg_filter) + assert rc.status == 0 + queue.update() + queue.msgDepth == 4 # the pending acquired A still counts! + + # verify all other A's removed.... + s2 = self.setup_session() + b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1}) + count = 0 + try: + while True: + m2 = b1.fetch(0) + assert m2.properties['THE-GROUP'] != 'A' + count += 1 + except Empty: + pass + assert count == 3 # only 3 really available + s1.acknowledge() # ack the consumed A-0 + self.qmf_session.delBroker(self.qmf_broker) + + def test_purge_count(self): + """ Verify we can purge a fixed number of messages from an acquired + group. + """ + snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + + " node: {x-declare: {arguments:" + + " {'qpid.group_header_key':'THE-GROUP'}}}}") + + groups = ["A","B","A","B","C","A"] + messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] + index = 0 + for m in messages: + m.content['index'] = index + index += 1 + snd.send(m) + + # acquire group "A" + s1 = self.setup_session() + c1 = s1.receiver("msg-group-q", options={"capacity":1}) + m1 = c1.fetch(0) + assert m1.properties['THE-GROUP'] == 'A' + assert m1.content['index'] == 0 + + # now setup a QMF session, so we can purge group A + self.qmf_session = qmf.console.Session() + self.qmf_broker = self.qmf_session.addBroker(str(self.broker)) + queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0] + assert queue + msg_filter = { 'filter_type' : 'header_match_str', + 'filter_params' : { 'header_key' : "THE-GROUP", + 'header_value' : "A" }} + assert queue.msgDepth == 6 + rc = queue.purge(1, msg_filter) + assert rc.status == 0 + queue.update() + queue.msgDepth == 5 # the pending acquired A still counts! + + # verify all other A's removed.... + s2 = self.setup_session() + b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1}) + count = 0 + a_count = 0 + try: + while True: + m2 = b1.fetch(0) + if m2.properties['THE-GROUP'] != 'A': + count += 1 + else: + a_count += 1 + except Empty: + pass + assert count == 3 # non-A's + assert a_count == 1 # and one is an A + s1.acknowledge() # ack the consumed A-0 + self.qmf_session.delBroker(self.qmf_broker) + + def test_move_all(self): + """ Verify we can move messages from an acquired group. + """ + snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + + " node: {x-declare: {arguments:" + + " {'qpid.group_header_key':'THE-GROUP'}}}}") + + groups = ["A","B","A","B","C","A"] + messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] + index = 0 + for m in messages: + m.content['index'] = index + index += 1 + snd.send(m) + + # set up destination queue + rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," + + " node: {x-declare: {arguments:" + + " {'qpid.group_header_key':'THE-GROUP'}}}}") + + # acquire group "A" + s1 = self.setup_session() + c1 = s1.receiver("msg-group-q", options={"capacity":1}) + m1 = c1.fetch(0) + assert m1.properties['THE-GROUP'] == 'A' + assert m1.content['index'] == 0 + + # now setup a QMF session, so we can move what's left of group A + self.qmf_session = qmf.console.Session() + self.qmf_broker = self.qmf_session.addBroker(str(self.broker)) + brokers = self.qmf_session.getObjects(_class="broker") + assert len(brokers) == 1 + broker = brokers[0] + msg_filter = { 'filter_type' : 'header_match_str', + 'filter_params' : { 'header_key' : "THE-GROUP", + 'header_value' : "A" }} + rc = broker.queueMoveMessages("msg-group-q", "dest-q", 0, msg_filter) + assert rc.status == 0 + + # verify all other A's removed from msg-group-q + s2 = self.setup_session() + b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1}) + count = 0 + try: + while True: + m2 = b1.fetch(0) + assert m2.properties['THE-GROUP'] != 'A' + count += 1 + except Empty: + pass + assert count == 3 # only 3 really available + + # verify the moved A's are at the dest-q + s2 = self.setup_session() + b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":1}) + count = 0 + try: + while True: + m2 = b1.fetch(0) + assert m2.properties['THE-GROUP'] == 'A' + assert m2.content['index'] == 2 or m2.content['index'] == 5 + count += 1 + except Empty: + pass + assert count == 2 # two A's moved + + s1.acknowledge() # ack the consumed A-0 + self.qmf_session.delBroker(self.qmf_broker) + + def test_move_count(self): + """ Verify we can move a fixed number of messages from an acquired group. + """ + snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + + " node: {x-declare: {arguments:" + + " {'qpid.group_header_key':'THE-GROUP'}}}}") + + groups = ["A","B","A","B","C","A"] + messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] + index = 0 + for m in messages: + m.content['index'] = index + index += 1 + snd.send(m) + + # set up destination queue + rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," + + " node: {x-declare: {arguments:" + + " {'qpid.group_header_key':'THE-GROUP'}}}}") + + # now setup a QMF session, so we can move group B + self.qmf_session = qmf.console.Session() + self.qmf_broker = self.qmf_session.addBroker(str(self.broker)) + brokers = self.qmf_session.getObjects(_class="broker") + assert len(brokers) == 1 + broker = brokers[0] + msg_filter = { 'filter_type' : 'header_match_str', + 'filter_params' : { 'header_key' : "THE-GROUP", + 'header_value' : "B" }} + rc = broker.queueMoveMessages("msg-group-q", "dest-q", 3, msg_filter) + assert rc.status == 0 + + # verify all B's removed from msg-group-q + s2 = self.setup_session() + b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1}) + count = 0 + try: + while True: + m2 = b1.fetch(0) + assert m2.properties['THE-GROUP'] != 'B' + count += 1 + except Empty: + pass + assert count == 4 + + # verify the moved B's are at the dest-q + s2 = self.setup_session() + b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":1}) + count = 0 + try: + while True: + m2 = b1.fetch(0) + assert m2.properties['THE-GROUP'] == 'B' + assert m2.content['index'] == 1 or m2.content['index'] == 3 + count += 1 + except Empty: + pass + assert count == 2 + self.qmf_session.delBroker(self.qmf_broker) + def test_reroute(self): + """ Verify we can reroute messages from an acquired group. + """ + snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + + " node: {x-declare: {arguments:" + + " {'qpid.group_header_key':'THE-GROUP'}}}}") + + groups = ["A","B","A","B","C","A"] + messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] + index = 0 + for m in messages: + m.content['index'] = index + index += 1 + snd.send(m) + + # create a topic exchange for the reroute + rcvr = self.ssn.receiver("reroute-q; {create: always, delete:receiver," + + " node: {type: topic}}") + + # acquire group "A" + s1 = self.setup_session() + c1 = s1.receiver("msg-group-q", options={"capacity":1}) + m1 = c1.fetch(0) + assert m1.properties['THE-GROUP'] == 'A' + assert m1.content['index'] == 0 + + # now setup a QMF session, so we can reroute group A + self.qmf_session = qmf.console.Session() + self.qmf_broker = self.qmf_session.addBroker(str(self.broker)) + queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0] + assert queue + msg_filter = { 'filter_type' : 'header_match_str', + 'filter_params' : { 'header_key' : "THE-GROUP", + 'header_value' : "A" }} + assert queue.msgDepth == 6 + rc = queue.reroute(0, False, "reroute-q", msg_filter) + assert rc.status == 0 + queue.update() + queue.msgDepth == 4 # the pending acquired A still counts! + + # verify all other A's removed.... + s2 = self.setup_session() + b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1}) + count = 0 + try: + while True: + m2 = b1.fetch(0) + assert m2.properties['THE-GROUP'] != 'A' + count += 1 + except Empty: + pass + assert count == 3 # only 3 really available + + # and what of reroute-q? + count = 0 + try: + while True: + m2 = rcvr.fetch(0) + assert m2.properties['THE-GROUP'] == 'A' + assert m2.content['index'] == 2 or m2.content['index'] == 5 + count += 1 + except Empty: + pass + assert count == 2 + + s1.acknowledge() # ack the consumed A-0 + self.qmf_session.delBroker(self.qmf_broker) + + def test_queue_delete(self): + """ Test deleting a queue while consumers are active. + """ + + ## Create a msg group queue + + snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + + " node: {x-declare: {arguments:" + + " {'qpid.group_header_key':'THE-GROUP'}}}}") + + groups = ["A","B","A","B","C"] + messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] + index = 0 + for m in messages: + m.content['index'] = index + index += 1 + snd.send(m) + + ## Queue = A-0, B-1, A-2, b-3, C-4 + ## Owners= ---, ---, ---, ---, --- + + # create consumers + s1 = self.setup_session() + c1 = s1.receiver("msg-group-q", options={"capacity":1}) + s2 = self.setup_session() + c2 = s2.receiver("msg-group-q", options={"capacity":1}) + + # C1 should acquire A-0 + m1 = c1.fetch(0); + assert m1.properties['THE-GROUP'] == 'A' + assert m1.content['index'] == 0 + + # c2 acquires B-1 + m2 = c2.fetch(0) + assert m2.properties['THE-GROUP'] == 'B' + assert m2.content['index'] == 1 + + # with group A and B owned, and C free, delete the + # queue + snd.close() + self.ssn.close() class StickyConsumerMsgGroupTests(Base): """ |