summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-10-05 14:52:11 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-10-05 14:52:11 +0000
commite4dc56159ebad0801f3c7fdfed3e0f6138b42219 (patch)
tree431832a54b894c9186c32e7faa52151f17d2c294
parentbfbc8b09403cf94a83b5aea177a000c4c7f0381c (diff)
downloadqpid-python-e4dc56159ebad0801f3c7fdfed3e0f6138b42219.tar.gz
QPID-3346: allow configuration of the default group identifier
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1179248 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h1
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.cpp11
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.h4
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py36
5 files changed, 54 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index b02b9fa818..bd94582d10 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -33,6 +33,7 @@
#include "qpid/broker/Link.h"
#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/MessageGroupManager.h"
#include "qmf/org/apache/qpid/broker/Package.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h"
@@ -123,7 +124,8 @@ Broker::Options::Options(const std::string& name) :
qmf1Support(true),
queueFlowStopRatio(80),
queueFlowResumeRatio(70),
- queueThresholdEventRatio(80)
+ queueThresholdEventRatio(80),
+ defaultMsgGroup("qpid.no-group")
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -159,7 +161,8 @@ Broker::Options::Options(const std::string& name) :
("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication")
("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.")
("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.")
- ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised");
+ ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised")
+ ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.");
}
const std::string empty;
@@ -250,6 +253,7 @@ Broker::Broker(const Broker::Options& conf) :
Plugin::earlyInitAll(*this);
QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio);
+ MessageGroupManager::setDefaults(conf.defaultMsgGroup);
// If no plugin store module registered itself, set up the null store.
if (NullMessageStore::isNullStore(store.get()))
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 1612726693..8b347db3c0 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -121,6 +121,7 @@ public:
uint queueFlowStopRatio; // producer flow control: on
uint queueFlowResumeRatio; // producer flow control: off
uint16_t queueThresholdEventRatio;
+ std::string defaultMsgGroup;
private:
std::string getHome();
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
index 9c97a294a9..d4ca6af1d5 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -41,15 +41,14 @@ namespace {
const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key");
const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group");
const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
-const std::string MessageGroupManager::qpidMessageGroupDefault("qpid.no_group"); /** @todo KAG: make configurable in Broker options */
const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const
{
const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders();
- if (!headers) return qpidMessageGroupDefault;
+ if (!headers) return defaultGroupId;
qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader );
- if (!id || !id->convertsTo<std::string>()) return qpidMessageGroupDefault;
+ if (!id || !id->convertsTo<std::string>()) return defaultGroupId;
return id->get<std::string>();
}
@@ -331,6 +330,12 @@ boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( const std::s
return empty;
}
+std::string MessageGroupManager::defaultGroupId;
+void MessageGroupManager::setDefaults(const std::string& groupId) // static
+{
+ defaultGroupId = groupId;
+}
+
/** Cluster replication:
state map format:
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
index 58464c484d..35bdda94d5 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
@@ -36,6 +36,8 @@ class MessageDistributor;
class MessageGroupManager : public StatefulQueueObserver, public MessageDistributor
{
+ static std::string defaultGroupId; // assigned if no group id header present
+
const std::string groupIdHeader; // msg header holding group identifier
const unsigned int timestamp; // mark messages with timestamp if set
Messages& messages; // parent Queue's in memory message container
@@ -65,7 +67,6 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu
static const std::string qpidMessageGroupKey;
static const std::string qpidSharedGroup; // if specified, one group can be consumed by multiple receivers
static const std::string qpidMessageGroupTimestamp;
- static const std::string qpidMessageGroupDefault;
const std::string getGroupId( const QueuedMessage& qm ) const;
void unFree( const GroupState& state )
@@ -92,6 +93,7 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu
public:
+ static QPID_BROKER_EXTERN void setDefaults(const std::string& groupId);
static boost::shared_ptr<MessageGroupManager> create( const std::string& qName,
Messages& messages,
const qpid::framing::FieldTable& settings );
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
index 1e7570f15b..611be0a6b0 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
@@ -936,6 +936,42 @@ class MultiConsumerMsgGroupTests(Base):
snd.close()
self.ssn.close()
+ def test_default_group_id(self):
+ """ Verify the queue assigns the default group id should a message
+ arrive without a group identifier.
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
+
+ m = Message(content={}, properties={"NO-GROUP-HEADER":"HA-HA"})
+ snd.send(m)
+
+ # now setup a QMF session, so we can call methods
+ self.qmf_session = qmf.console.Session()
+ self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+ brokers = self.qmf_session.getObjects(_class="broker")
+ assert len(brokers) == 1
+ broker = brokers[0]
+
+ # grab the group state off the queue, and verify the default group is
+ # present ("qpid.no-group" is the broker default)
+ rc = broker.query("queue", "msg-group-q")
+ assert rc.status == 0
+ assert rc.text == "OK"
+ results = rc.outArgs['results']
+ assert 'qpid.message_group_queue' in results
+ q_info = results['qpid.message_group_queue']
+ assert 'group_header_key' in q_info and q_info['group_header_key'] == "THE-GROUP"
+ assert 'group_state' in q_info and len(q_info['group_state']) == 1
+ g_info = q_info['group_state'][0]
+ assert 'group_id' in g_info
+ assert g_info['group_id'] == 'qpid.no-group'
+
+ self.qmf_session.delBroker(self.qmf_broker)
+
+
class StickyConsumerMsgGroupTests(Base):
"""
Tests for the behavior of sticky-consumer message groups. These tests