From e4dc56159ebad0801f3c7fdfed3e0f6138b42219 Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Wed, 5 Oct 2011 14:52:11 +0000 Subject: QPID-3346: allow configuration of the default group identifier git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1179248 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Broker.cpp | 8 +++-- qpid/cpp/src/qpid/broker/Broker.h | 1 + qpid/cpp/src/qpid/broker/MessageGroupManager.cpp | 11 +++++-- qpid/cpp/src/qpid/broker/MessageGroupManager.h | 4 ++- .../src/py/qpid_tests/broker_0_10/msg_groups.py | 36 ++++++++++++++++++++++ 5 files changed, 54 insertions(+), 6 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index b02b9fa818..bd94582d10 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -33,6 +33,7 @@ #include "qpid/broker/Link.h" #include "qpid/broker/ExpiryPolicy.h" #include "qpid/broker/QueueFlowLimit.h" +#include "qpid/broker/MessageGroupManager.h" #include "qmf/org/apache/qpid/broker/Package.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h" @@ -123,7 +124,8 @@ Broker::Options::Options(const std::string& name) : qmf1Support(true), queueFlowStopRatio(80), queueFlowResumeRatio(70), - queueThresholdEventRatio(80) + queueThresholdEventRatio(80), + defaultMsgGroup("qpid.no-group") { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -159,7 +161,8 @@ Broker::Options::Options(const std::string& name) : ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication") ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.") ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.") - ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised"); + ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised") + ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier."); } const std::string empty; @@ -250,6 +253,7 @@ Broker::Broker(const Broker::Options& conf) : Plugin::earlyInitAll(*this); QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio); + MessageGroupManager::setDefaults(conf.defaultMsgGroup); // If no plugin store module registered itself, set up the null store. if (NullMessageStore::isNullStore(store.get())) diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 1612726693..8b347db3c0 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -121,6 +121,7 @@ public: uint queueFlowStopRatio; // producer flow control: on uint queueFlowResumeRatio; // producer flow control: off uint16_t queueThresholdEventRatio; + std::string defaultMsgGroup; private: std::string getHome(); diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp index 9c97a294a9..d4ca6af1d5 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -41,15 +41,14 @@ namespace { const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key"); const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group"); const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp"); -const std::string MessageGroupManager::qpidMessageGroupDefault("qpid.no_group"); /** @todo KAG: make configurable in Broker options */ const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const { const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders(); - if (!headers) return qpidMessageGroupDefault; + if (!headers) return defaultGroupId; qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader ); - if (!id || !id->convertsTo()) return qpidMessageGroupDefault; + if (!id || !id->convertsTo()) return defaultGroupId; return id->get(); } @@ -331,6 +330,12 @@ boost::shared_ptr MessageGroupManager::create( const std::s return empty; } +std::string MessageGroupManager::defaultGroupId; +void MessageGroupManager::setDefaults(const std::string& groupId) // static +{ + defaultGroupId = groupId; +} + /** Cluster replication: state map format: diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h index 58464c484d..35bdda94d5 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h @@ -36,6 +36,8 @@ class MessageDistributor; class MessageGroupManager : public StatefulQueueObserver, public MessageDistributor { + static std::string defaultGroupId; // assigned if no group id header present + const std::string groupIdHeader; // msg header holding group identifier const unsigned int timestamp; // mark messages with timestamp if set Messages& messages; // parent Queue's in memory message container @@ -65,7 +67,6 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu static const std::string qpidMessageGroupKey; static const std::string qpidSharedGroup; // if specified, one group can be consumed by multiple receivers static const std::string qpidMessageGroupTimestamp; - static const std::string qpidMessageGroupDefault; const std::string getGroupId( const QueuedMessage& qm ) const; void unFree( const GroupState& state ) @@ -92,6 +93,7 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu public: + static QPID_BROKER_EXTERN void setDefaults(const std::string& groupId); static boost::shared_ptr create( const std::string& qName, Messages& messages, const qpid::framing::FieldTable& settings ); diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py index 1e7570f15b..611be0a6b0 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py @@ -936,6 +936,42 @@ class MultiConsumerMsgGroupTests(Base): snd.close() self.ssn.close() + def test_default_group_id(self): + """ Verify the queue assigns the default group id should a message + arrive without a group identifier. + """ + snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + + " node: {x-declare: {arguments:" + + " {'qpid.group_header_key':'THE-GROUP'," + + "'qpid.shared_msg_group':1}}}}") + + m = Message(content={}, properties={"NO-GROUP-HEADER":"HA-HA"}) + snd.send(m) + + # now setup a QMF session, so we can call methods + self.qmf_session = qmf.console.Session() + self.qmf_broker = self.qmf_session.addBroker(str(self.broker)) + brokers = self.qmf_session.getObjects(_class="broker") + assert len(brokers) == 1 + broker = brokers[0] + + # grab the group state off the queue, and verify the default group is + # present ("qpid.no-group" is the broker default) + rc = broker.query("queue", "msg-group-q") + assert rc.status == 0 + assert rc.text == "OK" + results = rc.outArgs['results'] + assert 'qpid.message_group_queue' in results + q_info = results['qpid.message_group_queue'] + assert 'group_header_key' in q_info and q_info['group_header_key'] == "THE-GROUP" + assert 'group_state' in q_info and len(q_info['group_state']) == 1 + g_info = q_info['group_state'][0] + assert 'group_id' in g_info + assert g_info['group_id'] == 'qpid.no-group' + + self.qmf_session.delBroker(self.qmf_broker) + + class StickyConsumerMsgGroupTests(Base): """ Tests for the behavior of sticky-consumer message groups. These tests -- cgit v1.2.1