summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-10-04 17:41:03 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-10-04 17:41:03 +0000
commitd59cf7064cf14338a2973e9b931ee8c8884b9983 (patch)
treea79e459769dd514eb49e2daeefca8bad3fd0143b
parent476f673216adabd0eb9cdef6b760c923c6faad2a (diff)
downloadqpid-python-d59cf7064cf14338a2973e9b931ee8c8884b9983.tar.gz
QPID-3346: add client and mgmt unit tests, fix bugs uncovered.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1178873 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.cpp78
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.h18
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py1
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py557
5 files changed, 616 insertions, 40 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
index c26185cd3e..f576a866fc 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -90,8 +90,6 @@ void MessageGroupManager::requeued( const QueuedMessage& qm )
{
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
// issue: const-ness??
- // @todo KAG BUG - how to ensure requeue happens in the correct order?
- // @todo KAG BUG - if requeue is not in correct order - what do we do? throw?
std::string group( getGroupId(qm) );
GroupMap::iterator gs = messageGroups.find( group );
assert( gs != messageGroups.end() );
@@ -117,10 +115,21 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm )
assert( gs != messageGroups.end() );
GroupState& state( gs->second );
assert( state.members.size() != 0 );
+ assert( state.acquired != 0 );
+ state.acquired -= 1;
// likely to be at or near begin() if dequeued in order
- {
- GroupState::PositionFifo::iterator pos = state.members.begin();
+ bool reFreeNeeded = false;
+ if (state.members.front() == qm.position) {
+ if (!state.owned()) {
+ // will be on the freeGroups list if mgmt is dequeueing rather than a consumer!
+ // if on freelist, it is indexed by first member, which is about to be removed!
+ unFree(state);
+ reFreeNeeded = true;
+ }
+ state.members.pop_front();
+ } else {
+ GroupState::PositionFifo::iterator pos = state.members.begin() + 1;
GroupState::PositionFifo::iterator end = state.members.end();
while (pos != end) {
if (*pos == qm.position) {
@@ -131,35 +140,37 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm )
}
}
- assert( state.acquired != 0 );
- state.acquired -= 1;
uint32_t total = state.members.size();
if (total == 0) {
- if (!state.owned()) { // unlikely, but need to remove from the free list before erase
- unFree( state );
- }
QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << gs->first);
messageGroups.erase( gs );
- } else {
- if (state.acquired == 0 && state.owned()) {
- QPID_LOG( trace, "group queue " << qName <<
- ": consumer name=" << state.owner << " released group id=" << gs->first);
- disown(state);
- }
+ } else if (state.acquired == 0 && state.owned()) {
+ QPID_LOG( trace, "group queue " << qName <<
+ ": consumer name=" << state.owner << " released group id=" << gs->first);
+ disown(state);
+ } else if (reFreeNeeded) {
+ disown(state);
}
QPID_LOG( trace, "group queue " << qName <<
": dequeued message from group id=" << group << " total=" << total );
}
-void MessageGroupManager::consumerAdded( const Consumer& c )
+void MessageGroupManager::consumerAdded( const Consumer& /*c*/ )
{
- assert(consumers.find(c.getName()) == consumers.end());
- consumers[c.getName()] = 0; // no groups owned yet
- QPID_LOG( trace, "group queue " << qName << ": added consumer, name=" << c.getName() );
+#if 0
+ // allow a re-subscribing consumer
+ if (consumers.find(c.getName()) == consumers.end()) {
+ consumers[c.getName()] = 0; // no groups owned yet
+ QPID_LOG( trace, "group queue " << qName << ": added consumer, name=" << c.getName() );
+ } else {
+ QPID_LOG( trace, "group queue " << qName << ": consumer re-subscribed, name=" << c.getName() );
+ }
+#endif
}
-void MessageGroupManager::consumerRemoved( const Consumer& c )
+void MessageGroupManager::consumerRemoved( const Consumer& /*c*/ )
{
+#if 0
const std::string& name(c.getName());
Consumers::iterator consumer = consumers.find(name);
assert(consumer != consumers.end());
@@ -170,14 +181,22 @@ void MessageGroupManager::consumerRemoved( const Consumer& c )
GroupState& state( gs->second );
if (state.owner == name) {
- --count;
- disown(state);
- QPID_LOG( trace, "group queue " << qName <<
- ": consumer name=" << name << " released group id=" << gs->first);
+ if (state.acquired == 0) {
+ --count;
+ disown(state);
+ QPID_LOG( trace, "group queue " << qName <<
+ ": consumer name=" << name << " released group id=" << gs->first);
+ }
}
}
- consumers.erase( consumer );
- QPID_LOG( trace, "group queue " << qName << ": removed consumer name=" << name );
+ if (count == 0) {
+ consumers.erase( consumer );
+ QPID_LOG( trace, "group queue " << qName << ": removed consumer name=" << name );
+ } else {
+ // don't release groups with outstanding acquired msgs - consumer may re-subscribe!
+ QPID_LOG( trace, "group queue " << qName << ": consumer name=" << name << " unsubscribed with outstanding messages.");
+ }
+#endif
}
@@ -196,9 +215,11 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued
return false; // shouldn't happen - should find nextFree
}
} else { // no free groups available
+#if 0
if (consumers[c->getName()] == 0) { // and none currently owned
return false; // so nothing available to consume
}
+#endif
if (!messages.next( c->position, next ))
return false;
}
@@ -356,7 +377,7 @@ void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
{
using namespace qpid::framing;
messageGroups.clear();
- consumers.clear();
+ //consumers.clear();
freeGroups.clear();
framing::Array groupState(TYPE_CODE_MAP);
@@ -398,7 +419,8 @@ void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
state.members.push_back((*p)->getIntegerValue<uint32_t, 4>());
messageGroups[state.group] = state;
if (state.owned())
- consumers[state.owner]++;
+ //consumers[state.owner]++;
+ ;
else {
assert(state.members.size());
freeGroups[state.members.front()] = &messageGroups[state.group];
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
index e55374f104..6ec8ffe801 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
@@ -42,7 +42,7 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageAllocato
const std::string qName; // name of parent queue (for logs)
struct GroupState {
- typedef std::list<framing::SequenceNumber> PositionFifo;
+ typedef std::deque<framing::SequenceNumber> PositionFifo;
std::string group; // group identifier
std::string owner; // consumer with outstanding acquired messages
@@ -54,13 +54,13 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageAllocato
bool owned() const {return !owner.empty();}
};
typedef std::map<std::string, struct GroupState> GroupMap;
- typedef std::map<std::string, uint32_t> Consumers; // count of owned groups
+ //typedef std::map<std::string, uint32_t> Consumers; // count of owned groups
typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
// note: update getState()/setState() when changing this object's state implementation
GroupMap messageGroups; // index: group name
GroupFifo freeGroups; // ordered by oldest free msg
- Consumers consumers; // index: consumer name
+ //Consumers consumers; // index: consumer name
static const std::string qpidMessageGroupKey;
static const std::string qpidMessageGroupTimestamp;
@@ -76,21 +76,17 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageAllocato
void own( GroupState& state, const std::string& owner )
{
state.owner = owner;
- consumers[state.owner]++;
+ //consumers[state.owner]++;
unFree( state );
}
void disown( GroupState& state )
{
- assert(consumers[state.owner]);
- consumers[state.owner]--;
+ //assert(consumers[state.owner]);
+ //consumers[state.owner]--;
state.owner.clear();
assert(state.members.size());
-#ifdef NDEBUG
+ assert(freeGroups.find(state.members.front()) == freeGroups.end());
freeGroups[state.members.front()] = &state;
-#else
- bool unique = freeGroups.insert(GroupFifo::value_type(state.members.front(), &state)).second;
- (void) unique; assert(unique);
-#endif
}
public:
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 56df6cb233..2f0735d41c 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -558,7 +558,7 @@ namespace {
MessageFilter* MessageFilter::create( const ::qpid::types::Variant::Map *filter )
{
using namespace qpid::types;
- if (filter) {
+ if (filter && !filter->empty()) {
Variant::Map::const_iterator i = filter->find(MessageFilter::typeKey);
if (i != filter->end()) {
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py b/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
index 921786af22..7b779df5f4 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
@@ -33,3 +33,4 @@ from lvq import *
from priority import *
from threshold import *
from extensions import *
+from msg_groups import *
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
index 11b2caad08..e49a997a54 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
@@ -19,6 +19,7 @@
from qpid.messaging import *
from qpid.tests.messaging import Base
+import qmf.console
from time import sleep
#
@@ -360,8 +361,564 @@ class MultiConsumerMsgGroupTests(Base):
except Empty:
pass
+ def test_transaction(self):
+ """ Verify behavior when using transactions.
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+ groups = ["A","A","B","B","A","B"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ s1 = self.conn.session(transactional=True)
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ s2 = self.conn.session(transactional=True)
+ c2 = s2.receiver("msg-group-q", options={"capacity":1})
+
+ # C1 gets group A
+ m1 = c1.fetch(0)
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ # C2 gets group B
+ m2 = c2.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'B'
+ assert m2.content['index'] == 2
+
+ s1.acknowledge(m1) # A-0 consumed, A group freed
+ s2.acknowledge(m2) # B-2 consumed, B group freed
+
+ s1.commit()
+ s2.rollback() # release B-2 and group B
+
+ ## Q: ["A1","B2","B3","A4","B5"]
+
+ # C2 should be able to get the next A
+ m3 = c2.fetch(0)
+ assert m3.properties['THE-GROUP'] == 'A'
+ assert m3.content['index'] == 1
+
+ # C1 should be able to get B-2
+ m4 = c1.fetch(0)
+ assert m4.properties['THE-GROUP'] == 'B'
+ assert m4.content['index'] == 2
+
+ s2.acknowledge(m3) # C2 consumes A-1
+ s1.acknowledge(m4) # C1 consumes B-2
+ s1.commit() # C1 consume B-2 occurs, free group B
+
+ ## Q: [["A1",]"B3","A4","B5"]
+
+ # A-1 is still considered owned by C2, since the commit has yet to
+ # occur, so the next available to C1 would be B-3
+ m5 = c1.fetch(0) # B-3
+ assert m5.properties['THE-GROUP'] == 'B'
+ assert m5.content['index'] == 3
+
+ # and C2 should find A-4 available, since it owns the A group
+ m6 = c2.fetch(0) # A-4
+ assert m6.properties['THE-GROUP'] == 'A'
+ assert m6.content['index'] == 4
+
+ s2.acknowledge(m6) # C2 consumes A-4
+
+ # uh-oh, A-1 and A-4 released, along with A group
+ s2.rollback()
+
+ ## Q: ["A1",["B3"],"A4","B5"]
+ m7 = c1.fetch(0) # A-1 is found
+ assert m7.properties['THE-GROUP'] == 'A'
+ assert m7.content['index'] == 1
+
+ ## Q: [["A1"],["B3"],"A4","B5"]
+ # since C1 "owns" both A and B group, C2 should find nothing available
+ try:
+ m8 = c2.fetch(0)
+ assert False # should not get here
+ except Empty:
+ pass
+
+ # C1 next gets A4
+ m9 = c1.fetch(0)
+ assert m9.properties['THE-GROUP'] == 'A'
+ assert m9.content['index'] == 4
+
+ s1.acknowledge()
+
+ ## Q: [["A1"],["B3"],["A4"],"B5"]
+ # even though C1 acknowledges A1,B3, and A4, B5 is still considered
+ # owned as the commit has yet to take place
+ try:
+ m10 = c2.fetch(0)
+ assert False # should not get here
+ except Empty:
+ pass
+
+ # now A1,B3,A4 dequeued, B5 should be free
+ s1.commit()
+
+ ## Q: ["B5"]
+ m11 = c2.fetch(0)
+ assert m11.properties['THE-GROUP'] == 'B'
+ assert m11.content['index'] == 5
+
+ s2.acknowledge()
+ s2.commit()
+
+ def test_query(self):
+ """ Verify the queue query method against message groups
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+ groups = ["A","B","C","A","B","C","A"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ s2 = self.setup_session()
+ c2 = s2.receiver("msg-group-q", options={"capacity":1})
+
+ m1 = c1.fetch(0)
+ m2 = c2.fetch(0)
+
+ # at this point, group A should be owned by C1, group B by C2, and
+ # group C should be available
+
+ # now setup a QMF session, so we can call methods
+ self.qmf_session = qmf.console.Session()
+ self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+ brokers = self.qmf_session.getObjects(_class="broker")
+ assert len(brokers) == 1
+ broker = brokers[0]
+
+ # verify the query method call's group information
+ rc = broker.query("queue", "msg-group-q")
+ assert rc.status == 0
+ assert rc.text == "OK"
+ results = rc.outArgs['results']
+ assert 'qpid.message_group_queue' in results
+ q_info = results['qpid.message_group_queue']
+ assert 'group_header_key' in q_info and q_info['group_header_key'] == "THE-GROUP"
+ assert 'group_state' in q_info and len(q_info['group_state']) == 3
+ for g_info in q_info['group_state']:
+ assert 'group_id' in g_info
+ if g_info['group_id'] == "A":
+ assert g_info['msg_count'] == 3
+ assert g_info['consumer'] != ""
+ elif g_info['group_id'] == "B":
+ assert g_info['msg_count'] == 2
+ assert g_info['consumer'] != ""
+ elif g_info['group_id'] == "C":
+ assert g_info['msg_count'] == 2
+ assert g_info['consumer'] == ""
+ else:
+ assert(False) # should never get here
+ self.qmf_session.delBroker(self.qmf_broker)
+
+ def test_purge_free(self):
+ """ Verify we can purge a queue of all messages of a given "unowned"
+ group.
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+ groups = ["A","B","A","B","C","A"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ # now setup a QMF session, so we can call methods
+ self.qmf_session = qmf.console.Session()
+ self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+ queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
+ assert queue
+ msg_filter = { 'filter_type' : 'header_match_str',
+ 'filter_params' : { 'header_key' : "THE-GROUP",
+ 'header_value' : "B" }}
+ assert queue.msgDepth == 6
+ rc = queue.purge(0, msg_filter)
+ assert rc.status == 0
+ queue.update()
+ assert queue.msgDepth == 4
+
+ # verify all B's removed....
+ s2 = self.setup_session()
+ b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+ count = 0
+ try:
+ while True:
+ m2 = b1.fetch(0)
+ assert m2.properties['THE-GROUP'] != 'B'
+ count += 1
+ except Empty:
+ pass
+ assert count == 4
+
+ self.qmf_session.delBroker(self.qmf_broker)
+
+ def test_purge_acquired(self):
+ """ Verify we can purge messages from an acquired group.
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+ groups = ["A","B","A","B","C","A"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ # acquire group "A"
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ m1 = c1.fetch(0)
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ # now setup a QMF session, so we can purge group A
+ self.qmf_session = qmf.console.Session()
+ self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+ queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
+ assert queue
+ msg_filter = { 'filter_type' : 'header_match_str',
+ 'filter_params' : { 'header_key' : "THE-GROUP",
+ 'header_value' : "A" }}
+ assert queue.msgDepth == 6
+ rc = queue.purge(0, msg_filter)
+ assert rc.status == 0
+ queue.update()
+ queue.msgDepth == 4 # the pending acquired A still counts!
+
+ # verify all other A's removed....
+ s2 = self.setup_session()
+ b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+ count = 0
+ try:
+ while True:
+ m2 = b1.fetch(0)
+ assert m2.properties['THE-GROUP'] != 'A'
+ count += 1
+ except Empty:
+ pass
+ assert count == 3 # only 3 really available
+ s1.acknowledge() # ack the consumed A-0
+ self.qmf_session.delBroker(self.qmf_broker)
+
+ def test_purge_count(self):
+ """ Verify we can purge a fixed number of messages from an acquired
+ group.
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+ groups = ["A","B","A","B","C","A"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ # acquire group "A"
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ m1 = c1.fetch(0)
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ # now setup a QMF session, so we can purge group A
+ self.qmf_session = qmf.console.Session()
+ self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+ queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
+ assert queue
+ msg_filter = { 'filter_type' : 'header_match_str',
+ 'filter_params' : { 'header_key' : "THE-GROUP",
+ 'header_value' : "A" }}
+ assert queue.msgDepth == 6
+ rc = queue.purge(1, msg_filter)
+ assert rc.status == 0
+ queue.update()
+ queue.msgDepth == 5 # the pending acquired A still counts!
+
+ # verify all other A's removed....
+ s2 = self.setup_session()
+ b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+ count = 0
+ a_count = 0
+ try:
+ while True:
+ m2 = b1.fetch(0)
+ if m2.properties['THE-GROUP'] != 'A':
+ count += 1
+ else:
+ a_count += 1
+ except Empty:
+ pass
+ assert count == 3 # non-A's
+ assert a_count == 1 # and one is an A
+ s1.acknowledge() # ack the consumed A-0
+ self.qmf_session.delBroker(self.qmf_broker)
+
+ def test_move_all(self):
+ """ Verify we can move messages from an acquired group.
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+ groups = ["A","B","A","B","C","A"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ # set up destination queue
+ rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+ # acquire group "A"
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ m1 = c1.fetch(0)
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ # now setup a QMF session, so we can move what's left of group A
+ self.qmf_session = qmf.console.Session()
+ self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+ brokers = self.qmf_session.getObjects(_class="broker")
+ assert len(brokers) == 1
+ broker = brokers[0]
+ msg_filter = { 'filter_type' : 'header_match_str',
+ 'filter_params' : { 'header_key' : "THE-GROUP",
+ 'header_value' : "A" }}
+ rc = broker.queueMoveMessages("msg-group-q", "dest-q", 0, msg_filter)
+ assert rc.status == 0
+
+ # verify all other A's removed from msg-group-q
+ s2 = self.setup_session()
+ b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+ count = 0
+ try:
+ while True:
+ m2 = b1.fetch(0)
+ assert m2.properties['THE-GROUP'] != 'A'
+ count += 1
+ except Empty:
+ pass
+ assert count == 3 # only 3 really available
+
+ # verify the moved A's are at the dest-q
+ s2 = self.setup_session()
+ b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":1})
+ count = 0
+ try:
+ while True:
+ m2 = b1.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'A'
+ assert m2.content['index'] == 2 or m2.content['index'] == 5
+ count += 1
+ except Empty:
+ pass
+ assert count == 2 # two A's moved
+
+ s1.acknowledge() # ack the consumed A-0
+ self.qmf_session.delBroker(self.qmf_broker)
+
+ def test_move_count(self):
+ """ Verify we can move a fixed number of messages from an acquired group.
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+ groups = ["A","B","A","B","C","A"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ # set up destination queue
+ rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+ # now setup a QMF session, so we can move group B
+ self.qmf_session = qmf.console.Session()
+ self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+ brokers = self.qmf_session.getObjects(_class="broker")
+ assert len(brokers) == 1
+ broker = brokers[0]
+ msg_filter = { 'filter_type' : 'header_match_str',
+ 'filter_params' : { 'header_key' : "THE-GROUP",
+ 'header_value' : "B" }}
+ rc = broker.queueMoveMessages("msg-group-q", "dest-q", 3, msg_filter)
+ assert rc.status == 0
+
+ # verify all B's removed from msg-group-q
+ s2 = self.setup_session()
+ b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+ count = 0
+ try:
+ while True:
+ m2 = b1.fetch(0)
+ assert m2.properties['THE-GROUP'] != 'B'
+ count += 1
+ except Empty:
+ pass
+ assert count == 4
+
+ # verify the moved B's are at the dest-q
+ s2 = self.setup_session()
+ b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":1})
+ count = 0
+ try:
+ while True:
+ m2 = b1.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'B'
+ assert m2.content['index'] == 1 or m2.content['index'] == 3
+ count += 1
+ except Empty:
+ pass
+ assert count == 2
+ self.qmf_session.delBroker(self.qmf_broker)
+ def test_reroute(self):
+ """ Verify we can reroute messages from an acquired group.
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+ groups = ["A","B","A","B","C","A"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ # create a topic exchange for the reroute
+ rcvr = self.ssn.receiver("reroute-q; {create: always, delete:receiver," +
+ " node: {type: topic}}")
+
+ # acquire group "A"
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ m1 = c1.fetch(0)
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ # now setup a QMF session, so we can reroute group A
+ self.qmf_session = qmf.console.Session()
+ self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+ queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
+ assert queue
+ msg_filter = { 'filter_type' : 'header_match_str',
+ 'filter_params' : { 'header_key' : "THE-GROUP",
+ 'header_value' : "A" }}
+ assert queue.msgDepth == 6
+ rc = queue.reroute(0, False, "reroute-q", msg_filter)
+ assert rc.status == 0
+ queue.update()
+ queue.msgDepth == 4 # the pending acquired A still counts!
+
+ # verify all other A's removed....
+ s2 = self.setup_session()
+ b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+ count = 0
+ try:
+ while True:
+ m2 = b1.fetch(0)
+ assert m2.properties['THE-GROUP'] != 'A'
+ count += 1
+ except Empty:
+ pass
+ assert count == 3 # only 3 really available
+
+ # and what of reroute-q?
+ count = 0
+ try:
+ while True:
+ m2 = rcvr.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'A'
+ assert m2.content['index'] == 2 or m2.content['index'] == 5
+ count += 1
+ except Empty:
+ pass
+ assert count == 2
+
+ s1.acknowledge() # ack the consumed A-0
+ self.qmf_session.delBroker(self.qmf_broker)
+
+ def test_queue_delete(self):
+ """ Test deleting a queue while consumers are active.
+ """
+
+ ## Create a msg group queue
+
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+ groups = ["A","B","A","B","C"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ ## Queue = A-0, B-1, A-2, b-3, C-4
+ ## Owners= ---, ---, ---, ---, ---
+
+ # create consumers
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ s2 = self.setup_session()
+ c2 = s2.receiver("msg-group-q", options={"capacity":1})
+
+ # C1 should acquire A-0
+ m1 = c1.fetch(0);
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ # c2 acquires B-1
+ m2 = c2.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'B'
+ assert m2.content['index'] == 1
+
+ # with group A and B owned, and C free, delete the
+ # queue
+ snd.close()
+ self.ssn.close()
class StickyConsumerMsgGroupTests(Base):
"""