summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-10-07 14:21:48 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-10-07 14:21:48 +0000
commit4fbbc6ecf68bd8f118f4a6165c8f5bfca2c3c8b6 (patch)
tree4a54f245efa1c2df1601d648c1fdd41fba08b802
parent92d889931fe1cea19d1e33658d5f30348bd7070e (diff)
downloadqpid-python-4fbbc6ecf68bd8f118f4a6165c8f5bfca2c3c8b6.tar.gz
QPID-3346: move message group feature into trunk.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1180050 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/CMakeLists.txt2
-rw-r--r--cpp/src/Makefile.am5
-rw-r--r--cpp/src/qpid/broker/Broker.cpp67
-rw-r--r--cpp/src/qpid/broker/Broker.h11
-rw-r--r--cpp/src/qpid/broker/Consumer.h16
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp2
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h2
-rw-r--r--cpp/src/qpid/broker/FifoDistributor.cpp58
-rw-r--r--cpp/src/qpid/broker/FifoDistributor.h58
-rw-r--r--cpp/src/qpid/broker/MessageDistributor.h76
-rw-r--r--cpp/src/qpid/broker/MessageGroupManager.cpp443
-rw-r--r--cpp/src/qpid/broker/MessageGroupManager.h125
-rw-r--r--cpp/src/qpid/broker/Messages.h1
-rw-r--r--cpp/src/qpid/broker/Queue.cpp480
-rw-r--r--cpp/src/qpid/broker/Queue.h54
-rw-r--r--cpp/src/qpid/broker/QueueEvents.cpp4
-rw-r--r--cpp/src/qpid/broker/QueueFlowLimit.cpp3
-rw-r--r--cpp/src/qpid/broker/QueueFlowLimit.h3
-rw-r--r--cpp/src/qpid/broker/QueueObserver.h40
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.cpp3
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp66
-rw-r--r--cpp/src/qpid/broker/SemanticState.h11
-rw-r--r--cpp/src/qpid/broker/ThresholdAlerts.h3
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp14
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp18
-rw-r--r--cpp/src/tests/CMakeLists.txt5
-rw-r--r--cpp/src/tests/Makefile.am19
-rw-r--r--cpp/src/tests/QueueTest.cpp313
-rwxr-xr-xcpp/src/tests/cluster_tests.py70
-rw-r--r--cpp/src/tests/msg_group_test.cpp618
-rw-r--r--cpp/src/tests/qpid-send.cpp100
-rwxr-xr-xcpp/src/tests/run_msg_group_tests66
-rwxr-xr-xcpp/src/tests/run_msg_group_tests_soak60
-rw-r--r--doc/book/src/AMQP-Messaging-Broker-CPP-Book.xml1
-rw-r--r--doc/book/src/AMQP-Messaging-Broker-CPP.xml1
-rw-r--r--doc/book/src/Using-message-groups.xml261
-rw-r--r--java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java17
-rw-r--r--specs/management-schema.xml10
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/__init__.py1
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/management.py30
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/msg_groups.py981
-rwxr-xr-xtools/src/py/qpid-config22
42 files changed, 3886 insertions, 254 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index c8aedd4c60..da1f539108 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -976,6 +976,8 @@ set (qpidbroker_SOURCES
qpid/broker/Queue.cpp
qpid/broker/QueueCleaner.cpp
qpid/broker/QueueListeners.cpp
+ qpid/broker/FifoDistributor.cpp
+ qpid/broker/MessageGroupManager.cpp
qpid/broker/PersistableMessage.cpp
qpid/broker/Bridge.cpp
qpid/broker/Connection.cpp
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 2663987f75..6230a8f6f6 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -671,6 +671,11 @@ libqpidbroker_la_SOURCES = \
qpid/broker/TxPublish.h \
qpid/broker/Vhost.cpp \
qpid/broker/Vhost.h \
+ qpid/broker/MessageDistributor.h \
+ qpid/broker/FifoDistributor.h \
+ qpid/broker/FifoDistributor.cpp \
+ qpid/broker/MessageGroupManager.cpp \
+ qpid/broker/MessageGroupManager.h \
qpid/management/ManagementAgent.cpp \
qpid/management/ManagementAgent.h \
qpid/management/ManagementDirectExchange.cpp \
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 598c43b1d8..bd94582d10 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -33,10 +33,12 @@
#include "qpid/broker/Link.h"
#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/MessageGroupManager.h"
#include "qmf/org/apache/qpid/broker/Package.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerDelete.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerQuery.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
@@ -122,7 +124,8 @@ Broker::Options::Options(const std::string& name) :
qmf1Support(true),
queueFlowStopRatio(80),
queueFlowResumeRatio(70),
- queueThresholdEventRatio(80)
+ queueThresholdEventRatio(80),
+ defaultMsgGroup("qpid.no-group")
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -158,7 +161,8 @@ Broker::Options::Options(const std::string& name) :
("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication")
("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.")
("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.")
- ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised");
+ ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised")
+ ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.");
}
const std::string empty;
@@ -249,6 +253,7 @@ Broker::Broker(const Broker::Options& conf) :
Plugin::earlyInitAll(*this);
QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio);
+ MessageGroupManager::setDefaults(conf.defaultMsgGroup);
// If no plugin store module registered itself, set up the null store.
if (NullMessageStore::isNullStore(store.get()))
@@ -453,7 +458,7 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
_qmf::ArgsBrokerQueueMoveMessages& moveArgs=
dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args);
QPID_LOG (debug, "Broker::queueMoveMessages()");
- if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty))
+ if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty, moveArgs.i_filter))
status = Manageable::STATUS_OK;
else
return Manageable::STATUS_PARAMETER_INVALID;
@@ -483,6 +488,13 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
status = Manageable::STATUS_OK;
break;
}
+ case _qmf::Broker::METHOD_QUERY :
+ {
+ _qmf::ArgsBrokerQuery& a = dynamic_cast<_qmf::ArgsBrokerQuery&>(args);
+ status = queryObject(a.i_type, a.i_name, a.o_results, getManagementExecutionContext());
+ status = Manageable::STATUS_OK;
+ break;
+ }
default:
QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]");
status = Manageable::STATUS_NOT_IMPLEMENTED;
@@ -655,6 +667,50 @@ void Broker::deleteObject(const std::string& type, const std::string& name,
}
+Manageable::status_t Broker::queryObject(const std::string& type,
+ const std::string& name,
+ Variant::Map& results,
+ const ConnectionState* context)
+{
+ std::string userId;
+ std::string connectionId;
+ if (context) {
+ userId = context->getUserId();
+ connectionId = context->getUrl();
+ }
+ QPID_LOG (debug, "Broker::query(" << type << ", " << name << ")");
+
+ if (type == TYPE_QUEUE)
+ return queryQueue( name, userId, connectionId, results );
+
+ if (type == TYPE_EXCHANGE ||
+ type == TYPE_TOPIC ||
+ type == TYPE_BINDING)
+ return Manageable::STATUS_NOT_IMPLEMENTED;
+
+ throw UnknownObjectType(type);
+}
+
+Manageable::status_t Broker::queryQueue( const std::string& name,
+ const std::string& userId,
+ const std::string& /*connectionId*/,
+ Variant::Map& results )
+{
+ (void) results;
+ if (acl) {
+ if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUEUE, name, NULL) )
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue query request from " << userId));
+ }
+
+ boost::shared_ptr<Queue> q(queues.find(name));
+ if (!q) {
+ QPID_LOG(error, "Query failed: queue not found, name=" << name);
+ return Manageable::STATUS_UNKNOWN_OBJECT;
+ }
+ q->query( results );
+ return Manageable::STATUS_OK;;
+}
+
void Broker::setLogLevel(const std::string& level)
{
QPID_LOG(notice, "Changing log level to " << level);
@@ -724,7 +780,8 @@ void Broker::connect(
uint32_t Broker::queueMoveMessages(
const std::string& srcQueue,
const std::string& destQueue,
- uint32_t qty)
+ uint32_t qty,
+ const Variant::Map& filter)
{
Queue::shared_ptr src_queue = queues.find(srcQueue);
if (!src_queue)
@@ -733,7 +790,7 @@ uint32_t Broker::queueMoveMessages(
if (!dest_queue)
return 0;
- return src_queue->move(dest_queue, qty);
+ return src_queue->move(dest_queue, qty, &filter);
}
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 40f7b6273f..8b347db3c0 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -121,6 +121,7 @@ public:
uint queueFlowStopRatio; // producer flow control: on
uint queueFlowResumeRatio; // producer flow control: off
uint16_t queueThresholdEventRatio;
+ std::string defaultMsgGroup;
private:
std::string getHome();
@@ -157,7 +158,12 @@ public:
const qpid::types::Variant::Map& properties, bool strict, const ConnectionState* context);
void deleteObject(const std::string& type, const std::string& name,
const qpid::types::Variant::Map& options, const ConnectionState* context);
-
+ Manageable::status_t queryObject(const std::string& type, const std::string& name,
+ qpid::types::Variant::Map& results, const ConnectionState* context);
+ Manageable::status_t queryQueue( const std::string& name,
+ const std::string& userId,
+ const std::string& connectionId,
+ qpid::types::Variant::Map& results);
boost::shared_ptr<sys::Poller> poller;
sys::Timer timer;
std::auto_ptr<sys::Timer> clusterTimer;
@@ -258,7 +264,8 @@ public:
*/
uint32_t queueMoveMessages( const std::string& srcQueue,
const std::string& destQueue,
- uint32_t qty);
+ uint32_t qty,
+ const qpid::types::Variant::Map& filter);
boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory(const std::string& name = TCP_TRANSPORT) const;
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h
index 317338a8ad..2af9b0c121 100644
--- a/cpp/src/qpid/broker/Consumer.h
+++ b/cpp/src/qpid/broker/Consumer.h
@@ -36,13 +36,19 @@ class Consumer {
// inListeners allows QueueListeners to efficiently track if this instance is registered
// for notifications without having to search its containers
bool inListeners;
- public:
- typedef boost::shared_ptr<Consumer> shared_ptr;
-
+ // the name is generated by broker and is unique within broker scope. It is not
+ // provided or known by the remote Consumer.
+ const std::string name;
+ public:
+ typedef boost::shared_ptr<Consumer> shared_ptr;
+
framing::SequenceNumber position;
-
- Consumer(bool preAcquires = true) : acquires(preAcquires), inListeners(false) {}
+
+ Consumer(const std::string& _name, bool preAcquires = true)
+ : acquires(preAcquires), inListeners(false), name(_name), position(0) {}
bool preAcquires() const { return acquires; }
+ const std::string& getName() const { return name; }
+
virtual bool deliver(QueuedMessage& msg) = 0;
virtual void notify() = 0;
virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 11970db394..0b8fe95d5e 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -153,7 +153,7 @@ uint32_t DeliveryRecord::getCredit() const
}
void DeliveryRecord::acquire(DeliveryIds& results) {
- if (queue->acquire(msg)) {
+ if (queue->acquire(msg, tag)) {
acquired = true;
results.push_back(id);
if (!acceptExpected) {
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
index 19ab37ac17..5a331357be 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -46,7 +46,7 @@ class DeliveryRecord
{
QueuedMessage msg;
mutable boost::shared_ptr<Queue> queue;
- std::string tag;
+ std::string tag; // name of consumer
DeliveryId id;
bool acquired : 1;
bool acceptExpected : 1;
diff --git a/cpp/src/qpid/broker/FifoDistributor.cpp b/cpp/src/qpid/broker/FifoDistributor.cpp
new file mode 100644
index 0000000000..cdb32d8c8c
--- /dev/null
+++ b/cpp/src/qpid/broker/FifoDistributor.cpp
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/FifoDistributor.h"
+
+using namespace qpid::broker;
+
+FifoDistributor::FifoDistributor(Messages& container)
+ : messages(container) {}
+
+bool FifoDistributor::nextConsumableMessage( Consumer::shared_ptr&, QueuedMessage& next )
+{
+ if (!messages.empty()) {
+ next = messages.front(); // by default, consume oldest msg
+ return true;
+ }
+ return false;
+}
+
+bool FifoDistributor::allocate(const std::string&, const QueuedMessage& )
+{
+ // by default, all messages present on the queue may be allocated as they have yet to
+ // be acquired.
+ return true;
+}
+
+bool FifoDistributor::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
+{
+ if (!messages.empty() && messages.next(c->position, next))
+ return true;
+ return false;
+}
+
+void FifoDistributor::query(qpid::types::Variant::Map&) const
+{
+ // nothing to see here....
+}
+
diff --git a/cpp/src/qpid/broker/FifoDistributor.h b/cpp/src/qpid/broker/FifoDistributor.h
new file mode 100644
index 0000000000..245537ed12
--- /dev/null
+++ b/cpp/src/qpid/broker/FifoDistributor.h
@@ -0,0 +1,58 @@
+#ifndef _broker_FifoDistributor_h
+#define _broker_FifoDistributor_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/** Simple MessageDistributor for FIFO Queues - the HEAD message is always the next
+ * available message for consumption.
+ */
+
+#include "qpid/broker/MessageDistributor.h"
+
+namespace qpid {
+namespace broker {
+
+class Messages;
+
+class FifoDistributor : public MessageDistributor
+{
+ public:
+ FifoDistributor(Messages& container);
+
+ /** Locking Note: all methods assume the caller is holding the Queue::messageLock
+ * during the method call.
+ */
+
+ /** MessageDistributor interface */
+
+ bool nextConsumableMessage( Consumer::shared_ptr& consumer, QueuedMessage& next );
+ bool allocate(const std::string& consumer, const QueuedMessage& target);
+ bool nextBrowsableMessage( Consumer::shared_ptr& consumer, QueuedMessage& next );
+ void query(qpid::types::Variant::Map&) const;
+
+ private:
+ Messages& messages;
+};
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/broker/MessageDistributor.h b/cpp/src/qpid/broker/MessageDistributor.h
new file mode 100644
index 0000000000..090393c160
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageDistributor.h
@@ -0,0 +1,76 @@
+#ifndef _broker_MessageDistributor_h
+#define _broker_MessageDistributor_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/** Abstraction used by Queue to determine the next "most desirable" message to provide to
+ * a particular consuming client
+ */
+
+
+#include "qpid/broker/Consumer.h"
+
+namespace qpid {
+namespace broker {
+
+struct QueuedMessage;
+
+class MessageDistributor
+{
+ public:
+ virtual ~MessageDistributor() {};
+
+ /** Locking Note: all methods assume the caller is holding the Queue::messageLock
+ * during the method call.
+ */
+
+ /** Determine the next message available for consumption by the consumer
+ * @param consumer the consumer that needs a message to consume
+ * @param next set to the next message that the consumer may consume.
+ * @return true if message is available and next is set
+ */
+ virtual bool nextConsumableMessage( Consumer::shared_ptr& consumer,
+ QueuedMessage& next ) = 0;
+
+ /** Allow the comsumer to take ownership of the given message.
+ * @param consumer the name of the consumer that is attempting to acquire the message
+ * @param qm the message to be acquired, previously returned from nextConsumableMessage()
+ * @return true if ownership is permitted, false if ownership cannot be assigned.
+ */
+ virtual bool allocate( const std::string& consumer,
+ const QueuedMessage& target) = 0;
+
+ /** Determine the next message available for browsing by the consumer
+ * @param consumer the consumer that is browsing the queue
+ * @param next set to the next message that the consumer may browse.
+ * @return true if a message is available and next is returned
+ */
+ virtual bool nextBrowsableMessage( Consumer::shared_ptr& consumer,
+ QueuedMessage& next ) = 0;
+
+ /** hook to add any interesting management state to the status map */
+ virtual void query(qpid::types::Variant::Map&) const = 0;
+};
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/broker/MessageGroupManager.cpp b/cpp/src/qpid/broker/MessageGroupManager.cpp
new file mode 100644
index 0000000000..d4ca6af1d5
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -0,0 +1,443 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/FieldTable.h"
+#include "qpid/types/Variant.h"
+#include "qpid/log/Statement.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/MessageGroupManager.h"
+
+using namespace qpid::broker;
+
+namespace {
+ const std::string GROUP_QUERY_KEY("qpid.message_group_queue");
+ const std::string GROUP_HEADER_KEY("group_header_key");
+ const std::string GROUP_STATE_KEY("group_state");
+ const std::string GROUP_ID_KEY("group_id");
+ const std::string GROUP_MSG_COUNT("msg_count");
+ const std::string GROUP_TIMESTAMP("timestamp");
+ const std::string GROUP_CONSUMER("consumer");
+}
+
+
+const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key");
+const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group");
+const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
+
+
+const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const
+{
+ const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders();
+ if (!headers) return defaultGroupId;
+ qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader );
+ if (!id || !id->convertsTo<std::string>()) return defaultGroupId;
+ return id->get<std::string>();
+}
+
+
+void MessageGroupManager::enqueued( const QueuedMessage& qm )
+{
+ // @todo KAG optimization - store reference to group state in QueuedMessage
+ // issue: const-ness??
+ std::string group( getGroupId(qm) );
+ GroupState &state(messageGroups[group]);
+ state.members.push_back(qm.position);
+ uint32_t total = state.members.size();
+ QPID_LOG( trace, "group queue " << qName <<
+ ": added message to group id=" << group << " total=" << total );
+ if (total == 1) {
+ // newly created group, no owner
+ state.group = group;
+ assert(freeGroups.find(qm.position) == freeGroups.end());
+ freeGroups[qm.position] = &state;
+ }
+}
+
+
+void MessageGroupManager::acquired( const QueuedMessage& qm )
+{
+ // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
+ // issue: const-ness??
+ std::string group( getGroupId(qm) );
+ GroupMap::iterator gs = messageGroups.find( group );
+ assert( gs != messageGroups.end() );
+ GroupState& state( gs->second );
+ state.acquired += 1;
+ QPID_LOG( trace, "group queue " << qName <<
+ ": acquired message in group id=" << group << " acquired=" << state.acquired );
+}
+
+
+void MessageGroupManager::requeued( const QueuedMessage& qm )
+{
+ // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
+ // issue: const-ness??
+ std::string group( getGroupId(qm) );
+ GroupMap::iterator gs = messageGroups.find( group );
+ assert( gs != messageGroups.end() );
+ GroupState& state( gs->second );
+ assert( state.acquired != 0 );
+ state.acquired -= 1;
+ if (state.acquired == 0 && state.owned()) {
+ QPID_LOG( trace, "group queue " << qName <<
+ ": consumer name=" << state.owner << " released group id=" << gs->first);
+ disown(state);
+ }
+ QPID_LOG( trace, "group queue " << qName <<
+ ": requeued message to group id=" << group << " acquired=" << state.acquired );
+}
+
+
+void MessageGroupManager::dequeued( const QueuedMessage& qm )
+{
+ // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
+ // issue: const-ness??
+ std::string group( getGroupId(qm) );
+ GroupMap::iterator gs = messageGroups.find( group );
+ assert( gs != messageGroups.end() );
+ GroupState& state( gs->second );
+ assert( state.members.size() != 0 );
+ assert( state.acquired != 0 );
+ state.acquired -= 1;
+
+ // likely to be at or near begin() if dequeued in order
+ bool reFreeNeeded = false;
+ if (state.members.front() == qm.position) {
+ if (!state.owned()) {
+ // will be on the freeGroups list if mgmt is dequeueing rather than a consumer!
+ // if on freelist, it is indexed by first member, which is about to be removed!
+ unFree(state);
+ reFreeNeeded = true;
+ }
+ state.members.pop_front();
+ } else {
+ GroupState::PositionFifo::iterator pos = state.members.begin() + 1;
+ GroupState::PositionFifo::iterator end = state.members.end();
+ while (pos != end) {
+ if (*pos == qm.position) {
+ state.members.erase(pos);
+ break;
+ }
+ ++pos;
+ }
+ }
+
+ uint32_t total = state.members.size();
+ if (total == 0) {
+ QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << gs->first);
+ messageGroups.erase( gs );
+ } else if (state.acquired == 0 && state.owned()) {
+ QPID_LOG( trace, "group queue " << qName <<
+ ": consumer name=" << state.owner << " released group id=" << gs->first);
+ disown(state);
+ } else if (reFreeNeeded) {
+ disown(state);
+ }
+ QPID_LOG( trace, "group queue " << qName <<
+ ": dequeued message from group id=" << group << " total=" << total );
+}
+
+void MessageGroupManager::consumerAdded( const Consumer& /*c*/ )
+{
+#if 0
+ // allow a re-subscribing consumer
+ if (consumers.find(c.getName()) == consumers.end()) {
+ consumers[c.getName()] = 0; // no groups owned yet
+ QPID_LOG( trace, "group queue " << qName << ": added consumer, name=" << c.getName() );
+ } else {
+ QPID_LOG( trace, "group queue " << qName << ": consumer re-subscribed, name=" << c.getName() );
+ }
+#endif
+}
+
+void MessageGroupManager::consumerRemoved( const Consumer& /*c*/ )
+{
+#if 0
+ const std::string& name(c.getName());
+ Consumers::iterator consumer = consumers.find(name);
+ assert(consumer != consumers.end());
+ size_t count = consumer->second;
+
+ for (GroupMap::iterator gs = messageGroups.begin();
+ count && gs != messageGroups.end(); ++gs) {
+
+ GroupState& state( gs->second );
+ if (state.owner == name) {
+ if (state.acquired == 0) {
+ --count;
+ disown(state);
+ QPID_LOG( trace, "group queue " << qName <<
+ ": consumer name=" << name << " released group id=" << gs->first);
+ }
+ }
+ }
+ if (count == 0) {
+ consumers.erase( consumer );
+ QPID_LOG( trace, "group queue " << qName << ": removed consumer name=" << name );
+ } else {
+ // don't release groups with outstanding acquired msgs - consumer may re-subscribe!
+ QPID_LOG( trace, "group queue " << qName << ": consumer name=" << name << " unsubscribed with outstanding messages.");
+ }
+#endif
+}
+
+
+bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
+{
+ if (messages.empty())
+ return false;
+
+ if (!freeGroups.empty()) {
+ framing::SequenceNumber nextFree = freeGroups.begin()->first;
+ if (nextFree < c->position) { // next free group's msg is older than current position
+ bool ok = messages.find(nextFree, next);
+ (void) ok; assert( ok );
+ } else {
+ if (!messages.next( c->position, next ))
+ return false; // shouldn't happen - should find nextFree
+ }
+ } else { // no free groups available
+#if 0
+ if (consumers[c->getName()] == 0) { // and none currently owned
+ return false; // so nothing available to consume
+ }
+#endif
+ if (!messages.next( c->position, next ))
+ return false;
+ }
+
+ do {
+ // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
+ std::string group( getGroupId( next ) );
+ GroupMap::iterator gs = messageGroups.find( group );
+ assert( gs != messageGroups.end() );
+ GroupState& state( gs->second );
+ if (!state.owned() || state.owner == c->getName()) {
+ return true;
+ }
+ } while (messages.next( next.position, next ));
+ return false;
+}
+
+
+bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMessage& qm)
+{
+ // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
+ std::string group( getGroupId(qm) );
+ GroupMap::iterator gs = messageGroups.find( group );
+ assert( gs != messageGroups.end() );
+ GroupState& state( gs->second );
+
+ if (!state.owned()) {
+ own( state, consumer );
+ QPID_LOG( trace, "group queue " << qName <<
+ ": consumer name=" << consumer << " has acquired group id=" << gs->first);
+ return true;
+ }
+ return state.owner == consumer;
+}
+
+bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
+{
+ // browse: allow access to any available msg, regardless of group ownership (?ok?)
+ if (!messages.empty() && messages.next(c->position, next))
+ return true;
+ return false;
+}
+
+void MessageGroupManager::query(qpid::types::Variant::Map& status) const
+{
+ /** Add a description of the current state of the message groups for this queue.
+ FORMAT:
+ { "qpid.message_group_queue":
+ { "group_header_key" : "<KEY>",
+ "group_state" :
+ [ { "group_id" : "<name>",
+ "msg_count" : <int>,
+ "timestamp" : <absTime>,
+ "consumer" : <consumer name> },
+ {...} // one for each known group
+ ]
+ }
+ }
+ **/
+
+ assert(status.find(GROUP_QUERY_KEY) == status.end());
+ qpid::types::Variant::Map state;
+ qpid::types::Variant::List groups;
+
+ state[GROUP_HEADER_KEY] = groupIdHeader;
+ for (GroupMap::const_iterator g = messageGroups.begin();
+ g != messageGroups.end(); ++g) {
+ qpid::types::Variant::Map info;
+ info[GROUP_ID_KEY] = g->first;
+ info[GROUP_MSG_COUNT] = g->second.members.size();
+ info[GROUP_TIMESTAMP] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */
+ info[GROUP_CONSUMER] = g->second.owner;
+ groups.push_back(info);
+ }
+ state[GROUP_STATE_KEY] = groups;
+ status[GROUP_QUERY_KEY] = state;
+}
+
+
+boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( const std::string& qName,
+ Messages& messages,
+ const qpid::framing::FieldTable& settings )
+{
+ boost::shared_ptr<MessageGroupManager> empty;
+
+ if (settings.isSet(qpidMessageGroupKey)) {
+
+ // @todo: remove once "sticky" consumers are supported - see QPID-3347
+ if (!settings.isSet(qpidSharedGroup)) {
+ QPID_LOG( error, "Only shared groups are supported in this version of the broker. Use '--shared-groups' in qpid-config." );
+ return empty;
+ }
+
+ std::string headerKey = settings.getAsString(qpidMessageGroupKey);
+ if (headerKey.empty()) {
+ QPID_LOG( error, "A Message Group header key must be configured, queue=" << qName);
+ return empty;
+ }
+ unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp);
+
+ boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( headerKey, qName, messages, timestamp ) );
+
+ QPID_LOG( debug, "Configured Queue '" << qName <<
+ "' for message grouping using header key '" << headerKey << "'" <<
+ " (timestamp=" << timestamp << ")");
+ return manager;
+ }
+ return empty;
+}
+
+std::string MessageGroupManager::defaultGroupId;
+void MessageGroupManager::setDefaults(const std::string& groupId) // static
+{
+ defaultGroupId = groupId;
+}
+
+/** Cluster replication:
+
+ state map format:
+
+ { "group-state": [ {"name": <group-name>,
+ "owner": <consumer-name>-or-empty,
+ "acquired-ct": <acquired count>,
+ "positions": [Seqnumbers, ... ]},
+ {...}
+ ]
+ }
+*/
+
+namespace {
+ const std::string GROUP_NAME("name");
+ const std::string GROUP_OWNER("owner");
+ const std::string GROUP_ACQUIRED_CT("acquired-ct");
+ const std::string GROUP_POSITIONS("positions");
+ const std::string GROUP_STATE("group-state");
+}
+
+
+/** Runs on UPDATER to snapshot current state */
+void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const
+{
+ using namespace qpid::framing;
+ state.clear();
+ framing::Array groupState(TYPE_CODE_MAP);
+ for (GroupMap::const_iterator g = messageGroups.begin();
+ g != messageGroups.end(); ++g) {
+
+ framing::FieldTable group;
+ group.setString(GROUP_NAME, g->first);
+ group.setString(GROUP_OWNER, g->second.owner);
+ group.setInt(GROUP_ACQUIRED_CT, g->second.acquired);
+ framing::Array positions(TYPE_CODE_UINT32);
+ for (GroupState::PositionFifo::const_iterator p = g->second.members.begin();
+ p != g->second.members.end(); ++p)
+ positions.push_back(framing::Array::ValuePtr(new IntegerValue( *p )));
+ group.setArray(GROUP_POSITIONS, positions);
+ groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group)));
+ }
+ state.setArray(GROUP_STATE, groupState);
+
+ QPID_LOG(debug, "Queue \"" << qName << "\": replicating message group state, key=" << groupIdHeader);
+}
+
+
+/** called on UPDATEE to set state from snapshot */
+void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
+{
+ using namespace qpid::framing;
+ messageGroups.clear();
+ //consumers.clear();
+ freeGroups.clear();
+
+ framing::Array groupState(TYPE_CODE_MAP);
+
+ bool ok = state.getArray(GROUP_STATE, groupState);
+ if (!ok) {
+ QPID_LOG(error, "Unable to find message group state information for queue \"" <<
+ qName << "\": cluster inconsistency error!");
+ return;
+ }
+
+ for (framing::Array::const_iterator g = groupState.begin();
+ g != groupState.end(); ++g) {
+ framing::FieldTable group;
+ ok = framing::getEncodedValue<FieldTable>(*g, group);
+ if (!ok) {
+ QPID_LOG(error, "Invalid message group state information for queue \"" <<
+ qName << "\": table encoding error!");
+ return;
+ }
+ MessageGroupManager::GroupState state;
+ if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || !group.isSet(GROUP_ACQUIRED_CT)) {
+ QPID_LOG(error, "Invalid message group state information for queue \"" <<
+ qName << "\": fields missing error!");
+ return;
+ }
+ state.group = group.getAsString(GROUP_NAME);
+ state.owner = group.getAsString(GROUP_OWNER);
+ state.acquired = group.getAsInt(GROUP_ACQUIRED_CT);
+ framing::Array positions(TYPE_CODE_UINT32);
+ ok = group.getArray(GROUP_POSITIONS, positions);
+ if (!ok) {
+ QPID_LOG(error, "Invalid message group state information for queue \"" <<
+ qName << "\": position encoding error!");
+ return;
+ }
+
+ for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p)
+ state.members.push_back((*p)->getIntegerValue<uint32_t, 4>());
+ messageGroups[state.group] = state;
+ if (state.owned())
+ //consumers[state.owner]++;
+ ;
+ else {
+ assert(state.members.size());
+ freeGroups[state.members.front()] = &messageGroups[state.group];
+ }
+ }
+
+ QPID_LOG(debug, "Queue \"" << qName << "\": message group state replicated, key =" << groupIdHeader)
+}
diff --git a/cpp/src/qpid/broker/MessageGroupManager.h b/cpp/src/qpid/broker/MessageGroupManager.h
new file mode 100644
index 0000000000..35bdda94d5
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageGroupManager.h
@@ -0,0 +1,125 @@
+#ifndef _broker_MessageGroupManager_h
+#define _broker_MessageGroupManager_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/* for managing message grouping on Queues */
+
+#include "qpid/broker/StatefulQueueObserver.h"
+#include "qpid/broker/MessageDistributor.h"
+
+
+namespace qpid {
+namespace broker {
+
+class QueueObserver;
+class MessageDistributor;
+
+class MessageGroupManager : public StatefulQueueObserver, public MessageDistributor
+{
+ static std::string defaultGroupId; // assigned if no group id header present
+
+ const std::string groupIdHeader; // msg header holding group identifier
+ const unsigned int timestamp; // mark messages with timestamp if set
+ Messages& messages; // parent Queue's in memory message container
+ const std::string qName; // name of parent queue (for logs)
+
+ struct GroupState {
+ typedef std::deque<framing::SequenceNumber> PositionFifo;
+
+ std::string group; // group identifier
+ std::string owner; // consumer with outstanding acquired messages
+ uint32_t acquired; // count of outstanding acquired messages
+ //uint32_t total; // count of enqueued messages in this group
+ PositionFifo members; // msgs belonging to this group
+
+ GroupState() : acquired(0) {}
+ bool owned() const {return !owner.empty();}
+ };
+ typedef std::map<std::string, struct GroupState> GroupMap;
+ //typedef std::map<std::string, uint32_t> Consumers; // count of owned groups
+ typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
+
+ // note: update getState()/setState() when changing this object's state implementation
+ GroupMap messageGroups; // index: group name
+ GroupFifo freeGroups; // ordered by oldest free msg
+ //Consumers consumers; // index: consumer name
+
+ static const std::string qpidMessageGroupKey;
+ static const std::string qpidSharedGroup; // if specified, one group can be consumed by multiple receivers
+ static const std::string qpidMessageGroupTimestamp;
+
+ const std::string getGroupId( const QueuedMessage& qm ) const;
+ void unFree( const GroupState& state )
+ {
+ GroupFifo::iterator pos = freeGroups.find( state.members.front() );
+ assert( pos != freeGroups.end() && pos->second == &state );
+ freeGroups.erase( pos );
+ }
+ void own( GroupState& state, const std::string& owner )
+ {
+ state.owner = owner;
+ //consumers[state.owner]++;
+ unFree( state );
+ }
+ void disown( GroupState& state )
+ {
+ //assert(consumers[state.owner]);
+ //consumers[state.owner]--;
+ state.owner.clear();
+ assert(state.members.size());
+ assert(freeGroups.find(state.members.front()) == freeGroups.end());
+ freeGroups[state.members.front()] = &state;
+ }
+
+ public:
+
+ static QPID_BROKER_EXTERN void setDefaults(const std::string& groupId);
+ static boost::shared_ptr<MessageGroupManager> create( const std::string& qName,
+ Messages& messages,
+ const qpid::framing::FieldTable& settings );
+
+ MessageGroupManager(const std::string& header, const std::string& _qName,
+ Messages& container, unsigned int _timestamp=0 )
+ : StatefulQueueObserver(std::string("MessageGroupManager:") + header),
+ groupIdHeader( header ), timestamp(_timestamp), messages(container), qName(_qName) {}
+ void enqueued( const QueuedMessage& qm );
+ void acquired( const QueuedMessage& qm );
+ void requeued( const QueuedMessage& qm );
+ void dequeued( const QueuedMessage& qm );
+ void consumerAdded( const Consumer& );
+ void consumerRemoved( const Consumer& );
+ void getState(qpid::framing::FieldTable& state ) const;
+ void setState(const qpid::framing::FieldTable&);
+
+ // MessageDistributor iface
+ bool nextConsumableMessage(Consumer::shared_ptr& c, QueuedMessage& next);
+ bool allocate(const std::string& c, const QueuedMessage& qm);
+ bool nextBrowsableMessage(Consumer::shared_ptr& c, QueuedMessage& next);
+ void query(qpid::types::Variant::Map&) const;
+
+ bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const;
+};
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/broker/Messages.h b/cpp/src/qpid/broker/Messages.h
index c535fd1936..448f17432a 100644
--- a/cpp/src/qpid/broker/Messages.h
+++ b/cpp/src/qpid/broker/Messages.h
@@ -76,7 +76,6 @@ class Messages
* @return true if there is another message, false otherwise.
*/
virtual bool next(const framing::SequenceNumber&, QueuedMessage&) = 0;
-
/**
* Note: Caller is responsible for ensuring that there is a front
* (e.g. empty() returns false)
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index dd3f982699..3d878d02a8 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -33,6 +33,8 @@
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/broker/ThresholdAlerts.h"
+#include "qpid/broker/FifoDistributor.h"
+#include "qpid/broker/MessageGroupManager.h"
#include "qpid/StringUtils.h"
#include "qpid/log/Statement.h"
@@ -42,6 +44,7 @@
#include "qpid/sys/ClusterSafe.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
+#include "qpid/types/Variant.h"
#include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
#include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h"
@@ -111,7 +114,8 @@ Queue::Queue(const string& _name, bool _autodelete,
broker(b),
deleted(false),
barrier(*this),
- autoDeleteTimeout(0)
+ autoDeleteTimeout(0),
+ allocator(new FifoDistributor( *messages ))
{
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
@@ -220,6 +224,7 @@ void Queue::requeue(const QueuedMessage& msg){
enqueue(0, payload);
}
}
+ observeRequeue(msg, locker);
}
copy.notify();
}
@@ -229,7 +234,7 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
- if (messages->remove(position, message)) {
+ if (acquire(position, message, locker)) {
QPID_LOG(debug, "Acquired message at " << position << " from " << name);
return true;
} else {
@@ -238,9 +243,24 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess
}
}
-bool Queue::acquire(const QueuedMessage& msg) {
- QueuedMessage copy = msg;
- return acquireMessageAt(msg.position, copy);
+bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer)
+{
+ Mutex::ScopedLock locker(messageLock);
+ assertClusterSafe();
+ QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position);
+
+ if (!allocator->allocate( consumer, msg )) {
+ QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name);
+ return false;
+ }
+
+ QueuedMessage copy(msg);
+ if (acquire( msg.position, copy, locker)) {
+ QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name);
+ return true;
+ }
+ QPID_LOG(debug, "Could not acquire message at " << msg.position << " from " << name << "; no message at that position");
+ return false;
}
void Queue::notifyListener()
@@ -256,7 +276,7 @@ void Queue::notifyListener()
set.notify();
}
-bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
+bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
{
checkNotDeleted();
if (c->preAcquires()) {
@@ -274,46 +294,65 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
}
}
-Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
+Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
{
while (true) {
Mutex::ScopedLock locker(messageLock);
- if (messages->empty()) {
- QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+ QueuedMessage msg;
+
+ if (!allocator->nextConsumableMessage(c, msg)) { // no next available
+ QPID_LOG(debug, "No messages available to dispatch to consumer " <<
+ c->getName() << " on queue '" << name << "'");
listeners.addListener(c);
return NO_MESSAGES;
- } else {
- QueuedMessage msg = messages->front();
- if (msg.payload->hasExpired()) {
- QPID_LOG(debug, "Message expired from queue '" << name << "'");
- popAndDequeue();
- continue;
- }
+ }
- if (c->filter(msg.payload)) {
- if (c->accept(msg.payload)) {
- m = msg;
- pop();
- return CONSUMED;
- } else {
- //message(s) are available but consumer hasn't got enough credit
- QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
- return CANT_CONSUME;
- }
+ if (msg.payload->hasExpired()) {
+ QPID_LOG(debug, "Message expired from queue '" << name << "'");
+ c->position = msg.position;
+ acquire( msg.position, msg, locker);
+ dequeue( 0, msg );
+ continue;
+ }
+
+ // a message is available for this consumer - can the consumer use it?
+
+ if (c->filter(msg.payload)) {
+ if (c->accept(msg.payload)) {
+ bool ok = allocator->allocate( c->getName(), msg ); // inform allocator
+ (void) ok; assert(ok);
+ ok = acquire( msg.position, msg, locker);
+ (void) ok; assert(ok);
+ m = msg;
+ c->position = m.position;
+ return CONSUMED;
} else {
- //consumer will never want this message
- QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+ //message(s) are available but consumer hasn't got enough credit
+ QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
return CANT_CONSUME;
}
+ } else {
+ //consumer will never want this message
+ QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+ c->position = msg.position;
+ return CANT_CONSUME;
}
}
}
-
-bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
+bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
{
- QueuedMessage msg(this);
- while (seek(msg, c)) {
+ while (true) {
+ Mutex::ScopedLock locker(messageLock);
+ QueuedMessage msg;
+
+ if (!allocator->nextBrowsableMessage(c, msg)) { // no next available
+ QPID_LOG(debug, "No browsable messages available for consumer " <<
+ c->getName() << " on queue '" << name << "'");
+ listeners.addListener(c);
+ return false;
+ }
+
if (c->filter(msg.payload) && !msg.payload->hasExpired()) {
if (c->accept(msg.payload)) {
//consumer wants the message
@@ -327,8 +366,8 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
}
} else {
//consumer will never want this message, continue seeking
- c->position = msg.position;
QPID_LOG(debug, "Browser skipping message from '" << name << "'");
+ c->position = msg.position;
}
}
return false;
@@ -358,61 +397,71 @@ bool Queue::dispatch(Consumer::shared_ptr c)
}
}
-// Find the next message
-bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
- Mutex::ScopedLock locker(messageLock);
- if (messages->next(c->position, msg)) {
- return true;
- } else {
- listeners.addListener(c);
- return false;
- }
-}
-
-QueuedMessage Queue::find(SequenceNumber pos) const {
+bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const {
Mutex::ScopedLock locker(messageLock);
- QueuedMessage msg;
- messages->find(pos, msg);
- return msg;
+ if (messages->find(pos, msg))
+ return true;
+ return false;
}
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
assertClusterSafe();
- Mutex::ScopedLock locker(consumerLock);
- if(exclusive) {
- throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
- } else if(requestExclusive) {
- if(consumerCount) {
+ {
+ Mutex::ScopedLock locker(consumerLock);
+ if(exclusive) {
throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
- } else {
- exclusive = c->getSession();
+ QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
+ } else if(requestExclusive) {
+ if(consumerCount) {
+ throw ResourceLockedException(
+ QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
+ } else {
+ exclusive = c->getSession();
+ }
+ }
+ consumerCount++;
+ if (mgmtObject != 0)
+ mgmtObject->inc_consumerCount ();
+ //reset auto deletion timer if necessary
+ if (autoDeleteTimeout && autoDeleteTask) {
+ autoDeleteTask->cancel();
}
}
- consumerCount++;
- if (mgmtObject != 0)
- mgmtObject->inc_consumerCount ();
- //reset auto deletion timer if necessary
- if (autoDeleteTimeout && autoDeleteTask) {
- autoDeleteTask->cancel();
+ Mutex::ScopedLock locker(messageLock);
+ for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+ try{
+ (*i)->consumerAdded(*c);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what());
+ }
}
}
void Queue::cancel(Consumer::shared_ptr c){
removeListener(c);
- Mutex::ScopedLock locker(consumerLock);
- consumerCount--;
- if(exclusive) exclusive = 0;
- if (mgmtObject != 0)
- mgmtObject->dec_consumerCount ();
+ {
+ Mutex::ScopedLock locker(consumerLock);
+ consumerCount--;
+ if(exclusive) exclusive = 0;
+ if (mgmtObject != 0)
+ mgmtObject->dec_consumerCount ();
+ }
+ Mutex::ScopedLock locker(messageLock);
+ for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+ try{
+ (*i)->consumerRemoved(*c);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what());
+ }
+ }
}
QueuedMessage Queue::get(){
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
- messages->pop(msg);
+ if (messages->pop(msg))
+ observeAcquire(msg, locker);
return msg;
}
@@ -443,11 +492,118 @@ void Queue::purgeExpired(qpid::sys::Duration lapse)
Mutex::ScopedLock locker(messageLock);
messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
}
- for_each(expired.begin(), expired.end(),
- boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+
+ for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
+ i != expired.end(); ++i) {
+ {
+ Mutex::ScopedLock locker(messageLock);
+ observeAcquire(*i, locker);
+ }
+ dequeue( 0, *i );
+ }
}
}
+
+namespace {
+ // for use with purge/move below - collect messages that match a given filter
+ //
+ class MessageFilter
+ {
+ public:
+ static const std::string typeKey;
+ static const std::string paramsKey;
+ static MessageFilter *create( const ::qpid::types::Variant::Map *filter );
+ virtual bool match( const QueuedMessage& ) const { return true; }
+ virtual ~MessageFilter() {}
+ protected:
+ MessageFilter() {};
+ };
+ const std::string MessageFilter::typeKey("filter_type");
+ const std::string MessageFilter::paramsKey("filter_params");
+
+ // filter by message header string value exact match
+ class HeaderMatchFilter : public MessageFilter
+ {
+ public:
+ /* Config:
+ { 'filter_type' : 'header_match_str',
+ 'filter_params' : { 'header_key' : "<header name>",
+ 'header_value' : "<value to match>"
+ }
+ }
+ */
+ static const std::string typeKey;
+ static const std::string headerKey;
+ static const std::string valueKey;
+ HeaderMatchFilter( const std::string& _header, const std::string& _value )
+ : MessageFilter (), header(_header), value(_value) {}
+ bool match( const QueuedMessage& msg ) const
+ {
+ const qpid::framing::FieldTable* headers = msg.payload->getApplicationHeaders();
+ if (!headers) return false;
+ FieldTable::ValuePtr h = headers->get(header);
+ if (!h || !h->convertsTo<std::string>()) return false;
+ return h->get<std::string>() == value;
+ }
+ private:
+ const std::string header;
+ const std::string value;
+ };
+ const std::string HeaderMatchFilter::typeKey("header_match_str");
+ const std::string HeaderMatchFilter::headerKey("header_key");
+ const std::string HeaderMatchFilter::valueKey("header_value");
+
+ // factory to create correct filter based on map
+ MessageFilter* MessageFilter::create( const ::qpid::types::Variant::Map *filter )
+ {
+ using namespace qpid::types;
+ if (filter && !filter->empty()) {
+ Variant::Map::const_iterator i = filter->find(MessageFilter::typeKey);
+ if (i != filter->end()) {
+
+ if (i->second.asString() == HeaderMatchFilter::typeKey) {
+ Variant::Map::const_iterator p = filter->find(MessageFilter::paramsKey);
+ if (p != filter->end() && p->second.getType() == VAR_MAP) {
+ Variant::Map::const_iterator k = p->second.asMap().find(HeaderMatchFilter::headerKey);
+ Variant::Map::const_iterator v = p->second.asMap().find(HeaderMatchFilter::valueKey);
+ if (k != p->second.asMap().end() && v != p->second.asMap().end()) {
+ std::string headerKey(k->second.asString());
+ std::string value(v->second.asString());
+ QPID_LOG(debug, "Message filtering by header value configured. key: " << headerKey << " value: " << value );
+ return new HeaderMatchFilter( headerKey, value );
+ }
+ }
+ }
+ }
+ QPID_LOG(error, "Ignoring unrecognized message filter: '" << *filter << "'");
+ }
+ return new MessageFilter();
+ }
+
+ // used by removeIf() to collect all messages matching a filter, maximum match count is
+ // optional.
+ struct Collector {
+ const uint32_t maxMatches;
+ MessageFilter& filter;
+ std::deque<QueuedMessage> matches;
+ Collector(MessageFilter& filter, uint32_t max)
+ : maxMatches(max), filter(filter) {}
+ bool operator() (QueuedMessage& qm)
+ {
+ if (maxMatches == 0 || matches.size() < maxMatches) {
+ if (filter.match( qm )) {
+ matches.push_back(qm);
+ return true;
+ }
+ }
+ return false;
+ }
+ };
+
+} // end namespace
+
+
/**
* purge - for purging all or some messages on a queue
* depending on the purge_request
@@ -459,62 +615,77 @@ void Queue::purgeExpired(qpid::sys::Duration lapse)
* The dest exchange may be supplied to re-route messages through the exchange.
* It is safe to re-route messages such that they arrive back on the same queue,
* even if the queue is ordered by priority.
+ *
+ * An optional filter can be supplied that will be applied against each message. The
+ * message is purged only if the filter matches. See MessageDistributor for more detail.
*/
-uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest)
+uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest,
+ const qpid::types::Variant::Map *filter)
{
- Mutex::ScopedLock locker(messageLock);
- uint32_t purge_count = purge_request; // only comes into play if >0
- std::deque<DeliverableMessage> rerouteQueue;
+ std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
+ Collector c(*mf.get(), purge_request);
- uint32_t count = 0;
- // Either purge them all or just the some (purge_count) while the queue isn't empty.
- while((!purge_request || purge_count--) && !messages->empty()) {
+ Mutex::ScopedLock locker(messageLock);
+ messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+ for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
+ qmsg != c.matches.end(); ++qmsg) {
+ // Update observers and message state:
+ observeAcquire(*qmsg, locker);
+ dequeue(0, *qmsg);
+ // now reroute if necessary
if (dest.get()) {
- //
- // If there is a destination exchange, stage the messages onto a reroute queue
- // so they don't wind up getting purged more than once.
- //
- DeliverableMessage msg(messages->front().payload);
- rerouteQueue.push_back(msg);
+ assert(qmsg->payload);
+ DeliverableMessage dmsg(qmsg->payload);
+ dest->routeWithAlternate(dmsg);
}
- popAndDequeue();
- count++;
- }
-
- //
- // Re-route purged messages into the destination exchange. Note that there's no need
- // to test dest.get() here because if it is NULL, the rerouteQueue will be empty.
- //
- while (!rerouteQueue.empty()) {
- DeliverableMessage msg(rerouteQueue.front());
- rerouteQueue.pop_front();
- dest->routeWithAlternate(msg);
}
-
- return count;
+ return c.matches.size();
}
-uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
+uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
+ const qpid::types::Variant::Map *filter)
+{
+ std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
+ Collector c(*mf.get(), qty);
+
Mutex::ScopedLock locker(messageLock);
- uint32_t move_count = qty; // only comes into play if qty >0
- uint32_t count = 0; // count how many were moved for returning
+ messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
- while((!qty || move_count--) && !messages->empty()) {
- QueuedMessage qmsg = messages->front();
- boost::intrusive_ptr<Message> msg = qmsg.payload;
- destq->deliver(msg); // deliver message to the destination queue
- pop();
- dequeue(0, qmsg);
- count++;
+ for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
+ qmsg != c.matches.end(); ++qmsg) {
+ // Update observers and message state:
+ observeAcquire(*qmsg, locker);
+ dequeue(0, *qmsg);
+ // and move to destination Queue.
+ assert(qmsg->payload);
+ destq->deliver(qmsg->payload);
}
- return count;
+ return c.matches.size();
}
-void Queue::pop()
+/** Acquire the front (oldest) message from the in-memory queue.
+ * assumes messageLock held by caller
+ */
+void Queue::pop(const Mutex::ScopedLock& locker)
{
assertClusterSafe();
- messages->pop();
- ++dequeueSincePurge;
+ QueuedMessage msg;
+ if (messages->pop(msg)) {
+ observeAcquire(msg, locker);
+ ++dequeueSincePurge;
+ }
+}
+
+/** Acquire the message at the given position, return true and msg if acquire succeeds */
+bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
+ const Mutex::ScopedLock& locker)
+{
+ if (messages->remove(position, msg)) {
+ observeAcquire(msg, locker);
+ ++dequeueSincePurge;
+ return true;
+ }
+ return false;
}
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
@@ -528,8 +699,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence);
dequeueRequired = messages->push(qm, removed);
+ if (dequeueRequired)
+ observeAcquire(removed, locker);
listeners.populate(copy);
- enqueued(qm);
+ observeEnqueue(qm, locker);
}
copy.notify();
if (dequeueRequired) {
@@ -663,7 +836,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return false;
if (!ctxt) {
- dequeued(msg);
+ observeDequeue(msg, locker);
}
}
// This check prevents messages which have been forced persistent on one queue from dequeuing
@@ -683,7 +856,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
void Queue::dequeueCommitted(const QueuedMessage& msg)
{
Mutex::ScopedLock locker(messageLock);
- dequeued(msg);
+ observeDequeue(msg, locker);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
@@ -691,21 +864,23 @@ void Queue::dequeueCommitted(const QueuedMessage& msg)
}
/**
- * Removes a message from the in-memory delivery queue as well
- * dequeing it from the logical (and persistent if applicable) queue
+ * Removes the first (oldest) message from the in-memory delivery queue as well dequeing
+ * it from the logical (and persistent if applicable) queue
*/
-void Queue::popAndDequeue()
+void Queue::popAndDequeue(const Mutex::ScopedLock& held)
{
- QueuedMessage msg = messages->front();
- pop();
- dequeue(0, msg);
+ if (!messages->empty()) {
+ QueuedMessage msg = messages->front();
+ pop(held);
+ dequeue(0, msg);
+ }
}
/**
* Updates policy and management when a message has been dequeued,
* expects messageLock to be held
*/
-void Queue::dequeued(const QueuedMessage& msg)
+void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
{
if (policy.get()) policy->dequeued(msg);
mgntDeqStats(msg.payload);
@@ -718,6 +893,33 @@ void Queue::dequeued(const QueuedMessage& msg)
}
}
+/** updates queue observers when a message has become unavailable for transfer,
+ * expects messageLock to be held
+ */
+void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&)
+{
+ for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+ try{
+ (*i)->acquired(msg);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of message removal for queue " << getName() << ": " << e.what());
+ }
+ }
+}
+
+/** updates queue observers when a message has become re-available for transfer,
+ * expects messageLock to be held
+ */
+void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
+{
+ for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+ try{
+ (*i)->requeued(msg);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of message requeue for queue " << getName() << ": " << e.what());
+ }
+ }
+}
void Queue::create(const FieldTable& _settings)
{
@@ -788,17 +990,28 @@ void Queue::configureImpl(const FieldTable& _settings)
if (lvqKey.size()) {
QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey);
messages = std::auto_ptr<Messages>(new MessageMap(lvqKey));
+ allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
} else if (_settings.get(qpidLastValueQueueNoBrowse)) {
QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on");
messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker);
+ allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
} else if (_settings.get(qpidLastValueQueue)) {
QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue");
messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker);
+ allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
} else {
std::auto_ptr<Messages> m = Fairshare::create(_settings);
if (m.get()) {
messages = m;
+ allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
QPID_LOG(debug, "Configured queue " << getName() << " as priority queue.");
+ } else { // default (FIFO) queue type
+ // override default message allocator if message groups configured.
+ boost::shared_ptr<MessageGroupManager> mgm(MessageGroupManager::create( getName(), *messages, _settings));
+ if (mgm) {
+ allocator = mgm;
+ addObserver(mgm);
+ }
}
}
@@ -835,7 +1048,7 @@ void Queue::destroyed()
while(!messages->empty()){
DeliverableMessage msg(messages->front().payload);
alternateExchange->routeWithAlternate(msg);
- popAndDequeue();
+ popAndDequeue(locker);
}
alternateExchange->decAlternateUsers();
}
@@ -1057,7 +1270,7 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str
case _qmf::Queue::METHOD_PURGE :
{
_qmf::ArgsQueuePurge& purgeArgs = (_qmf::ArgsQueuePurge&) args;
- purge(purgeArgs.i_request);
+ purge(purgeArgs.i_request, boost::shared_ptr<Exchange>(), &purgeArgs.i_filter);
status = Manageable::STATUS_OK;
}
break;
@@ -1078,7 +1291,7 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str
}
}
- purge(rerouteArgs.i_request, dest);
+ purge(rerouteArgs.i_request, dest, &rerouteArgs.i_filter);
status = Manageable::STATUS_OK;
}
break;
@@ -1087,6 +1300,14 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str
return status;
}
+
+void Queue::query(qpid::types::Variant::Map& results) const
+{
+ Mutex::ScopedLock locker(messageLock);
+ /** @todo add any interesting queue state into results */
+ if (allocator) allocator->query(results);
+}
+
void Queue::setPosition(SequenceNumber n) {
Mutex::ScopedLock locker(messageLock);
sequence = n;
@@ -1121,7 +1342,10 @@ void Queue::insertSequenceNumbers(const std::string& key)
QPID_LOG(debug, "Inserting sequence numbers as " << key);
}
-void Queue::enqueued(const QueuedMessage& m)
+/** updates queue observers and state when a message has become available for transfer,
+ * expects messageLock to be held
+ */
+void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&)
{
for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
try {
@@ -1144,7 +1368,8 @@ void Queue::updateEnqueued(const QueuedMessage& m)
if (policy.get()) {
policy->recoverEnqueued(payload);
}
- enqueued(m);
+ Mutex::ScopedLock locker(messageLock);
+ observeEnqueue(m, locker);
} else {
QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
}
@@ -1168,6 +1393,7 @@ void Queue::checkNotDeleted()
void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
{
+ Mutex::ScopedLock locker(messageLock);
observers.insert(observer);
}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 8435e75cab..59ae41e768 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -59,7 +59,7 @@ class MessageStore;
class QueueEvents;
class QueueRegistry;
class TransactionContext;
-class Exchange;
+class MessageDistributor;
/**
* The brokers representation of an amqp queue. Messages are
@@ -129,23 +129,32 @@ class Queue : public boost::enable_shared_from_this<Queue>,
UsageBarrier barrier;
int autoDeleteTimeout;
boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
+ boost::shared_ptr<MessageDistributor> allocator;
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
- bool seek(QueuedMessage& msg, Consumer::shared_ptr position);
- bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
- ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
- bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+ bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
+ ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
+ bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
void notifyListener();
void removeListener(Consumer::shared_ptr);
bool isExcluded(boost::intrusive_ptr<Message>& msg);
- void enqueued(const QueuedMessage& msg);
- void dequeued(const QueuedMessage& msg);
- void pop();
- void popAndDequeue();
+ /** update queue observers, stats, policy, etc when the messages' state changes. Lock
+ * must be held by caller */
+ void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
+ void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
+ void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
+ void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
+
+ /** modify the Queue's message container - assumes messageLock held */
+ void pop(const sys::Mutex::ScopedLock& held); // acquire front msg
+ void popAndDequeue(const sys::Mutex::ScopedLock& held); // acquire and dequeue front msg
+ // acquire message @ position, return true and set msg if acquire succeeds
+ bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
+ const sys::Mutex::ScopedLock& held);
void forcePersistent(QueuedMessage& msg);
int getEventMode();
@@ -191,8 +200,15 @@ class Queue : public boost::enable_shared_from_this<Queue>,
Broker* broker = 0);
QPID_BROKER_EXTERN ~Queue();
+ /** allow the Consumer to consume or browse the next available message */
QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr);
+ /** allow the Consumer to acquire a message that it has browsed.
+ * @param msg - message to be acquired.
+ * @return false if message is no longer available for acquire.
+ */
+ QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg, const std::string& consumer);
+
/**
* Used to configure a new queue and create a persistent record
* for it in store if required.
@@ -216,7 +232,11 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable());
- QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg);
+ /** Acquire the message at the given position if it is available for acquire. Not to
+ * be used by clients, but used by the broker for queue management.
+ * @param message - set to the acquired message if true returned.
+ * @return true if the message has been acquired.
+ */
QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message);
/**
@@ -245,11 +265,14 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool exclusive = false);
QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
- uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages
+ uint32_t purge(const uint32_t purge_request=0, //defaults to all messages
+ boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>(),
+ const ::qpid::types::Variant::Map *filter=0);
QPID_BROKER_EXTERN void purgeExpired(sys::Duration);
//move qty # of messages to destination Queue destq
- uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
+ uint32_t move(const Queue::shared_ptr destq, uint32_t qty,
+ const qpid::types::Variant::Map *filter=0);
QPID_BROKER_EXTERN uint32_t getMessageCount() const;
QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
@@ -302,12 +325,12 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool isEnqueued(const QueuedMessage& msg);
/**
- * Gets the next available message
+ * Acquires the next available (oldest) message
*/
QPID_BROKER_EXTERN QueuedMessage get();
- /** Get the message at position pos */
- QPID_BROKER_EXTERN QueuedMessage find(framing::SequenceNumber pos) const;
+ /** Get the message at position pos, returns true if found and sets msg */
+ QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, QueuedMessage& msg ) const;
const QueuePolicy* getPolicy();
@@ -336,6 +359,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
management::ManagementObject* GetManagementObject (void) const;
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
+ void query(::qpid::types::Variant::Map&) const;
/** Apply f to each Message on the queue. */
template <class F> void eachMessage(F f) {
diff --git a/cpp/src/qpid/broker/QueueEvents.cpp b/cpp/src/qpid/broker/QueueEvents.cpp
index 2c540ff1ad..c66bdabf0f 100644
--- a/cpp/src/qpid/broker/QueueEvents.cpp
+++ b/cpp/src/qpid/broker/QueueEvents.cpp
@@ -129,6 +129,10 @@ class EventGenerator : public QueueObserver
{
if (!enqueueOnly) manager.dequeued(m);
}
+
+ void acquired(const QueuedMessage&) {};
+ void requeued(const QueuedMessage&) {};
+
private:
QueueEvents& manager;
const bool enqueueOnly;
diff --git a/cpp/src/qpid/broker/QueueFlowLimit.cpp b/cpp/src/qpid/broker/QueueFlowLimit.cpp
index fcf8d089f9..f15bb45c01 100644
--- a/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -377,7 +377,8 @@ void QueueFlowLimit::setState(const qpid::framing::FieldTable& state)
++i;
fcmsg.add(first, last);
for (SequenceNumber seq = first; seq <= last; ++seq) {
- QueuedMessage msg(queue->find(seq)); // fyi: msg.payload may be null if msg is delivered & unacked
+ QueuedMessage msg;
+ queue->find(seq, msg); // fyi: may not be found if msg is acquired & unacked
bool unique;
unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second;
// Like this to avoid tripping up unused variable warning when NDEBUG set
diff --git a/cpp/src/qpid/broker/QueueFlowLimit.h b/cpp/src/qpid/broker/QueueFlowLimit.h
index c02e479976..ad8a2720ef 100644
--- a/cpp/src/qpid/broker/QueueFlowLimit.h
+++ b/cpp/src/qpid/broker/QueueFlowLimit.h
@@ -84,6 +84,9 @@ class Broker;
QPID_BROKER_EXTERN void enqueued(const QueuedMessage&);
/** the queue has removed QueuedMessage. Returns true if flow state changes */
QPID_BROKER_EXTERN void dequeued(const QueuedMessage&);
+ /** ignored */
+ QPID_BROKER_EXTERN void acquired(const QueuedMessage&) {};
+ QPID_BROKER_EXTERN void requeued(const QueuedMessage&) {};
/** for clustering: */
QPID_BROKER_EXTERN void getState(qpid::framing::FieldTable&) const;
diff --git a/cpp/src/qpid/broker/QueueObserver.h b/cpp/src/qpid/broker/QueueObserver.h
index 3ca01c051e..b58becd2ae 100644
--- a/cpp/src/qpid/broker/QueueObserver.h
+++ b/cpp/src/qpid/broker/QueueObserver.h
@@ -25,17 +25,51 @@ namespace qpid {
namespace broker {
struct QueuedMessage;
+class Consumer;
+
/**
- * Interface for notifying classes who want to act as 'observers' of a
- * queue of particular events.
+ * Interface for notifying classes who want to act as 'observers' of a queue of particular
+ * events.
+ *
+ * The events that are monitored reflect the relationship between a particular message and
+ * the queue it has been delivered to. A message can be considered in one of three states
+ * with respect to the queue:
+ *
+ * 1) "Available" - available for transfer to consumers (i.e. for browse or acquire),
+ *
+ * 2) "Acquired" - owned by a particular consumer, no longer available to other consumers
+ * (by either browse or acquire), but still considered on the queue.
+ *
+ * 3) "Dequeued" - removed from the queue and no longer available to any consumer.
+ *
+ * The queue events that are observable are:
+ *
+ * "Enqueued" - the message is "Available" - on the queue for transfer to any consumer
+ * (e.g. browse or acquire)
+ *
+ * "Acquired" - - a consumer has claimed exclusive access to it. It is no longer available
+ * for other consumers to browse or acquire, but it is not yet considered dequeued as it
+ * may be requeued by the consumer.
+ *
+ * "Requeued" - a previously-acquired message is released by its owner: it is put back on
+ * the queue at its original position and returns to the "Available" state.
+ *
+ * "Dequeued" - a message is no longer queued. At this point, the queue no longer tracks
+ * the message, and the broker considers the consumer's transaction complete.
*/
class QueueObserver
{
public:
virtual ~QueueObserver() {}
+
+ // note: the Queue will hold the messageLock while calling these methods!
virtual void enqueued(const QueuedMessage&) = 0;
virtual void dequeued(const QueuedMessage&) = 0;
- private:
+ virtual void acquired(const QueuedMessage&) = 0;
+ virtual void requeued(const QueuedMessage&) = 0;
+ virtual void consumerAdded( const Consumer& ) {};
+ virtual void consumerRemoved( const Consumer& ) {};
+ private:
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp
index 6ae0d53b1a..0c245700af 100644
--- a/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -269,8 +269,7 @@ bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
do {
QueuedMessage oldest = queue.front();
-
- if (oldest.queue->acquire(oldest) || !strict) {
+ if (oldest.queue->acquireMessageAt(oldest.position, oldest) || !strict) {
queue.pop_front();
pendingDequeues.push_back(oldest);
QPID_LOG(debug, "Ring policy triggered in " << name
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index b4f146e699..94d0cc87f7 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -107,11 +107,18 @@ bool SemanticState::exists(const string& consumerTag){
return consumers.find(consumerTag) != consumers.end();
}
+namespace {
+ const std::string SEPARATOR("::");
+}
+
void SemanticState::consume(const string& tag,
Queue::shared_ptr queue, bool ackRequired, bool acquire,
bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments)
{
- ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments));
+ // "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination).
+ // Create a globally unique name so the broker can identify individual consumers
+ std::string name = session.getSessionId().str() + SEPARATOR + tag;
+ ConsumerImpl::shared_ptr c(new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
queue->consume(c, exclusive);//may throw exception
consumers[tag] = c;
}
@@ -267,15 +274,15 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
bool ack,
bool _acquire,
bool _exclusive,
+ const string& _tag,
const string& _resumeId,
uint64_t _resumeTtl,
const framing::FieldTable& _arguments
) :
- Consumer(_acquire),
+ Consumer(_name, _acquire),
parent(_parent),
- name(_name),
queue(_queue),
ackExpected(ack),
acquire(_acquire),
@@ -284,6 +291,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
windowActive(false),
exclusive(_exclusive),
resumeId(_resumeId),
+ tag(_tag),
resumeTtl(_resumeTtl),
arguments(_arguments),
msgCredit(0),
@@ -300,7 +308,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
if (agent != 0)
{
- mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name,
+ mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(),
!acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments));
agent->addObject (mgmtObject);
mgmtObject->set_creditMode("WINDOW");
@@ -332,16 +340,15 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
assertClusterSafe();
allocateCredit(msg.payload);
- DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing);
+ DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, windowing);
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
parent->deliver(record, sync);
- if (!ackExpected && acquire) record.setEnded();//allows message to be released now its been delivered
if (windowing || ackExpected || !acquire) {
parent->record(record);
}
- if (acquire && !ackExpected) {
- queue->dequeue(0, msg);
+ if (acquire && !ackExpected) { // auto acquire && auto accept
+ record.accept( 0 /*no ctxt*/ );
}
if (mgmtObject) { mgmtObject->inc_delivered(); }
return true;
@@ -371,7 +378,7 @@ struct ConsumerName {
};
ostream& operator<<(ostream& o, const ConsumerName& pc) {
- return o << pc.consumer.getName() << " on "
+ return o << pc.consumer.getTag() << " on "
<< pc.consumer.getParent().getSession().getSessionId();
}
}
@@ -561,50 +568,61 @@ void SemanticState::deliver(DeliveryRecord& msg, bool sync)
return deliveryAdapter.deliver(msg, sync);
}
-SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)
+const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const
{
- ConsumerImplMap::iterator i = consumers.find(destination);
- if (i == consumers.end()) {
- throw NotFoundException(QPID_MSG("Unknown destination " << destination));
+ ConsumerImpl::shared_ptr consumer;
+ if (!find(destination, consumer)) {
+ throw NotFoundException(QPID_MSG("Unknown destination " << destination << " session=" << session.getSessionId()));
} else {
- return *(i->second);
+ return consumer;
+ }
+}
+
+bool SemanticState::find(const std::string& destination, ConsumerImpl::shared_ptr& consumer) const
+{
+ // @todo KAG gsim: shouldn't the consumers map be locked????
+ ConsumerImplMap::const_iterator i = consumers.find(destination);
+ if (i == consumers.end()) {
+ return false;
}
+ consumer = i->second;
+ return true;
}
void SemanticState::setWindowMode(const std::string& destination)
{
- find(destination).setWindowMode();
+ find(destination)->setWindowMode();
}
void SemanticState::setCreditMode(const std::string& destination)
{
- find(destination).setCreditMode();
+ find(destination)->setCreditMode();
}
void SemanticState::addByteCredit(const std::string& destination, uint32_t value)
{
- ConsumerImpl& c = find(destination);
- c.addByteCredit(value);
- c.requestDispatch();
+ ConsumerImpl::shared_ptr c = find(destination);
+ c->addByteCredit(value);
+ c->requestDispatch();
}
void SemanticState::addMessageCredit(const std::string& destination, uint32_t value)
{
- ConsumerImpl& c = find(destination);
- c.addMessageCredit(value);
- c.requestDispatch();
+ ConsumerImpl::shared_ptr c = find(destination);
+ c->addMessageCredit(value);
+ c->requestDispatch();
}
void SemanticState::flush(const std::string& destination)
{
- find(destination).flush();
+ find(destination)->flush();
}
void SemanticState::stop(const std::string& destination)
{
- find(destination).stop();
+ find(destination)->stop();
}
void SemanticState::ConsumerImpl::setWindowMode()
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 69d980947b..12ccc75f11 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -75,7 +75,6 @@ class SemanticState : private boost::noncopyable {
{
mutable qpid::sys::Mutex lock;
SemanticState* const parent;
- const std::string name;
const boost::shared_ptr<Queue> queue;
const bool ackExpected;
const bool acquire;
@@ -84,6 +83,7 @@ class SemanticState : private boost::noncopyable {
bool windowActive;
bool exclusive;
std::string resumeId;
+ const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command
uint64_t resumeTtl;
framing::FieldTable arguments;
uint32_t msgCredit;
@@ -103,7 +103,8 @@ class SemanticState : private boost::noncopyable {
ConsumerImpl(SemanticState* parent,
const std::string& name, boost::shared_ptr<Queue> queue,
bool ack, bool acquire, bool exclusive,
- const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
+ const std::string& tag, const std::string& resumeId,
+ uint64_t resumeTtl, const framing::FieldTable& arguments);
~ConsumerImpl();
OwnershipToken* getSession();
bool deliver(QueuedMessage& msg);
@@ -130,8 +131,6 @@ class SemanticState : private boost::noncopyable {
bool doOutput();
- std::string getName() const { return name; }
-
bool isAckExpected() const { return ackExpected; }
bool isAcquire() const { return acquire; }
bool isWindowing() const { return windowing; }
@@ -139,6 +138,7 @@ class SemanticState : private boost::noncopyable {
uint32_t getMsgCredit() const { return msgCredit; }
uint32_t getByteCredit() const { return byteCredit; }
std::string getResumeId() const { return resumeId; };
+ const std::string& getTag() const { return tag; }
uint64_t getResumeTtl() const { return resumeTtl; }
const framing::FieldTable& getArguments() const { return arguments; }
@@ -190,7 +190,8 @@ class SemanticState : private boost::noncopyable {
SessionContext& getSession() { return session; }
const SessionContext& getSession() const { return session; }
- ConsumerImpl& find(const std::string& destination);
+ const ConsumerImpl::shared_ptr find(const std::string& destination) const;
+ bool find(const std::string& destination, ConsumerImpl::shared_ptr&) const;
/**
* Get named queue, never returns 0.
diff --git a/cpp/src/qpid/broker/ThresholdAlerts.h b/cpp/src/qpid/broker/ThresholdAlerts.h
index c77722e700..2b4a46b736 100644
--- a/cpp/src/qpid/broker/ThresholdAlerts.h
+++ b/cpp/src/qpid/broker/ThresholdAlerts.h
@@ -50,6 +50,9 @@ class ThresholdAlerts : public QueueObserver
const long repeatInterval);
void enqueued(const QueuedMessage&);
void dequeued(const QueuedMessage&);
+ void acquired(const QueuedMessage&) {};
+ void requeued(const QueuedMessage&) {};
+
static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
const uint64_t countThreshold,
const uint64_t sizeThreshold,
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 015301573e..394749aad2 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -406,11 +406,11 @@ void Connection::shadowSetUser(const std::string& userId) {
void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position)
{
- broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
- c.position = position;
- c.setBlocked(blocked);
- if (notifyEnabled) c.enableNotify(); else c.disableNotify();
- updateIn.consumerNumbering.add(c.shared_from_this());
+ broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name);
+ c->position = position;
+ c->setBlocked(blocked);
+ if (notifyEnabled) c->enableNotify(); else c->disableNotify();
+ updateIn.consumerNumbering.add(c);
}
@@ -444,7 +444,7 @@ void Connection::outputTask(uint16_t channel, const std::string& name) {
if (!session)
throw Exception(QPID_MSG(cluster << " channel not attached " << *this
<< "[" << channel << "] "));
- OutputTask* task = &session->getSemanticState().find(name);
+ OutputTask* task = session->getSemanticState().find(name).get();
connection->getOutputTasks().addOutputTask(task);
}
@@ -547,7 +547,7 @@ void Connection::deliveryRecord(const string& qname,
m.position = position;
if (enqueued) queue->updateEnqueued(m); //inform queue of the message
} else { // Message at original position in original queue
- m = queue->find(position);
+ queue->find(position, m);
}
// FIXME aconway 2011-08-19: removed:
// if (!m.payload)
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 2be1bf1f77..2446c12f2b 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -421,8 +421,8 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) {
boost::polymorphic_downcast<const SemanticState::ConsumerImpl*> (task);
SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci);
uint16_t channel = ci->getParent().getSession().getChannel();
- ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName());
- QPID_LOG(debug, *this << " updating output task " << ci->getName()
+ ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getTag());
+ QPID_LOG(debug, *this << " updating output task " << ci->getTag()
<< " channel=" << channel);
}
@@ -521,13 +521,13 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
void UpdateClient::updateConsumer(
const broker::SemanticState::ConsumerImpl::shared_ptr& ci)
{
- QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on "
+ QPID_LOG(debug, *this << " updating consumer " << ci->getTag() << " on "
<< shadowSession.getId());
using namespace message;
shadowSession.messageSubscribe(
arg::queue = ci->getQueue()->getName(),
- arg::destination = ci->getName(),
+ arg::destination = ci->getTag(),
arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE,
arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED,
arg::exclusive = ci->isExclusive(),
@@ -535,18 +535,18 @@ void UpdateClient::updateConsumer(
arg::resumeTtl = ci->getResumeTtl(),
arg::arguments = ci->getArguments()
);
- shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
- shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
- shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit());
+ shadowSession.messageSetFlowMode(ci->getTag(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
+ shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
+ shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_BYTE, ci->getByteCredit());
ClusterConnectionProxy(shadowSession).consumerState(
- ci->getName(),
+ ci->getTag(),
ci->isBlocked(),
ci->isNotifyEnabled(),
ci->position
);
consumerNumbering.add(ci.get());
- QPID_LOG(debug, *this << " updated consumer " << ci->getName()
+ QPID_LOG(debug, *this << " updated consumer " << ci->getTag()
<< " on " << shadowSession.getId());
}
diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt
index cc33478114..7d781e5eb3 100644
--- a/cpp/src/tests/CMakeLists.txt
+++ b/cpp/src/tests/CMakeLists.txt
@@ -272,6 +272,11 @@ add_executable (datagen datagen.cpp ${platform_test_additions})
target_link_libraries (datagen qpidclient)
remember_location(datagen)
+add_executable (msg_group_test msg_group_test.cpp ${platform_test_additions})
+target_link_libraries (msg_group_test qpidmessaging)
+remember_location(msg_group_test)
+
+
# qpid-perftest and qpid-latency-test are generally useful so install them
install (TARGETS qpid-perftest qpid-latency-test RUNTIME
DESTINATION ${QPID_INSTALL_BINDIR})
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index cd569e901c..78ac6db5f1 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -291,13 +291,19 @@ qpid_stream_INCLUDES=$(PUBLIC_INCLUDES)
qpid_stream_SOURCES=qpid-stream.cpp
qpid_stream_LDADD=$(lib_messaging)
+check_PROGRAMS+=msg_group_test
+msg_group_test_INCLUDES=$(PUBLIC_INCLUDES)
+msg_group_test_SOURCES=msg_group_test.cpp
+msg_group_test_LDADD=$(lib_messaging)
+
TESTS_ENVIRONMENT = \
VALGRIND=$(VALGRIND) \
LIBTOOL="$(LIBTOOL)" \
QPID_DATA_DIR= \
$(srcdir)/run_test
-system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest
+system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest \
+ run_msg_group_tests
TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_federation_sys_tests \
run_acl_tests run_cli_tests replication_test dynamic_log_level_test \
run_queue_flow_limit_tests ipv6_test
@@ -342,7 +348,8 @@ EXTRA_DIST += \
start_broker.ps1 \
stop_broker.ps1 \
topictest.ps1 \
- run_queue_flow_limit_tests
+ run_queue_flow_limit_tests \
+ run_msg_group_tests
check_LTLIBRARIES += libdlclose_noop.la
libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir)
@@ -353,7 +360,10 @@ CLEANFILES+=valgrind.out *.log *.vglog* dummy_test qpidd.port $(unit_wrappers)
# Longer running stability tests, not run by default check: target.
# Not run under valgrind, too slow
-LONG_TESTS+=start_broker fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test stop_broker \
+LONG_TESTS+=start_broker \
+ fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test \
+ run_msg_groups_tests_soak \
+ stop_broker \
run_long_federation_sys_tests \
run_failover_soak reliable_replication_test \
federated_cluster_test_with_node_failure
@@ -366,7 +376,8 @@ EXTRA_DIST+= \
run_failover_soak \
reliable_replication_test \
federated_cluster_test_with_node_failure \
- sasl_test_setup.sh
+ sasl_test_setup.sh \
+ run_msg_groups_tests_soak
check-long:
$(MAKE) check TESTS="$(LONG_TESTS)" VALGRIND=
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index d94a5cab7f..7bf061ff54 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -56,12 +56,12 @@ class TestConsumer : public virtual Consumer{
public:
typedef boost::shared_ptr<TestConsumer> shared_ptr;
- intrusive_ptr<Message> last;
+ QueuedMessage last;
bool received;
- TestConsumer(bool acquire = true):Consumer(acquire), received(false) {};
+ TestConsumer(std::string name="test", bool acquire = true):Consumer(name, acquire), received(false) {};
virtual bool deliver(QueuedMessage& msg){
- last = msg.payload;
+ last = msg;
received = true;
return true;
};
@@ -149,16 +149,16 @@ QPID_AUTO_TEST_CASE(testConsumers){
queue->deliver(msg1);
BOOST_CHECK(queue->dispatch(c1));
- BOOST_CHECK_EQUAL(msg1.get(), c1->last.get());
+ BOOST_CHECK_EQUAL(msg1.get(), c1->last.payload.get());
queue->deliver(msg2);
BOOST_CHECK(queue->dispatch(c2));
- BOOST_CHECK_EQUAL(msg2.get(), c2->last.get());
+ BOOST_CHECK_EQUAL(msg2.get(), c2->last.payload.get());
c1->received = false;
queue->deliver(msg3);
BOOST_CHECK(queue->dispatch(c1));
- BOOST_CHECK_EQUAL(msg3.get(), c1->last.get());
+ BOOST_CHECK_EQUAL(msg3.get(), c1->last.payload.get());
//Test cancellation:
queue->cancel(c1);
@@ -214,7 +214,7 @@ QPID_AUTO_TEST_CASE(testDequeue){
if (!consumer->received)
sleep(2);
- BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get());
+ BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get());
BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
received = queue->get().payload;
@@ -298,14 +298,14 @@ QPID_AUTO_TEST_CASE(testSeek){
queue->deliver(msg2);
queue->deliver(msg3);
- TestConsumer::shared_ptr consumer(new TestConsumer(false));
+ TestConsumer::shared_ptr consumer(new TestConsumer("test", false));
SequenceNumber seq(2);
consumer->position = seq;
QueuedMessage qm;
queue->dispatch(consumer);
- BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get());
+ BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get());
queue->dispatch(consumer);
queue->dispatch(consumer); // make sure over-run is safe
@@ -325,14 +325,18 @@ QPID_AUTO_TEST_CASE(testSearch){
queue->deliver(msg3);
SequenceNumber seq(2);
- QueuedMessage qm = queue->find(seq);
+ QueuedMessage qm;
+ TestConsumer::shared_ptr c1(new TestConsumer());
+
+ BOOST_CHECK(queue->find(seq, qm));
BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue());
- queue->acquire(qm);
+ queue->acquire(qm, c1->getName());
BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
SequenceNumber seq1(3);
- QueuedMessage qm1 = queue->find(seq1);
+ QueuedMessage qm1;
+ BOOST_CHECK(queue->find(seq1, qm1));
BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue());
}
@@ -552,12 +556,13 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
QueuedMessage qmsg2(queue.get(), msg2, ++sequence);
framing::SequenceNumber sequence1(10);
QueuedMessage qmsg3(queue.get(), 0, sequence1);
+ TestConsumer::shared_ptr dummy(new TestConsumer());
- BOOST_CHECK(!queue->acquire(qmsg));
- BOOST_CHECK(queue->acquire(qmsg2));
+ BOOST_CHECK(!queue->acquire(qmsg, dummy->getName()));
+ BOOST_CHECK(queue->acquire(qmsg2, dummy->getName()));
// Acquire the massage again to test failure case.
- BOOST_CHECK(!queue->acquire(qmsg2));
- BOOST_CHECK(!queue->acquire(qmsg3));
+ BOOST_CHECK(!queue->acquire(qmsg2, dummy->getName()));
+ BOOST_CHECK(!queue->acquire(qmsg3, dummy->getName()));
BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
@@ -567,7 +572,7 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
// set mode to no browse and check
args.setOrdering(client::LVQ_NO_BROWSE);
queue->configure(args);
- TestConsumer::shared_ptr c1(new TestConsumer(false));
+ TestConsumer::shared_ptr c1(new TestConsumer("test", false));
queue->dispatch(c1);
queue->dispatch(c1);
@@ -696,6 +701,280 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);
}
+
+namespace {
+ // helper for group tests
+ void verifyAcquire( Queue::shared_ptr queue,
+ TestConsumer::shared_ptr c,
+ std::deque<QueuedMessage>& results,
+ const std::string& expectedGroup,
+ const int expectedId )
+ {
+ queue->dispatch(c);
+ results.push_back(c->last);
+ std::string group = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
+ int id = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+ BOOST_CHECK_EQUAL( group, expectedGroup );
+ BOOST_CHECK_EQUAL( id, expectedId );
+ }
+}
+
+QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
+ //
+ // Verify that consumers of grouped messages own the groups once a message is acquired,
+ // and release the groups once all acquired messages have been dequeued or requeued
+ //
+ FieldTable args;
+ Queue::shared_ptr queue(new Queue("my_queue", true));
+ args.setString("qpid.group_header_key", "GROUP-ID");
+ args.setInt("qpid.shared_msg_group", 1);
+ queue->configure(args);
+
+ std::string groups[] = { std::string("a"), std::string("a"), std::string("a"),
+ std::string("b"), std::string("b"), std::string("b"),
+ std::string("c"), std::string("c"), std::string("c") };
+ for (int i = 0; i < 9; ++i) {
+ intrusive_ptr<Message> msg = create_message("e", "A");
+ msg->insertCustomProperty("GROUP-ID", groups[i]);
+ msg->insertCustomProperty("MY-ID", i);
+ queue->deliver(msg);
+ }
+
+ // Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---,
+
+ BOOST_CHECK_EQUAL(uint32_t(9), queue->getMessageCount());
+
+ TestConsumer::shared_ptr c1(new TestConsumer("C1"));
+ TestConsumer::shared_ptr c2(new TestConsumer("C2"));
+
+ queue->consume(c1);
+ queue->consume(c2);
+
+ std::deque<QueuedMessage> dequeMeC1;
+ std::deque<QueuedMessage> dequeMeC2;
+
+
+ verifyAcquire(queue, c1, dequeMeC1, "a", 0 ); // c1 now owns group "a" (acquire a-0)
+ verifyAcquire(queue, c2, dequeMeC2, "b", 3 ); // c2 should now own group "b" (acquire b-3)
+
+ // now let c1 complete the 'a-0' message - this should free the 'a' group
+ queue->dequeue( 0, dequeMeC1.front() );
+ dequeMeC1.pop_front();
+
+ // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ---, ---, ^C2, ^C2, ^C2, ---, ---, ---
+
+ // now c2 should pick up the next 'a-1', since it is oldest free
+ verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); // c2 should now own groups "a" and "b"
+
+ // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ---, ---, ---
+
+ // c1 should only be able to snarf up the first "c" message now...
+ verifyAcquire(queue, c1, dequeMeC1, "c", 6 ); // should skip to the first "c"
+
+ // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ^C1, ^C1, ^C1
+
+ // hmmm... what if c2 now dequeues "b-3"? (now only has a-1 acquired)
+ queue->dequeue( 0, dequeMeC2.front() );
+ dequeMeC2.pop_front();
+
+ // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ^C2, ^C2, ---, ---, ^C1, ^C1, ^C1
+
+ // b group is free, c is owned by c1 - c1's next get should grab 'b-4'
+ verifyAcquire(queue, c1, dequeMeC1, "b", 4 );
+
+ // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ^C2, ^C2, ^C1, ^C1, ^C1, ^C1, ^C1
+
+ // c2 can now only grab a-2, and that's all
+ verifyAcquire(queue, c2, dequeMeC2, "a", 2 );
+
+ // now C2 can't get any more, since C1 owns "b" and "c" group...
+ bool gotOne = queue->dispatch(c2);
+ BOOST_CHECK( !gotOne );
+
+ // hmmm... what if c1 now dequeues "c-6"? (now only own's b-4)
+ queue->dequeue( 0, dequeMeC1.front() );
+ dequeMeC1.pop_front();
+
+ // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+ // Owners= ^C2, ^C2, ^C1, ^C1, ---, ---
+
+ // c2 can now grab c-7
+ verifyAcquire(queue, c2, dequeMeC2, "c", 7 );
+
+ // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+ // Owners= ^C2, ^C2, ^C1, ^C1, ^C2, ^C2
+
+ // what happens if C-2 "requeues" a-1 and a-2?
+ queue->requeue( dequeMeC2.front() );
+ dequeMeC2.pop_front();
+ queue->requeue( dequeMeC2.front() );
+ dequeMeC2.pop_front(); // now just has c-7 acquired
+
+ // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+ // Owners= ---, ---, ^C1, ^C1, ^C2, ^C2
+
+ // now c1 will grab a-1 and a-2...
+ verifyAcquire(queue, c1, dequeMeC1, "a", 1 );
+ verifyAcquire(queue, c1, dequeMeC1, "a", 2 );
+
+ // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+ // Owners= ^C1, ^C1, ^C1, ^C1, ^C2, ^C2
+
+ // c2 can now acquire c-8 only
+ verifyAcquire(queue, c2, dequeMeC2, "c", 8 );
+
+ // and c1 can get b-5
+ verifyAcquire(queue, c1, dequeMeC1, "b", 5 );
+
+ // should be no more acquire-able for anyone now:
+ gotOne = queue->dispatch(c1);
+ BOOST_CHECK( !gotOne );
+ gotOne = queue->dispatch(c2);
+ BOOST_CHECK( !gotOne );
+
+ // requeue all of C1's acquired messages, then cancel C1
+ while (!dequeMeC1.empty()) {
+ queue->requeue(dequeMeC1.front());
+ dequeMeC1.pop_front();
+ }
+ queue->cancel(c1);
+
+ // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+ // Owners= ---, ---, ---, ---, ^C2, ^C2
+
+ // b-4, a-1, a-2, b-5 all should be available, right?
+ verifyAcquire(queue, c2, dequeMeC2, "a", 1 );
+
+ while (!dequeMeC2.empty()) {
+ queue->dequeue(0, dequeMeC2.front());
+ dequeMeC2.pop_front();
+ }
+
+ // Queue = a-2, b-4, b-5
+ // Owners= ---, ---, ---
+
+ TestConsumer::shared_ptr c3(new TestConsumer("C3"));
+ std::deque<QueuedMessage> dequeMeC3;
+
+ verifyAcquire(queue, c3, dequeMeC3, "a", 2 );
+ verifyAcquire(queue, c2, dequeMeC2, "b", 4 );
+
+ // Queue = a-2, b-4, b-5
+ // Owners= ^C3, ^C2, ^C2
+
+ gotOne = queue->dispatch(c3);
+ BOOST_CHECK( !gotOne );
+
+ verifyAcquire(queue, c2, dequeMeC2, "b", 5 );
+
+ while (!dequeMeC2.empty()) {
+ queue->dequeue(0, dequeMeC2.front());
+ dequeMeC2.pop_front();
+ }
+
+ // Queue = a-2,
+ // Owners= ^C3,
+
+ intrusive_ptr<Message> msg = create_message("e", "A");
+ msg->insertCustomProperty("GROUP-ID", "a");
+ msg->insertCustomProperty("MY-ID", 9);
+ queue->deliver(msg);
+
+ // Queue = a-2, a-9
+ // Owners= ^C3, ^C3
+
+ gotOne = queue->dispatch(c2);
+ BOOST_CHECK( !gotOne );
+
+ msg = create_message("e", "A");
+ msg->insertCustomProperty("GROUP-ID", "b");
+ msg->insertCustomProperty("MY-ID", 10);
+ queue->deliver(msg);
+
+ // Queue = a-2, a-9, b-10
+ // Owners= ^C3, ^C3, ----
+
+ verifyAcquire(queue, c2, dequeMeC2, "b", 10 );
+ verifyAcquire(queue, c3, dequeMeC3, "a", 9 );
+
+ gotOne = queue->dispatch(c3);
+ BOOST_CHECK( !gotOne );
+
+ queue->cancel(c2);
+ queue->cancel(c3);
+}
+
+
+QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) {
+ //
+ // Verify that the same default group name is automatically applied to messages that
+ // do not specify a group name.
+ //
+ FieldTable args;
+ Queue::shared_ptr queue(new Queue("my_queue", true));
+ args.setString("qpid.group_header_key", "GROUP-ID");
+ args.setInt("qpid.shared_msg_group", 1);
+ queue->configure(args);
+
+ for (int i = 0; i < 3; ++i) {
+ intrusive_ptr<Message> msg = create_message("e", "A");
+ // no "GROUP-ID" header
+ msg->insertCustomProperty("MY-ID", i);
+ queue->deliver(msg);
+ }
+
+ // Queue = 0, 1, 2
+
+ BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount());
+
+ TestConsumer::shared_ptr c1(new TestConsumer("C1"));
+ TestConsumer::shared_ptr c2(new TestConsumer("C2"));
+
+ queue->consume(c1);
+ queue->consume(c2);
+
+ std::deque<QueuedMessage> dequeMeC1;
+ std::deque<QueuedMessage> dequeMeC2;
+
+ queue->dispatch(c1); // c1 now owns default group (acquired 0)
+ dequeMeC1.push_back(c1->last);
+ int id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+ BOOST_CHECK_EQUAL( id, 0 );
+
+ bool gotOne = queue->dispatch(c2); // c2 should get nothing
+ BOOST_CHECK( !gotOne );
+
+ queue->dispatch(c1); // c1 now acquires 1
+ dequeMeC1.push_back(c1->last);
+ id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+ BOOST_CHECK_EQUAL( id, 1 );
+
+ gotOne = queue->dispatch(c2); // c2 should still get nothing
+ BOOST_CHECK( !gotOne );
+
+ while (!dequeMeC1.empty()) {
+ queue->dequeue(0, dequeMeC1.front());
+ dequeMeC1.pop_front();
+ }
+
+ // now default group should be available...
+ queue->dispatch(c2); // c2 now owns default group (acquired 2)
+ id = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+ BOOST_CHECK_EQUAL( id, 2 );
+
+ gotOne = queue->dispatch(c1); // c1 should get nothing
+ BOOST_CHECK( !gotOne );
+
+ queue->cancel(c1);
+ queue->cancel(c2);
+}
+
QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
TestMessageStoreOC testStore;
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index 4e3339650b..d217f9fbde 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -1418,6 +1418,76 @@ class LongTests(BrokerTest):
if receiver: receiver.connection.detach()
logger.setLevel(log_level)
+ def test_msg_group_failover(self):
+ """Test fail-over during continuous send-receive of grouped messages.
+ """
+
+ class GroupedTrafficGenerator(Thread):
+ def __init__(self, url, queue, group_key):
+ Thread.__init__(self)
+ self.url = url
+ self.queue = queue
+ self.group_key = group_key
+ self.status = -1
+
+ def run(self):
+ # generate traffic for approx 10 seconds (2011msgs / 200 per-sec)
+ cmd = ["msg_group_test",
+ "--broker=%s" % self.url,
+ "--address=%s" % self.queue,
+ "--connection-options={%s}" % (Cluster.CONNECTION_OPTIONS),
+ "--group-key=%s" % self.group_key,
+ "--receivers=2",
+ "--senders=3",
+ "--messages=2011",
+ "--send-rate=200",
+ "--capacity=11",
+ "--ack-frequency=23",
+ "--allow-duplicates",
+ "--group-size=37",
+ "--randomize-group-size",
+ "--interleave=13"]
+ # "--trace"]
+ self.generator = Popen( cmd );
+ self.status = self.generator.wait()
+ return self.status
+
+ def results(self):
+ self.join(timeout=30) # 3x assumed duration
+ if self.isAlive(): return -1
+ return self.status
+
+ # Original cluster will all be killed so expect exit with failure
+ cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["-t"])
+ for b in cluster: b.ready() # Wait for brokers to be ready
+
+ # create a queue with rather draconian flow control settings
+ ssn0 = cluster[0].connect().session()
+ q_args = "{'qpid.group_header_key':'group-id', 'qpid.shared_msg_group':1}"
+ s0 = ssn0.sender("test-group-q; {create:always, node:{type:queue, x-declare:{arguments:%s}}}" % q_args)
+
+ # Kill original brokers, start new ones for the duration.
+ endtime = time.time() + self.duration();
+ i = 0
+ while time.time() < endtime:
+ traffic = GroupedTrafficGenerator( cluster[i].host_port(),
+ "test-group-q", "group-id" )
+ traffic.start()
+ time.sleep(1)
+
+ for x in range(2):
+ for b in cluster[i:]: b.ready() # Check if any broker crashed.
+ cluster[i].kill()
+ i += 1
+ b = cluster.start(expect=EXPECT_EXIT_FAIL)
+ time.sleep(1)
+
+ # wait for traffic to finish, verify success
+ self.assertEqual(0, traffic.results())
+
+ for i in range(i, len(cluster)): cluster[i].kill()
+
+
class StoreTests(BrokerTest):
"""
Cluster tests that can only be run if there is a store available.
diff --git a/cpp/src/tests/msg_group_test.cpp b/cpp/src/tests/msg_group_test.cpp
new file mode 100644
index 0000000000..6b9d09b89a
--- /dev/null
+++ b/cpp/src/tests/msg_group_test.cpp
@@ -0,0 +1,618 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/messaging/Address.h>
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Session.h>
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/FailoverUpdates.h>
+#include <qpid/Options.h>
+#include <qpid/log/Logger.h>
+#include <qpid/log/Options.h>
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Time.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/SystemInfo.h"
+
+#include <iostream>
+#include <memory>
+#include <stdlib.h>
+
+using namespace qpid::messaging;
+using namespace qpid::types;
+using namespace std;
+
+namespace qpid {
+namespace tests {
+
+struct Options : public qpid::Options
+{
+ bool help;
+ std::string url;
+ std::string address;
+ std::string connectionOptions;
+ uint messages;
+ uint capacity;
+ uint ackFrequency;
+ bool failoverUpdates;
+ qpid::log::Options log;
+ uint senders;
+ uint receivers;
+ uint groupSize;
+ bool printReport;
+ std::string groupKey;
+ bool durable;
+ bool allowDuplicates;
+ bool randomizeSize;
+ bool stickyConsumer;
+ uint timeout;
+ uint interleave;
+ std::string prefix;
+ uint sendRate;
+
+ Options(const std::string& argv0=std::string())
+ : qpid::Options("Options"),
+ help(false),
+ url("amqp:tcp:127.0.0.1"),
+ messages(10000),
+ capacity(1000),
+ ackFrequency(100),
+ failoverUpdates(false),
+ log(argv0),
+ senders(2),
+ receivers(2),
+ groupSize(10),
+ printReport(false),
+ groupKey("qpid.no_group"),
+ durable(false),
+ allowDuplicates(false),
+ randomizeSize(false),
+ stickyConsumer(false),
+ timeout(10),
+ interleave(1),
+ sendRate(0)
+ {
+ addOptions()
+ ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)")
+ ("address,a", qpid::optValue(address, "ADDRESS"), "address to send and receive from")
+ ("allow-duplicates", qpid::optValue(allowDuplicates), "Ignore the delivery of duplicated messages")
+ ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
+ ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)")
+ ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
+ ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.")
+ ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover")
+ ("group-key", qpid::optValue(groupKey, "KEY"), "Key of the message header containing the group identifier.")
+ ("group-prefix", qpid::optValue(prefix, "STRING"), "Add 'prefix' to the start of all generated group identifiers.")
+ ("group-size", qpid::optValue(groupSize, "N"), "Number of messages per a group.")
+ ("interleave", qpid::optValue(interleave, "N"), "Simultaineously interleave messages from N different groups.")
+ ("messages,m", qpid::optValue(messages, "N"), "Number of messages to send per each sender.")
+ ("receivers,r", qpid::optValue(receivers, "N"), "Number of message consumers.")
+ ("randomize-group-size", qpid::optValue(randomizeSize), "Randomize the number of messages per group to [1...group-size].")
+ ("send-rate", qpid::optValue(sendRate,"N"), "Send at rate of N messages/second. 0 means send as fast as possible.")
+ ("senders,s", qpid::optValue(senders, "N"), "Number of message producers.")
+ ("sticky-consumers", qpid::optValue(stickyConsumer), "If set, verify that all messages in a group are consumed by the same client [TBD].")
+ ("timeout", qpid::optValue(timeout, "N"), "Fail with a stall error should all consumers remain idle for timeout seconds.")
+ ("print-report", qpid::optValue(printReport), "Dump message group statistics to stdout.")
+ ("help", qpid::optValue(help), "print this usage statement");
+ add(log);
+ //("check-redelivered", qpid::optValue(checkRedelivered), "Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)")
+ //("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
+ //("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
+ }
+
+ bool parse(int argc, char** argv)
+ {
+ try {
+ qpid::Options::parse(argc, argv);
+ if (address.empty()) throw qpid::Exception("Address must be specified!");
+ qpid::log::Logger::instance().configure(log);
+ if (help) {
+ std::ostringstream msg;
+ std::cout << msg << *this << std::endl << std::endl
+ << "Verifies the behavior of grouped messages." << std::endl;
+ return false;
+ } else {
+ return true;
+ }
+ } catch (const std::exception& e) {
+ std::cerr << *this << std::endl << std::endl << e.what() << std::endl;
+ return false;
+ }
+ }
+};
+
+const string EOS("eos");
+const string SN("sn");
+
+
+// class that monitors group state across all publishers and consumers. tracks the next
+// expected sequence for each group, and total messages consumed.
+class GroupChecker
+{
+ qpid::sys::Mutex lock;
+
+ const uint totalMsgs;
+ uint totalMsgsConsumed;
+ uint totalMsgsPublished;
+ bool allowDuplicates;
+ uint duplicateMsgs;
+
+ typedef std::map<std::string, uint> SequenceMap;
+ SequenceMap sequenceMap;
+
+ // Statistics - for each group, store the names of all clients that consumed messages
+ // from that group, and the number of messages consumed per client.
+ typedef std::map<std::string, uint> ClientCounter;
+ typedef std::map<std::string, ClientCounter> GroupStatistics;
+ GroupStatistics statistics;
+
+public:
+
+ GroupChecker( uint t, bool d ) :
+ totalMsgs(t), totalMsgsConsumed(0), totalMsgsPublished(0), allowDuplicates(d),
+ duplicateMsgs(0) {}
+
+ bool checkSequence( const std::string& groupId,
+ uint sequence, const std::string& client )
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+
+ QPID_LOG(debug, "Client " << client << " has received " << groupId << ":" << sequence);
+
+ GroupStatistics::iterator gs = statistics.find(groupId);
+ if (gs == statistics.end()) {
+ statistics[groupId][client] = 1;
+ } else {
+ gs->second[client]++;
+ }
+ // now verify
+ SequenceMap::iterator s = sequenceMap.find(groupId);
+ if (s == sequenceMap.end()) {
+ QPID_LOG(debug, "Client " << client << " thinks this is the first message from group " << groupId << ":" << sequence);
+ // if duplication allowed, it is possible that the last msg(s) of an old sequence are redelivered on reconnect.
+ // in this case, set the sequence from the first msg.
+ sequenceMap[groupId] = (allowDuplicates) ? sequence : 0;
+ s = sequenceMap.find(groupId);
+ } else if (sequence < s->second) {
+ duplicateMsgs++;
+ QPID_LOG(debug, "Client " << client << " thinks this message is a duplicate! " << groupId << ":" << sequence);
+ return allowDuplicates;
+ }
+ totalMsgsConsumed++;
+ return sequence == s->second++;
+ }
+
+ void sendingSequence( const std::string& groupId,
+ uint sequence, bool eos,
+ const std::string& client )
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ ++totalMsgsPublished;
+
+ QPID_LOG(debug, "Client " << client << " sending " << groupId << ":" << sequence <<
+ ((eos) ? " (last)" : ""));
+ }
+
+ bool eraseGroup( const std::string& groupId, const std::string& name )
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, "Deleting group " << groupId << " (by client " << name << ")");
+ return sequenceMap.erase( groupId ) == 1;
+ }
+
+ uint getNextExpectedSequence( const std::string& groupId )
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return sequenceMap[groupId];
+ }
+
+ bool allMsgsConsumed() // true when done processing msgs
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return (totalMsgsPublished >= totalMsgs) &&
+ (totalMsgsConsumed >= totalMsgsPublished) &&
+ sequenceMap.size() == 0;
+ }
+
+ uint getConsumedTotal()
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return totalMsgsConsumed;
+ }
+
+ uint getPublishedTotal()
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return totalMsgsPublished;
+ }
+
+ ostream& print(ostream& out)
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ out << "Total Published: " << totalMsgsPublished << ", Total Consumed: " << totalMsgsConsumed <<
+ ", Duplicates detected: " << duplicateMsgs << std::endl;
+ out << "Total Groups: " << statistics.size() << std::endl;
+ unsigned long consumers = 0;
+ for (GroupStatistics::iterator gs = statistics.begin(); gs != statistics.end(); ++gs) {
+ out << " GroupId: " << gs->first;
+ consumers += gs->second.size(); // # of consumers that processed this group
+ if (gs->second.size() == 1)
+ out << " completely consumed by a single client." << std::endl;
+ else
+ out << " consumed by " << gs->second.size() << " different clients." << std::endl;
+
+ for (ClientCounter::iterator cc = gs->second.begin(); cc != gs->second.end(); ++cc) {
+ out << " Client: " << cc->first << " consumed " << cc->second << " messages from the group." << std::endl;
+ }
+ }
+ out << "Average # of consumers per group: " << ((statistics.size() != 0) ? (double(consumers)/statistics.size()) : 0) << std::endl;
+ return out;
+ }
+};
+
+
+namespace {
+ // rand() is not thread safe. Create a singleton obj to hold a lock while calling
+ // rand() so it can be called safely by multiple concurrent clients.
+ class Randomizer {
+ qpid::sys::Mutex lock;
+ public:
+ uint operator()(uint max) {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return (rand() % max) + 1;
+ }
+ };
+
+ static Randomizer randomizer;
+}
+
+
+// tag each generated message with a group identifer
+//
+class GroupGenerator {
+
+ const std::string groupPrefix;
+ const uint groupSize;
+ const bool randomizeSize;
+ const uint interleave;
+
+ uint groupSuffix;
+ uint total;
+
+ struct GroupState {
+ std::string id;
+ const uint size;
+ uint count;
+ GroupState( const std::string& i, const uint s )
+ : id(i), size(s), count(0) {}
+ };
+ typedef std::list<GroupState> GroupList;
+ GroupList groups;
+ GroupList::iterator current;
+
+ // add a new group identifier to the list
+ void newGroup() {
+ std::ostringstream groupId(groupPrefix, ios_base::out|ios_base::ate);
+ groupId << std::string(":") << groupSuffix++;
+ uint size = (randomizeSize) ? randomizer(groupSize) : groupSize;
+ QPID_LOG(trace, "New group: GROUPID=[" << groupId.str() << "] size=" << size << " this=" << this);
+ GroupState group( groupId.str(), size );
+ groups.push_back( group );
+ }
+
+public:
+ GroupGenerator( const std::string& prefix,
+ const uint t,
+ const uint size,
+ const bool randomize,
+ const uint i)
+ : groupPrefix(prefix), groupSize(size),
+ randomizeSize(randomize), interleave(i), groupSuffix(0), total(t)
+ {
+ QPID_LOG(trace, "New group generator: PREFIX=[" << prefix << "] total=" << total << " size=" << size << " rand=" << randomize << " interleave=" << interleave << " this=" << this);
+ for (uint i = 0; i < 1 || i < interleave; ++i) {
+ newGroup();
+ }
+ current = groups.begin();
+ }
+
+ bool genGroup(std::string& groupId, uint& seq, bool& eos)
+ {
+ if (!total) return false;
+ --total;
+ if (current == groups.end())
+ current = groups.begin();
+ groupId = current->id;
+ seq = current->count++;
+ if (current->count == current->size) {
+ QPID_LOG(trace, "Last msg for " << current->id << ", " << current->count << " this=" << this);
+ eos = true;
+ if (total >= interleave) { // need a new group to replace this one
+ newGroup();
+ groups.erase(current++);
+ } else ++current;
+ } else {
+ ++current;
+ eos = total < interleave; // mark eos on the last message of each group
+ }
+ QPID_LOG(trace, "SENDING GROUPID=[" << groupId << "] seq=" << seq << " eos=" << eos << " this=" << this);
+ return true;
+ }
+};
+
+
+
+class Client : public qpid::sys::Runnable
+{
+public:
+ typedef boost::shared_ptr<Client> shared_ptr;
+ enum State {ACTIVE, DONE, FAILURE};
+ Client( const std::string& n, const Options& o ) : name(n), opts(o), state(ACTIVE), stopped(false) {}
+ virtual ~Client() {}
+ State getState() { return state; }
+ void testFailed( const std::string& reason ) { state = FAILURE; error << "Client '" << name << "' failed: " << reason; }
+ void clientDone() { if (state == ACTIVE) state = DONE; }
+ qpid::sys::Thread& getThread() { return thread; }
+ const std::string getErrorMsg() { return error.str(); }
+ void stop() {stopped = true;}
+ const std::string& getName() { return name; }
+
+protected:
+ const std::string name;
+ const Options& opts;
+ qpid::sys::Thread thread;
+ ostringstream error;
+ State state;
+ bool stopped;
+};
+
+
+class Consumer : public Client
+{
+ GroupChecker& checker;
+
+public:
+ Consumer(const std::string& n, const Options& o, GroupChecker& c ) : Client(n, o), checker(c) {};
+ virtual ~Consumer() {};
+
+ void run()
+ {
+ Connection connection;
+ try {
+ connection = Connection(opts.url, opts.connectionOptions);
+ connection.open();
+ std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
+ Session session = connection.createSession();
+ Receiver receiver = session.createReceiver(opts.address);
+ receiver.setCapacity(opts.capacity);
+ Message msg;
+ uint count = 0;
+
+ while (!stopped) {
+ if (receiver.fetch(msg, Duration::SECOND)) { // msg retrieved
+ qpid::types::Variant::Map& properties = msg.getProperties();
+ std::string groupId = properties[opts.groupKey];
+ uint groupSeq = properties[SN];
+ bool eof = properties[EOS];
+
+ QPID_LOG(trace, "RECVING GROUPID=[" << groupId << "] seq=" << groupSeq << " eos=" << eof << " name=" << name);
+
+ qpid::sys::usleep(10);
+
+ if (!checker.checkSequence( groupId, groupSeq, name )) {
+ ostringstream msg;
+ msg << "Check sequence failed. Group=" << groupId << " rcvd seq=" << groupSeq << " expected=" << checker.getNextExpectedSequence( groupId );
+ testFailed( msg.str() );
+ break;
+ } else if (eof) {
+ if (!checker.eraseGroup( groupId, name )) {
+ ostringstream msg;
+ msg << "Erase group failed. Group=" << groupId << " rcvd seq=" << groupSeq;
+ testFailed( msg.str() );
+ break;
+ }
+ }
+
+ ++count;
+ if (opts.ackFrequency && (count % opts.ackFrequency == 0)) {
+ session.acknowledge();
+ }
+ // Clear out message properties & content for next iteration.
+ msg = Message(); // TODO aconway 2010-12-01: should be done by fetch
+ } else if (checker.allMsgsConsumed()) // timed out, nothing else to do?
+ break;
+ }
+ session.acknowledge();
+ session.close();
+ connection.close();
+ } catch(const std::exception& error) {
+ ostringstream msg;
+ msg << "consumer error: " << error.what();
+ testFailed( msg.str() );
+ connection.close();
+ }
+ clientDone();
+ QPID_LOG(trace, "Consuming client " << name << " completed.");
+ }
+};
+
+
+
+class Producer : public Client
+{
+ GroupChecker& checker;
+ GroupGenerator generator;
+
+public:
+ Producer(const std::string& n, const Options& o, GroupChecker& c)
+ : Client(n, o), checker(c),
+ generator( n, o.messages, o.groupSize, o.randomizeSize, o.interleave )
+ {};
+ virtual ~Producer() {};
+
+ void run()
+ {
+ Connection connection;
+ try {
+ connection = Connection(opts.url, opts.connectionOptions);
+ connection.open();
+ std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
+ Session session = connection.createSession();
+ Sender sender = session.createSender(opts.address);
+ if (opts.capacity) sender.setCapacity(opts.capacity);
+ Message msg;
+ msg.setDurable(opts.durable);
+ std::string groupId;
+ uint seq;
+ bool eos;
+ uint sent = 0;
+
+ qpid::sys::AbsTime start = qpid::sys::now();
+ int64_t interval = 0;
+ if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate;
+
+ while (!stopped && generator.genGroup(groupId, seq, eos)) {
+ msg.getProperties()[opts.groupKey] = groupId;
+ msg.getProperties()[SN] = seq;
+ msg.getProperties()[EOS] = eos;
+ checker.sendingSequence( groupId, seq, eos, name );
+
+ sender.send(msg);
+ ++sent;
+
+ if (opts.sendRate) {
+ qpid::sys::AbsTime waitTill(start, sent*interval);
+ int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill);
+ if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
+ }
+ }
+ session.sync();
+ session.close();
+ connection.close();
+ } catch(const std::exception& error) {
+ ostringstream msg;
+ msg << "producer '" << name << "' error: " << error.what();
+ testFailed(msg.str());
+ connection.close();
+ }
+ clientDone();
+ QPID_LOG(trace, "Producing client " << name << " completed.");
+ }
+};
+
+
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
+int main(int argc, char ** argv)
+{
+ int status = 0;
+ try {
+ Options opts;
+ if (opts.parse(argc, argv)) {
+
+ GroupChecker state( opts.senders * opts.messages,
+ opts.allowDuplicates);
+ std::vector<Client::shared_ptr> clients;
+
+ if (opts.randomizeSize) srand((unsigned int)qpid::sys::SystemInfo::getProcessId());
+
+ // fire off the producers && consumers
+ for (size_t j = 0; j < opts.senders; ++j) {
+ ostringstream name;
+ name << opts.prefix << "P_" << j;
+ clients.push_back(Client::shared_ptr(new Producer( name.str(), opts, state )));
+ clients.back()->getThread() = qpid::sys::Thread(*clients.back());
+ }
+ for (size_t j = 0; j < opts.receivers; ++j) {
+ ostringstream name;
+ name << opts.prefix << "C_" << j;
+ clients.push_back(Client::shared_ptr(new Consumer( name.str(), opts, state )));
+ clients.back()->getThread() = qpid::sys::Thread(*clients.back());
+ }
+
+ // wait for all pubs/subs to finish.... or for consumers to fail or stall.
+ uint stalledTime = 0;
+ bool done;
+ bool clientFailed = false;
+ do {
+ uint lastCount = state.getConsumedTotal();
+ qpid::sys::usleep( 1000000 );
+
+ // check each client for status
+ done = true;
+ for (std::vector<Client::shared_ptr>::iterator i = clients.begin();
+ i != clients.end(); ++i) {
+ QPID_LOG(debug, "Client " << (*i)->getName() << " state=" << (*i)->getState());
+ if ((*i)->getState() == Client::FAILURE) {
+ QPID_LOG(error, argv[0] << ": test failed with client error: " << (*i)->getErrorMsg());
+ clientFailed = true;
+ done = true;
+ break; // exit test.
+ } else if ((*i)->getState() != Client::DONE) {
+ done = false;
+ }
+ }
+
+ if (!done) {
+ // check that consumers are still receiving messages
+ if (lastCount == state.getConsumedTotal())
+ stalledTime++;
+ else {
+ lastCount = state.getConsumedTotal();
+ stalledTime = 0;
+ }
+ }
+
+ QPID_LOG(debug, "Consumed to date = " << state.getConsumedTotal() <<
+ " Published to date = " << state.getPublishedTotal() <<
+ " total=" << opts.senders * opts.messages );
+
+ } while (!done && stalledTime < opts.timeout);
+
+ if (clientFailed) {
+ status = 1;
+ } else if (stalledTime >= opts.timeout) {
+ QPID_LOG(error, argv[0] << ": test failed due to stalled consumer." );
+ status = 2;
+ }
+
+ // Wait for started threads.
+ for (std::vector<Client::shared_ptr>::iterator i = clients.begin();
+ i != clients.end(); ++i) {
+ (*i)->stop();
+ (*i)->getThread().join();
+ }
+
+ if (opts.printReport && !status) state.print(std::cout);
+ } else status = 4;
+ } catch(const std::exception& error) {
+ QPID_LOG(error, argv[0] << ": " << error.what());
+ status = 3;
+ }
+ QPID_LOG(trace, "TEST DONE [" << status << "]");
+
+ return status;
+}
diff --git a/cpp/src/tests/qpid-send.cpp b/cpp/src/tests/qpid-send.cpp
index ef5e98e2a0..b1213a484f 100644
--- a/cpp/src/tests/qpid-send.cpp
+++ b/cpp/src/tests/qpid-send.cpp
@@ -28,6 +28,7 @@
#include <qpid/messaging/FailoverUpdates.h>
#include <qpid/sys/Time.h>
#include <qpid/sys/Monitor.h>
+#include <qpid/sys/SystemInfo.h>
#include "TestOptions.h"
#include "Statistics.h"
@@ -76,6 +77,11 @@ struct Options : public qpid::Options
uint flowControl;
bool sequence;
bool timestamp;
+ std::string groupKey;
+ std::string groupPrefix;
+ uint groupSize;
+ bool groupRandSize;
+ uint groupInterleave;
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
@@ -100,7 +106,11 @@ struct Options : public qpid::Options
sendRate(0),
flowControl(0),
sequence(true),
- timestamp(true)
+ timestamp(true),
+ groupPrefix("GROUP-"),
+ groupSize(10),
+ groupRandSize(false),
+ groupInterleave(1)
{
addOptions()
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
@@ -111,8 +121,8 @@ struct Options : public qpid::Options
("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address")
("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input")
("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.")
- ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")
- ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)")
+ ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")
+ ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)")
("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property")
("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message")
("user-id", qpid::optValue(userid, "USERID"), "userid for message")
@@ -131,6 +141,11 @@ struct Options : public qpid::Options
("flow-control", qpid::optValue(flowControl,"N"), "Do end to end flow control to limit queue depth to 2*N. 0 means no flow control.")
("sequence", qpid::optValue(sequence, "yes|no"), "Add a sequence number messages property (required for duplicate/lost message detection)")
("timestamp", qpid::optValue(timestamp, "yes|no"), "Add a time stamp messages property (required for latency measurement)")
+ ("group-key", qpid::optValue(groupKey, "KEY"), "Generate groups of messages using message header 'KEY' to hold the group identifier")
+ ("group-prefix", qpid::optValue(groupPrefix, "STRING"), "Generate group identifers with 'STRING' prefix (if group-key specified)")
+ ("group-size", qpid::optValue(groupSize, "N"), "Number of messages per a group (if group-key specified)")
+ ("group-randomize-size", qpid::optValue(groupRandSize), "Randomize the number of messages per group to [1...group-size] (if group-key specified)")
+ ("group-interleave", qpid::optValue(groupInterleave, "N"), "Simultaineously interleave messages from N different groups (if group-key specified)")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -252,6 +267,68 @@ class MapContentGenerator : public ContentGenerator {
const Options& opts;
};
+// tag each generated message with a group identifer
+//
+class GroupGenerator {
+public:
+ GroupGenerator(const std::string& key,
+ const std::string& prefix,
+ const uint size,
+ const bool randomize,
+ const uint interleave)
+ : groupKey(key), groupPrefix(prefix), groupSize(size),
+ randomizeSize(randomize), groupSuffix(0)
+ {
+ if (randomize) srand((unsigned int)qpid::sys::SystemInfo::getProcessId());
+
+ for (uint i = 0; i < 1 || i < interleave; ++i) {
+ newGroup();
+ }
+ current = groups.begin();
+ }
+
+ void setGroupInfo(Message &msg)
+ {
+ if (current == groups.end())
+ current = groups.begin();
+ msg.getProperties()[groupKey] = current->id;
+ // std::cout << "SENDING GROUPID=[" << current->id << "]" << std::endl;
+ if (++(current->count) == current->size) {
+ newGroup();
+ groups.erase(current++);
+ } else
+ ++current;
+ }
+
+ private:
+ const std::string& groupKey;
+ const std::string& groupPrefix;
+ const uint groupSize;
+ const bool randomizeSize;
+
+ uint groupSuffix;
+
+ struct GroupState {
+ std::string id;
+ const uint size;
+ uint count;
+ GroupState( const std::string& i, const uint s )
+ : id(i), size(s), count(0) {}
+ };
+ typedef std::list<GroupState> GroupList;
+ GroupList groups;
+ GroupList::iterator current;
+
+ void newGroup() {
+ std::ostringstream groupId(groupPrefix, ios_base::out|ios_base::ate);
+ groupId << groupSuffix++;
+ uint size = (randomizeSize) ? (rand() % groupSize) + 1 : groupSize;
+ // std::cout << "New group: GROUPID=[" << groupId.str() << "] size=" << size << std::endl;
+ GroupState group( groupId.str(), size );
+ groups.push_back( group );
+ }
+};
+
int main(int argc, char ** argv)
{
Connection connection;
@@ -296,6 +373,14 @@ int main(int argc, char ** argv)
else
contentGen.reset(new FixedContentGenerator(opts.contentString));
+ std::auto_ptr<GroupGenerator> groupGen;
+ if (!opts.groupKey.empty())
+ groupGen.reset(new GroupGenerator(opts.groupKey,
+ opts.groupPrefix,
+ opts.groupSize,
+ opts.groupRandSize,
+ opts.groupInterleave));
+
qpid::sys::AbsTime start = qpid::sys::now();
int64_t interval = 0;
if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate;
@@ -312,9 +397,6 @@ int main(int argc, char ** argv)
++sent;
if (opts.sequence)
msg.getProperties()[SN] = sent;
- if (opts.timestamp)
- msg.getProperties()[TS] = int64_t(
- qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
if (opts.flowControl) {
if ((sent % opts.flowControl) == 0) {
msg.setReplyTo(flowControlAddress);
@@ -323,6 +405,12 @@ int main(int argc, char ** argv)
else
msg.setReplyTo(Address()); // Clear the reply address.
}
+ if (groupGen.get())
+ groupGen->setGroupInfo(msg);
+
+ if (opts.timestamp)
+ msg.getProperties()[TS] = int64_t(
+ qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
sender.send(msg);
reporter.message(msg);
diff --git a/cpp/src/tests/run_msg_group_tests b/cpp/src/tests/run_msg_group_tests
new file mode 100755
index 0000000000..8423022521
--- /dev/null
+++ b/cpp/src/tests/run_msg_group_tests
@@ -0,0 +1,66 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#script to run a sequence of message group queue tests via make
+
+#setup path to find qpid-config and msg_group_test progs
+source ./test_env.sh
+
+export PATH=$PWD:$srcdir:$PYTHON_COMMANDS:$PATH
+
+#set port to connect to via env var
+test -s qpidd.port && QPID_PORT=`cat qpidd.port`
+
+#trap cleanup INT TERM QUIT
+
+QUEUE_NAME="group-queue"
+GROUP_KEY="My-Group-Id"
+
+BROKER_URL="${QPID_BROKER:-localhost}:${QPID_PORT:-5672}"
+
+run_test() {
+ $@
+}
+
+##set -x
+
+declare -i i=0
+declare -a tests
+tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --group-header=${GROUP_KEY} --shared-groups"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size --interleave 3"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 7 --randomize-group-size"
+ "qpid-config -a $BROKER_URL add queue ${QUEUE_NAME}-two --group-header=${GROUP_KEY} --shared-groups"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 3 --randomize-group-size"
+ "msg_group_test -b $BROKER_URL -a ${QUEUE_NAME}-two --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size --interleave 5"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59 --group-size 5 --receivers 2 --senders 3 --capacity 1 --ack-frequency 3 --randomize-group-size"
+ "qpid-config -a $BROKER_URL del queue ${QUEUE_NAME}-two --force"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59 --group-size 3 --receivers 2 --senders 3 --capacity 1 --ack-frequency 1 --randomize-group-size"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 211 --group-size 13 --receivers 2 --senders 3 --capacity 47 --ack-frequency 79 --interleave 53"
+ "qpid-config -a $BROKER_URL del queue $QUEUE_NAME --force")
+
+while [ -n "${tests[i]}" ]; do
+ run_test ${tests[i]}
+ RETCODE=$?
+ if test x$RETCODE != x0; then
+ echo "FAILED message group test. Failed command: \"${tests[i]}\"";
+ exit 1;
+ fi
+ i+=1
+done
diff --git a/cpp/src/tests/run_msg_group_tests_soak b/cpp/src/tests/run_msg_group_tests_soak
new file mode 100755
index 0000000000..5231f74755
--- /dev/null
+++ b/cpp/src/tests/run_msg_group_tests_soak
@@ -0,0 +1,60 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#script to run a sequence of long-running message group tests via make
+
+#setup path to find qpid-config and msg_group_test test progs
+source ./test_env.sh
+
+export PATH=$PWD:$srcdir:$PYTHON_COMMANDS:$PATH
+
+#set port to connect to via env var
+test -s qpidd.port && QPID_PORT=`cat qpidd.port`
+
+#trap cleanup INT TERM QUIT
+
+QUEUE_NAME="group-queue"
+GROUP_KEY="My-Group-Id"
+
+BROKER_URL="${QPID_BROKER:-localhost}:${QPID_PORT:-5672}"
+
+run_test() {
+ $@
+}
+
+##set -x
+
+declare -i i=0
+declare -a tests
+tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --group-header=${GROUP_KEY} --shared-groups"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 47 --ack-frequency 97"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 79 --ack-frequency 79"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 97 --ack-frequency 47"
+ "qpid-config -a $BROKER_URL del queue $QUEUE_NAME --force")
+
+while [ -n "${tests[i]}" ]; do
+ run_test ${tests[i]}
+ RETCODE=$?
+ if test x$RETCODE != x0; then
+ echo "FAILED message group test. Failed command: \"${tests[i]}\"";
+ exit 1;
+ fi
+ i+=1
+done
diff --git a/doc/book/src/AMQP-Messaging-Broker-CPP-Book.xml b/doc/book/src/AMQP-Messaging-Broker-CPP-Book.xml
index 6c27d7c668..10d83ec887 100644
--- a/doc/book/src/AMQP-Messaging-Broker-CPP-Book.xml
+++ b/doc/book/src/AMQP-Messaging-Broker-CPP-Book.xml
@@ -59,6 +59,7 @@
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="producer-flow-control.xml"/>
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="AMQP-Compatibility.xml"/>
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="Qpid-Interoperability-Documentation.xml"/>
+ <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="Using-message-groups.xml"/>
</chapter>
diff --git a/doc/book/src/AMQP-Messaging-Broker-CPP.xml b/doc/book/src/AMQP-Messaging-Broker-CPP.xml
index 15f5660455..92b474b0c0 100644
--- a/doc/book/src/AMQP-Messaging-Broker-CPP.xml
+++ b/doc/book/src/AMQP-Messaging-Broker-CPP.xml
@@ -52,6 +52,7 @@
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="Starting-a-cluster.xml"/>
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="ACL.xml"/>
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="producer-flow-control.xml"/>
+ <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="Using-message-groups.xml"/>
</chapter>
diff --git a/doc/book/src/Using-message-groups.xml b/doc/book/src/Using-message-groups.xml
new file mode 100644
index 0000000000..7de7fbb995
--- /dev/null
+++ b/doc/book/src/Using-message-groups.xml
@@ -0,0 +1,261 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<section id="Using-message-groups">
+ <title>
+ Using Message Groups
+ </title>
+
+ <section role="h2" id="usingmessagegroups-Overview">
+ <title>
+ Overview
+ </title>
+ <para>
+ The broker allows messaging applications to classify a set of related messages as
+ belonging to a group. This allows a message producer to indicate to the consumer
+ that a group of messages should be considered a single logical operation with
+ respect to the application.
+ </para>
+ <para>
+ The broker can use this group identification to enforce policies controlling how
+ messages from a given group can be distributed to consumers. For instance, the
+ broker can be configured to guarantee all the messages from a particular group are
+ processed in order across multiple consumers.
+ </para>
+ <para>
+ For example, assume we have a shopping application that manages items in a virtual
+ shopping cart. A user may add an item to their shopping cart, then change their
+ mind and remove it. If the application sends an <emphasis>add</emphasis> message to the broker,
+ immediately followed by a <emphasis>remove</emphasis> message, they will be queued in the proper
+ order - <emphasis>add</emphasis>, followed by <emphasis>remove</emphasis>.
+ </para>
+ <para>
+ However, if there are multiple consumers, it is possible that once a consumer
+ acquires the <emphasis>add</emphasis> message, a different consumer may acquire the
+ <emphasis>remove</emphasis> message. This allows both messages to be processed in parallel,
+ which could result in a "race" where the <emphasis>remove</emphasis> operation is incorrectly
+ performed before the <emphasis>add</emphasis> operation.
+ </para>
+ </section>
+ <!--h2-->
+ <section role="h2" id="usingmessagegroups-GroupingMessages">
+ <title>
+ Grouping Messages
+ </title>
+ <para>
+ In order to group messages, the application would designate a particular
+ application header as containing a message's <emphasis>group identifier</emphasis>. The group
+ identifier stored in that header field would be a string value set by the message
+ producer. Messages from the same group would have the same group identifier
+ value. The key that identifies the header must also be known to the message
+ consumers. This allows the consumers to determine a message's assigned group.
+ </para>
+ <para>
+ The header that is used to hold the group identifier, as well as the values used
+ as group identifiers, are totally under control of the application.
+ </para>
+ </section>
+ <section role="h2" id="usingmessagegroups-BrokerRole">
+ <title>
+ The Role of the Broker
+ </title>
+ <para>
+ The broker will apply the following processing on each grouped message:
+ <itemizedlist>
+ <listitem>Enqueue a received message on the destination queue.</listitem>
+ <listitem>Determine the message's group by examining the message's group identifier header.</listitem>
+ <listitem>Enforce <emphasis>consumption ordering</emphasis> among messages belonging to the same group.</listitem>
+ </itemizedlist>
+ <emphasis>Consumption ordering</emphasis> means that the broker will not allow outstanding
+ unacknowledged messages to <emphasis>more than one consumer for a given group</emphasis>.
+ </para>
+ <para>
+ This means that only one consumer can be processing messages from a particular
+ group at a given time. When the consumer acknowledges all of its acquired
+ messages, then the broker <emphasis>may</emphasis> pass the next message from that group to
+ another consumer.
+ </para>
+ <para>
+ Specifically, for any given group, the broker allows only the first N messages in
+ the group to be available for delivery to a particular consumer. The value of N
+ would be determined by the selected consumer's configured prefetch capacity. The
+ broker blocks access to the remaining messages in that group by any other
+ consumer. Once the selected consumer has acknowledged that first set of delivered
+ messages, the broker allows the next messages in the group to be available for
+ delivery. The next set of messages may be delivered to a different consumer.
+ </para>
+ <para>
+ Note well that distinct message groups would not block each other from delivery.
+ For example, assume a queue contains messages from two different message groups -
+ say group "A" and group "B" - and they are enqueued such that "A"'s messages are
+ in front of "B". If the first message of group "A" is in the process of being
+ consumed by a client, then the remaining "A" messages are blocked, but the
+ messages of the "B" group are available for consumption by other consumers - even
+ though it is "behind" group "A" in the queue.
+ </para>
+ </section>
+ <section role="h2" id="usingmessagegroups-ConsumerGuide">
+ <title>
+ Well Behaved Consumers
+ </title>
+ <para>
+ In order to guarantee the ordering policy, the consuming application has to ensure
+ that it has completely processed the data in a received message before accepting
+ that message, as described in Section 2.6.2. Transfer of Responsibility, of the
+ AMQP-0.10 specification.
+ </para>
+ <para>
+ The term <emphasis>processed</emphasis> means that the application has finished updating all
+ state related to the message that has been received.
+ </para>
+ <note>
+ <title>Be Advised</title>
+ <para>
+ It is possible for a consumer to affect the ordering of grouped messages even
+ when the broker is enforcing consumption order. This can be done by selectively
+ acknowledging and releasing messages from the same group.
+ </para>
+ <para>
+ Assume a consumer has received two messages from group "A", "A-1" and "A-2", in
+ that order. If the consumer releases "A-1" then acknowledges "A-2", "A-1" will
+ be put back onto the queue and "A-2" will be removed from the queue. This
+ allows another consumer to acquire and process "A-1" <emphasis>after</emphasis> "A-2" has been
+ processed.
+ </para>
+ <para>
+ Under some application-defined circumstances, this may be acceptable behavior.
+ However, if order must be preserved, the client should either release <emphasis>all</emphasis>
+ currently held messages, or discard the target message using reject.
+ </para>
+ </note>
+ </section>
+ <!--h2-->
+ <section role="h2" id="usingmessagegroups-BrokerConfig">
+ <title>
+ Broker Configuration
+ </title>
+ <para>
+ In order for the broker to determine a message's group, the key for the header
+ that contains the group identifier must be provided to the broker via
+ configuration. This is done on a per-queue basis, when the queue is first
+ configured.
+ </para>
+ <para>
+ This means that message group classification is determined by the message's destination
+ queue.
+ </para>
+ <para>
+ Specifically, the queue "holds" the header key that is used to find the message's
+ group identifier. All messages arriving at the queue are expected to use the same
+ header key for holding the identifer. Once the message is enqueued, the broker
+ looks up the group identifier in the message's header, and classifies the message
+ by its group.
+ </para>
+ <para>
+ Message group support can be enabled on a queue using the
+ <command>qpid-config</command> command line tool. The following options should be
+ provided when adding a new queue:
+ <programlisting>
+ --group-header=<replaceable>header-name</replaceable> Enable message group support for this queue. Specify name of application header that holds the group identifier.
+ --shared-groups Enforce ordered message group consumption across multiple consumers.
+ </programlisting>
+ </para>
+ <para>
+ Message group support may also be specified in the
+ <command>queue.declare</command> method via the <command>arguments</command>
+ parameter map, or using the messaging address syntax. The following keys must be
+ provided in the arguments map to enable message group support on a queue:
+ </para>
+ <table>
+ <title>Queue Declare/Addres Syntax Message Group Configuration Arguments</title>
+ <tgroup cols="2">
+ <thead>
+ <row>
+ <entry>Key</entry>
+ <entry>Value</entry>
+ </row>
+ </thead>
+ <tbody>
+ <row>
+ <entry>qpid.group_header_key</entry>
+ <entry>string - key for message header that holds the group identifier value</entry>
+ </row>
+ <row>
+ <entry>qpid.shared_msg_group</entry>
+ <entry>1 - enforce ordering across multiple consumers</entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
+ <para>
+ It is important to note that there is no need to provide the actual group
+ identifer values that will be used. The broker learns this values as messages are
+ recieved. Also, there is no practical limit - aside from resource limitations -
+ to the number of different groups that the broker can track at run time.
+ </para>
+ <note>
+ <title>Restrictions</title>
+ <para>
+ Message grouping is not supported on LVQ or Priority queues.
+ </para>
+ </note>
+ <example>
+ <title>Creating a message group queue via qpid-config</title>
+ <para>
+ This example uses the qpid-config tool to create a message group queue called
+ "MyMsgQueue". The message header that contains the group identifier will use
+ the key "GROUP_KEY".
+ </para>
+ <programlisting>
+ qpid-config add queue MyMsgQueue --group-header="GROUP_KEY" --shared-groups"
+ </programlisting>
+ </example>
+ <example>
+ <title>Creating a message group queue using address syntax (C++)</title>
+ <para>
+ This example uses the messaging address syntax to create a message group queue
+ with the same configuration as the previous example.
+ </para>
+ <programlisting>
+ sender = session.createSender("MyMsgQueue; {create:always, delete:receiver, node: {x-declare: {arguments:"
+ " {'qpid.group_header_key':'GROUP_KEY', 'qpid.shared_msg_group':1}}}}")
+ </programlisting>
+ </example>
+ <section role="h3" id="usingmessagegroups-DefaultGroup">
+ <title>
+ Default Group
+ </title>
+ <para>
+ Should a message without a group identifier arrive at a queue configured for message grouping, the broker assigns the message to the default group. Therefore, all such "unidentified" messages are considered by the broker as part of the same group. The name of the default group is <command>"qpid.no-group"</command>. This default can be overridden by suppling a different value to the broker configuration item <command>"default-message-group"</command>:
+ <example>
+ <title>Overriding the default message group identifier for the broker</title>
+ <programlisting>
+ qpidd --default-msg-group "EMPTY-GROUP"
+ </programlisting>
+ </example>
+ </para>
+ </section>
+ </section>
+ </section>
+
+
+
diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
index 9c77032d4c..171f4afbe2 100644
--- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
+++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
@@ -694,7 +694,8 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
public BrokerSchema.BrokerClass.QueueMoveMessagesMethodResponseCommand queueMoveMessages(final BrokerSchema.BrokerClass.QueueMoveMessagesMethodResponseCommandFactory factory,
final String srcQueue,
final String destQueue,
- final Long qty)
+ final Long qty,
+ final Map filter) // TODO: move based on group identifier
{
// TODO
return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
@@ -731,6 +732,14 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
}
+ public BrokerSchema.BrokerClass.QueryMethodResponseCommand query(final BrokerSchema.BrokerClass.QueryMethodResponseCommandFactory factory,
+ final String type,
+ final String name)
+ {
+ //TODO:
+ return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+ }
+
public UUID getId()
{
return _obj.getId();
@@ -1102,7 +1111,8 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
}
public BrokerSchema.QueueClass.PurgeMethodResponseCommand purge(final BrokerSchema.QueueClass.PurgeMethodResponseCommandFactory factory,
- final Long request)
+ final Long request,
+ final Map filter) // TODO: support for purge-by-group-identifier
{
try
{
@@ -1118,7 +1128,8 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
public BrokerSchema.QueueClass.RerouteMethodResponseCommand reroute(final BrokerSchema.QueueClass.RerouteMethodResponseCommandFactory factory,
final Long request,
final Boolean useAltExchange,
- final String exchange)
+ final String exchange,
+ final Map filter) // TODO: support for re-route-by-group-identifier
{
//TODO
return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
diff --git a/specs/management-schema.xml b/specs/management-schema.xml
index 5922bfa4fa..dd4acf66d5 100644
--- a/specs/management-schema.xml
+++ b/specs/management-schema.xml
@@ -92,6 +92,7 @@
<arg name="srcQueue" dir="I" type="sstr" desc="Source queue"/>
<arg name="destQueue" dir="I" type="sstr" desc="Destination queue"/>
<arg name="qty" dir="I" type="uint32" desc="# of messages to move. 0 means all messages"/>
+ <arg name="filter" dir="I" type="map" default="{}" desc="if specified, move only those messages matching this filter"/>
</method>
<method name="setLogLevel" desc="Set the log level">
@@ -115,6 +116,13 @@
<arg name="options" dir="I" type="map" desc="Type specific object options for deletion"/>
</method>
+ <method name="query" desc="Query the current state of an object.">
+ <arg name="type" dir="I" type="sstr" desc="The type of object to query."/>
+ <arg name="name" dir="I" type="sstr" desc="The name of the object to query"/>
+ <arg name="results" dir="O" type="map" desc="A snapshot of the object's state."/>
+ </method>
+
+
</class>
<!--
@@ -180,12 +188,14 @@
<method name="purge" desc="Discard all or some messages on a queue">
<arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/>
+ <arg name="filter" dir="I" type="map" default="{}" desc="if specified, purge only those messages matching this filter"/>
</method>
<method name="reroute" desc="Remove all or some messages on this queue and route them to an exchange">
<arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/>
<arg name="useAltExchange" dir="I" type="bool" desc="Iff true, use the queue's configured alternate exchange; iff false, use exchange named in the 'exchange' argument"/>
<arg name="exchange" dir="I" type="sstr" desc="Name of the exchange to route the messages through"/>
+ <arg name="filter" dir="I" type="map" default="{}" desc="if specified, reroute only those messages matching this filter"/>
</method>
</class>
diff --git a/tests/src/py/qpid_tests/broker_0_10/__init__.py b/tests/src/py/qpid_tests/broker_0_10/__init__.py
index 921786af22..7b779df5f4 100644
--- a/tests/src/py/qpid_tests/broker_0_10/__init__.py
+++ b/tests/src/py/qpid_tests/broker_0_10/__init__.py
@@ -33,3 +33,4 @@ from lvq import *
from priority import *
from threshold import *
from extensions import *
+from msg_groups import *
diff --git a/tests/src/py/qpid_tests/broker_0_10/management.py b/tests/src/py/qpid_tests/broker_0_10/management.py
index 952878e0b7..5aaa1a7c7d 100644
--- a/tests/src/py/qpid_tests/broker_0_10/management.py
+++ b/tests/src/py/qpid_tests/broker_0_10/management.py
@@ -156,7 +156,7 @@ class ManagementTest (TestBase010):
queues = self.qmf.getObjects(_class="queue")
"Move 10 messages from src-queue to dest-queue"
- result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10, {})
self.assertEqual (result.status, 0)
sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
@@ -166,7 +166,7 @@ class ManagementTest (TestBase010):
self.assertEqual (dq.msgDepth,10)
"Move all remaining messages to destination"
- result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0, {})
self.assertEqual (result.status,0)
sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
@@ -176,16 +176,16 @@ class ManagementTest (TestBase010):
self.assertEqual (dq.msgDepth,20)
"Use a bad source queue name"
- result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0, {})
self.assertEqual (result.status,4)
"Use a bad destination queue name"
- result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0, {})
self.assertEqual (result.status,4)
" Use a large qty (40) to move from dest-queue back to "
" src-queue- should move all "
- result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40, {})
self.assertEqual (result.status,0)
sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
@@ -225,19 +225,19 @@ class ManagementTest (TestBase010):
pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
"Purge top message from purge-queue"
- result = pq.purge(1)
+ result = pq.purge(1, {})
self.assertEqual (result.status, 0)
pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,19)
"Purge top 9 messages from purge-queue"
- result = pq.purge(9)
+ result = pq.purge(9, {})
self.assertEqual (result.status, 0)
pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,10)
"Purge all messages from purge-queue"
- result = pq.purge(0)
+ result = pq.purge(0, {})
self.assertEqual (result.status, 0)
pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,0)
@@ -263,7 +263,7 @@ class ManagementTest (TestBase010):
#reroute messages from test queue to amq.fanout (and hence to
#rerouted queue):
pq = self.qmf.getObjects(_class="queue", name="test-queue")[0]
- result = pq.reroute(0, False, "amq.fanout")
+ result = pq.reroute(0, False, "amq.fanout", {})
self.assertEqual(result.status, 0)
#verify messages are all rerouted:
@@ -301,7 +301,7 @@ class ManagementTest (TestBase010):
pq = self.qmf.getObjects(_class="queue", name="reroute-queue")[0]
"Reroute top message from reroute-queue to alternate exchange"
- result = pq.reroute(1, True, "")
+ result = pq.reroute(1, True, "", {})
self.assertEqual(result.status, 0)
pq.update()
aq = self.qmf.getObjects(_class="queue", name="alt-queue1")[0]
@@ -309,7 +309,7 @@ class ManagementTest (TestBase010):
self.assertEqual(aq.msgDepth,1)
"Reroute top 9 messages from reroute-queue to alt.direct2"
- result = pq.reroute(9, False, "alt.direct2")
+ result = pq.reroute(9, False, "alt.direct2", {})
self.assertEqual(result.status, 0)
pq.update()
aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
@@ -317,11 +317,11 @@ class ManagementTest (TestBase010):
self.assertEqual(aq.msgDepth,9)
"Reroute using a non-existent exchange"
- result = pq.reroute(0, False, "amq.nosuchexchange")
+ result = pq.reroute(0, False, "amq.nosuchexchange", {})
self.assertEqual(result.status, 4)
"Reroute all messages from reroute-queue"
- result = pq.reroute(0, False, "alt.direct2")
+ result = pq.reroute(0, False, "alt.direct2", {})
self.assertEqual(result.status, 0)
pq.update()
aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
@@ -337,7 +337,7 @@ class ManagementTest (TestBase010):
session.message_transfer(destination="amq.direct", message=msg)
"Reroute onto the same queue"
- result = pq.reroute(0, False, "amq.direct")
+ result = pq.reroute(0, False, "amq.direct", {})
self.assertEqual(result.status, 0)
pq.update()
self.assertEqual(pq.msgDepth,20)
@@ -365,7 +365,7 @@ class ManagementTest (TestBase010):
# 4. Call reroute on queue Y and specify that messages should
# be sent to exchange A
y = self.qmf.getObjects(_class="queue", name="Y")[0]
- result = y.reroute(1, False, "A")
+ result = y.reroute(1, False, "A", {})
self.assertEqual(result.status, 0)
# 5. verify that the message is rerouted through B (as A has
diff --git a/tests/src/py/qpid_tests/broker_0_10/msg_groups.py b/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
new file mode 100644
index 0000000000..611be0a6b0
--- /dev/null
+++ b/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
@@ -0,0 +1,981 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.messaging import *
+from qpid.tests.messaging import Base
+import qmf.console
+
+from time import sleep
+#
+# Tests the Broker's support for message groups
+#
+
+class MultiConsumerMsgGroupTests(Base):
+ """
+ Tests for the behavior of multi-consumer message groups. These tests allow
+ a messages from the same group be consumed by multiple different clients as
+ long as each message is processed "in sequence". See QPID-3346 for
+ details.
+ """
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def test_simple(self):
+ """ Verify simple acquire/accept actions on a set of grouped
+ messages shared between two receivers.
+ """
+ ## Create a msg group queue
+
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
+
+ groups = ["A","A","A","B","B","B","C","C","C"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ ## Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+ ## Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---,
+
+ # create consumers on separate sessions: C1,C2
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ s2 = self.setup_session()
+ c2 = s2.receiver("msg-group-q", options={"capacity":1})
+
+ # C1 should acquire A-0, then C2 should acquire B-3
+
+ m1 = c1.fetch(0);
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ m2 = c2.fetch(0);
+ assert m2.properties['THE-GROUP'] == 'B'
+ assert m2.content['index'] == 3
+
+ # C1 Acknowledge A-0
+ c1.session.acknowledge(m1);
+
+ # C2 should next acquire A-1
+ m3 = c2.fetch(0);
+ assert m3.properties['THE-GROUP'] == 'A'
+ assert m3.content['index'] == 1
+
+ # C1 should next acquire C-6, since groups A&B are held by c2
+ m4 = c1.fetch(0);
+ assert m4.properties['THE-GROUP'] == 'C'
+ assert m4.content['index'] == 6
+
+ ## Queue = XXX, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+ ## Owners= ---, ^C2, +C2, ^C2, +C2, +C2, ^C1, +C1, +C1,
+
+ # C2 Acknowledge B-3, freeing up the rest of B group
+ c2.session.acknowledge(m2);
+
+ ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, c-6, c-7, c-8...
+ ## Owners= ---, ^C2, +C2, ---, ---, ---, ^C1, +C1, +C1,
+
+ # C1 should now acquire B-4, since it is next "free"
+ m5 = c1.fetch(0);
+ assert m5.properties['THE-GROUP'] == 'B'
+ assert m5.content['index'] == 4
+
+ ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, c-6, c-7, c-8...
+ ## Owners= ---, ^C2, +C2, ---, ^C1, +C1, ^C1, +C1, +C1,
+
+ # C1 acknowledges C-6, freeing the C group
+ c1.session.acknowledge(m4)
+
+ ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, XXX, c-7, c-8...
+ ## Owners= ---, ^C2, +C2, ---, ^C1, +C1, ---, ---, ---
+
+ # C2 should next fetch A-2, followed by C-7
+ m7 = c2.fetch(0);
+ assert m7.properties['THE-GROUP'] == 'A'
+ assert m7.content['index'] == 2
+
+ m8 = c2.fetch(0);
+ assert m8.properties['THE-GROUP'] == 'C'
+ assert m8.content['index'] == 7
+
+ ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, XXX, c-7, c-8...
+ ## Owners= ---, ^C2, ^C2, ---, ^C1, +C1, ---, ^C2, +C2
+
+ # have C2 ack all fetched messages, freeing C-8
+ c2.session.acknowledge()
+
+ ## Queue = XXX, XXX, XXX, XXX, b-4, b-5, XXX, XXX, c-8...
+ ## Owners= ---, ---, ---, ---, ^C1, +C1, ---, ---, ---
+
+ # the next fetch of C2 would get C-8, since B-5 is "owned"
+ m9 = c2.fetch(0);
+ assert m9.properties['THE-GROUP'] == 'C'
+ assert m9.content['index'] == 8
+
+ ## Queue = XXX, XXX, XXX, XXX, b-4, b-5, XXX, XXX, c-8...
+ ## Owners= ---, ---, ---, ---, ^C1, +C1, ---, ---, ^C2
+
+ # C1 acks B-4, freeing B-5 for consumption
+ c1.session.acknowledge(m5)
+
+ ## Queue = XXX, XXX, XXX, XXX, XXX, b-5, XXX, XXX, c-8...
+ ## Owners= ---, ---, ---, ---, ---, ^C2, ---, ---, ^C2
+
+ # the next fetch of C2 would get B-5
+ m10 = c2.fetch(0);
+ assert m10.properties['THE-GROUP'] == 'B'
+ assert m10.content['index'] == 5
+
+ # there should be no more left for C1:
+ try:
+ mx = c1.fetch(0)
+ assert False # should never get here
+ except Empty:
+ pass
+
+ c1.session.acknowledge()
+ c2.session.acknowledge()
+ c1.close()
+ c2.close()
+ snd.close()
+
+ def test_simple_browse(self):
+ """ Test the behavior of a browsing subscription on a message grouping
+ queue.
+ """
+
+ ## Create a msg group queue
+
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
+
+ groups = ["A","B","A","B","C"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ ## Queue = A-0, B-1, A-2, b-3, C-4
+ ## Owners= ---, ---, ---, ---, ---
+
+ # create consumer and browser
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ s2 = self.setup_session()
+ b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+
+ # C1 should acquire A-0
+
+ m1 = c1.fetch(0);
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ ## Queue = A-0, B-1, A-2, b-3, C-4
+ ## Owners= ^C1, ---, +C1, ---, ---
+
+ m2 = b1.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'B'
+ assert m2.content['index'] == 1
+
+ # verify that the browser may see A-2, even though its group is owned
+ # by C1
+ m2 = b1.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'A'
+ assert m2.content['index'] == 2
+
+ m2 = b1.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'B'
+ assert m2.content['index'] == 3
+
+ # verify the consumer can own groups currently seen by the browser
+ m3 = c1.fetch(0);
+ assert m3.properties['THE-GROUP'] == 'B'
+ assert m3.content['index'] == 1
+
+ m2 = b1.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'C'
+ assert m2.content['index'] == 4
+
+ def test_release(self):
+ """ Verify releasing a message can free its assocated group
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
+
+ groups = ["A","A","B","B"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ s2 = self.setup_session()
+ c2 = s2.receiver("msg-group-q", options={"capacity":1})
+
+ m1 = c1.fetch(0)
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ m2 = c2.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'B'
+ assert m2.content['index'] == 2
+
+ # C1 release m1, and the first group
+
+ s1.acknowledge(m1, Disposition(RELEASED, set_redelivered=True))
+
+ # C2 should be able to get group 'A', msg 'A-0' now
+ m2 = c2.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'A'
+ assert m2.content['index'] == 0
+
+ def test_reject(self):
+ """ Verify rejecting a message can free its associated group
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
+
+ groups = ["A","A","B","B"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ s2 = self.setup_session()
+ c2 = s2.receiver("msg-group-q", options={"capacity":1})
+
+ m1 = c1.fetch(0)
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ m2 = c2.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'B'
+ assert m2.content['index'] == 2
+
+ # C1 rejects m1, and the first group is released
+ s1.acknowledge(m1, Disposition(REJECTED))
+
+ # C2 should be able to get group 'A', msg 'A-1' now
+ m2 = c2.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'A'
+ assert m2.content['index'] == 1
+
+ def test_close(self):
+ """ Verify behavior when a consumer that 'owns' a group closes.
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
+
+ groups = ["A","A","B","B"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ s2 = self.setup_session()
+ c2 = s2.receiver("msg-group-q", options={"capacity":1})
+
+ # C1 will own group A
+ m1 = c1.fetch(0)
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ # C2 will own group B
+ m2 = c2.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'B'
+ assert m2.content['index'] == 2
+
+ # C1 shuffles off the mortal coil...
+ c1.close();
+
+ # but the session (s1) remains active, so "A" remains blocked
+ # from c2, c2 should fetch the next B-3
+
+ m2 = c2.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'B'
+ assert m2.content['index'] == 3
+
+ # and there should be no more messages available for C2
+ try:
+ m2 = c2.fetch(0)
+ assert False # should never get here
+ except Empty:
+ pass
+
+ # close session s1, releasing the A group
+ s1.close()
+
+ m2 = c2.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'A'
+ assert m2.content['index'] == 0
+
+ m2 = c2.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'A'
+ assert m2.content['index'] == 1
+
+ # and there should be no more messages now
+ try:
+ m2 = c2.fetch(0)
+ assert False # should never get here
+ except Empty:
+ pass
+
+ def test_transaction(self):
+ """ Verify behavior when using transactions.
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
+
+ groups = ["A","A","B","B","A","B"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ s1 = self.conn.session(transactional=True)
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ s2 = self.conn.session(transactional=True)
+ c2 = s2.receiver("msg-group-q", options={"capacity":1})
+
+ # C1 gets group A
+ m1 = c1.fetch(0)
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ # C2 gets group B
+ m2 = c2.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'B'
+ assert m2.content['index'] == 2
+
+ s1.acknowledge(m1) # A-0 consumed, A group freed
+ s2.acknowledge(m2) # B-2 consumed, B group freed
+
+ s1.commit()
+ s2.rollback() # release B-2 and group B
+
+ ## Q: ["A1","B2","B3","A4","B5"]
+
+ # C2 should be able to get the next A
+ m3 = c2.fetch(0)
+ assert m3.properties['THE-GROUP'] == 'A'
+ assert m3.content['index'] == 1
+
+ # C1 should be able to get B-2
+ m4 = c1.fetch(0)
+ assert m4.properties['THE-GROUP'] == 'B'
+ assert m4.content['index'] == 2
+
+ s2.acknowledge(m3) # C2 consumes A-1
+ s1.acknowledge(m4) # C1 consumes B-2
+ s1.commit() # C1 consume B-2 occurs, free group B
+
+ ## Q: [["A1",]"B3","A4","B5"]
+
+ # A-1 is still considered owned by C2, since the commit has yet to
+ # occur, so the next available to C1 would be B-3
+ m5 = c1.fetch(0) # B-3
+ assert m5.properties['THE-GROUP'] == 'B'
+ assert m5.content['index'] == 3
+
+ # and C2 should find A-4 available, since it owns the A group
+ m6 = c2.fetch(0) # A-4
+ assert m6.properties['THE-GROUP'] == 'A'
+ assert m6.content['index'] == 4
+
+ s2.acknowledge(m6) # C2 consumes A-4
+
+ # uh-oh, A-1 and A-4 released, along with A group
+ s2.rollback()
+
+ ## Q: ["A1",["B3"],"A4","B5"]
+ m7 = c1.fetch(0) # A-1 is found
+ assert m7.properties['THE-GROUP'] == 'A'
+ assert m7.content['index'] == 1
+
+ ## Q: [["A1"],["B3"],"A4","B5"]
+ # since C1 "owns" both A and B group, C2 should find nothing available
+ try:
+ m8 = c2.fetch(0)
+ assert False # should not get here
+ except Empty:
+ pass
+
+ # C1 next gets A4
+ m9 = c1.fetch(0)
+ assert m9.properties['THE-GROUP'] == 'A'
+ assert m9.content['index'] == 4
+
+ s1.acknowledge()
+
+ ## Q: [["A1"],["B3"],["A4"],"B5"]
+ # even though C1 acknowledges A1,B3, and A4, B5 is still considered
+ # owned as the commit has yet to take place
+ try:
+ m10 = c2.fetch(0)
+ assert False # should not get here
+ except Empty:
+ pass
+
+ # now A1,B3,A4 dequeued, B5 should be free
+ s1.commit()
+
+ ## Q: ["B5"]
+ m11 = c2.fetch(0)
+ assert m11.properties['THE-GROUP'] == 'B'
+ assert m11.content['index'] == 5
+
+ s2.acknowledge()
+ s2.commit()
+
+ def test_query(self):
+ """ Verify the queue query method against message groups
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
+
+ groups = ["A","B","C","A","B","C","A"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ s2 = self.setup_session()
+ c2 = s2.receiver("msg-group-q", options={"capacity":1})
+
+ m1 = c1.fetch(0)
+ m2 = c2.fetch(0)
+
+ # at this point, group A should be owned by C1, group B by C2, and
+ # group C should be available
+
+ # now setup a QMF session, so we can call methods
+ self.qmf_session = qmf.console.Session()
+ self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+ brokers = self.qmf_session.getObjects(_class="broker")
+ assert len(brokers) == 1
+ broker = brokers[0]
+
+ # verify the query method call's group information
+ rc = broker.query("queue", "msg-group-q")
+ assert rc.status == 0
+ assert rc.text == "OK"
+ results = rc.outArgs['results']
+ assert 'qpid.message_group_queue' in results
+ q_info = results['qpid.message_group_queue']
+ assert 'group_header_key' in q_info and q_info['group_header_key'] == "THE-GROUP"
+ assert 'group_state' in q_info and len(q_info['group_state']) == 3
+ for g_info in q_info['group_state']:
+ assert 'group_id' in g_info
+ if g_info['group_id'] == "A":
+ assert g_info['msg_count'] == 3
+ assert g_info['consumer'] != ""
+ elif g_info['group_id'] == "B":
+ assert g_info['msg_count'] == 2
+ assert g_info['consumer'] != ""
+ elif g_info['group_id'] == "C":
+ assert g_info['msg_count'] == 2
+ assert g_info['consumer'] == ""
+ else:
+ assert(False) # should never get here
+ self.qmf_session.delBroker(self.qmf_broker)
+
+ def test_purge_free(self):
+ """ Verify we can purge a queue of all messages of a given "unowned"
+ group.
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
+
+ groups = ["A","B","A","B","C","A"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ # now setup a QMF session, so we can call methods
+ self.qmf_session = qmf.console.Session()
+ self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+ queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
+ assert queue
+ msg_filter = { 'filter_type' : 'header_match_str',
+ 'filter_params' : { 'header_key' : "THE-GROUP",
+ 'header_value' : "B" }}
+ assert queue.msgDepth == 6
+ rc = queue.purge(0, msg_filter)
+ assert rc.status == 0
+ queue.update()
+ assert queue.msgDepth == 4
+
+ # verify all B's removed....
+ s2 = self.setup_session()
+ b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+ count = 0
+ try:
+ while True:
+ m2 = b1.fetch(0)
+ assert m2.properties['THE-GROUP'] != 'B'
+ count += 1
+ except Empty:
+ pass
+ assert count == 4
+
+ self.qmf_session.delBroker(self.qmf_broker)
+
+ def test_purge_acquired(self):
+ """ Verify we can purge messages from an acquired group.
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
+
+ groups = ["A","B","A","B","C","A"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ # acquire group "A"
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ m1 = c1.fetch(0)
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ # now setup a QMF session, so we can purge group A
+ self.qmf_session = qmf.console.Session()
+ self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+ queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
+ assert queue
+ msg_filter = { 'filter_type' : 'header_match_str',
+ 'filter_params' : { 'header_key' : "THE-GROUP",
+ 'header_value' : "A" }}
+ assert queue.msgDepth == 6
+ rc = queue.purge(0, msg_filter)
+ assert rc.status == 0
+ queue.update()
+ queue.msgDepth == 4 # the pending acquired A still counts!
+
+ # verify all other A's removed....
+ s2 = self.setup_session()
+ b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+ count = 0
+ try:
+ while True:
+ m2 = b1.fetch(0)
+ assert m2.properties['THE-GROUP'] != 'A'
+ count += 1
+ except Empty:
+ pass
+ assert count == 3 # only 3 really available
+ s1.acknowledge() # ack the consumed A-0
+ self.qmf_session.delBroker(self.qmf_broker)
+
+ def test_purge_count(self):
+ """ Verify we can purge a fixed number of messages from an acquired
+ group.
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
+
+ groups = ["A","B","A","B","C","A"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ # acquire group "A"
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ m1 = c1.fetch(0)
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ # now setup a QMF session, so we can purge group A
+ self.qmf_session = qmf.console.Session()
+ self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+ queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
+ assert queue
+ msg_filter = { 'filter_type' : 'header_match_str',
+ 'filter_params' : { 'header_key' : "THE-GROUP",
+ 'header_value' : "A" }}
+ assert queue.msgDepth == 6
+ rc = queue.purge(1, msg_filter)
+ assert rc.status == 0
+ queue.update()
+ queue.msgDepth == 5 # the pending acquired A still counts!
+
+ # verify all other A's removed....
+ s2 = self.setup_session()
+ b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+ count = 0
+ a_count = 0
+ try:
+ while True:
+ m2 = b1.fetch(0)
+ if m2.properties['THE-GROUP'] != 'A':
+ count += 1
+ else:
+ a_count += 1
+ except Empty:
+ pass
+ assert count == 3 # non-A's
+ assert a_count == 1 # and one is an A
+ s1.acknowledge() # ack the consumed A-0
+ self.qmf_session.delBroker(self.qmf_broker)
+
+ def test_move_all(self):
+ """ Verify we can move messages from an acquired group.
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
+
+ groups = ["A","B","A","B","C","A"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ # set up destination queue
+ rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
+
+ # acquire group "A"
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ m1 = c1.fetch(0)
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ # now setup a QMF session, so we can move what's left of group A
+ self.qmf_session = qmf.console.Session()
+ self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+ brokers = self.qmf_session.getObjects(_class="broker")
+ assert len(brokers) == 1
+ broker = brokers[0]
+ msg_filter = { 'filter_type' : 'header_match_str',
+ 'filter_params' : { 'header_key' : "THE-GROUP",
+ 'header_value' : "A" }}
+ rc = broker.queueMoveMessages("msg-group-q", "dest-q", 0, msg_filter)
+ assert rc.status == 0
+
+ # verify all other A's removed from msg-group-q
+ s2 = self.setup_session()
+ b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+ count = 0
+ try:
+ while True:
+ m2 = b1.fetch(0)
+ assert m2.properties['THE-GROUP'] != 'A'
+ count += 1
+ except Empty:
+ pass
+ assert count == 3 # only 3 really available
+
+ # verify the moved A's are at the dest-q
+ s2 = self.setup_session()
+ b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":1})
+ count = 0
+ try:
+ while True:
+ m2 = b1.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'A'
+ assert m2.content['index'] == 2 or m2.content['index'] == 5
+ count += 1
+ except Empty:
+ pass
+ assert count == 2 # two A's moved
+
+ s1.acknowledge() # ack the consumed A-0
+ self.qmf_session.delBroker(self.qmf_broker)
+
+ def test_move_count(self):
+ """ Verify we can move a fixed number of messages from an acquired group.
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
+
+ groups = ["A","B","A","B","C","A"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ # set up destination queue
+ rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
+
+ # now setup a QMF session, so we can move group B
+ self.qmf_session = qmf.console.Session()
+ self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+ brokers = self.qmf_session.getObjects(_class="broker")
+ assert len(brokers) == 1
+ broker = brokers[0]
+ msg_filter = { 'filter_type' : 'header_match_str',
+ 'filter_params' : { 'header_key' : "THE-GROUP",
+ 'header_value' : "B" }}
+ rc = broker.queueMoveMessages("msg-group-q", "dest-q", 3, msg_filter)
+ assert rc.status == 0
+
+ # verify all B's removed from msg-group-q
+ s2 = self.setup_session()
+ b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+ count = 0
+ try:
+ while True:
+ m2 = b1.fetch(0)
+ assert m2.properties['THE-GROUP'] != 'B'
+ count += 1
+ except Empty:
+ pass
+ assert count == 4
+
+ # verify the moved B's are at the dest-q
+ s2 = self.setup_session()
+ b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":1})
+ count = 0
+ try:
+ while True:
+ m2 = b1.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'B'
+ assert m2.content['index'] == 1 or m2.content['index'] == 3
+ count += 1
+ except Empty:
+ pass
+ assert count == 2
+
+ self.qmf_session.delBroker(self.qmf_broker)
+
+ def test_reroute(self):
+ """ Verify we can reroute messages from an acquired group.
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
+
+ groups = ["A","B","A","B","C","A"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ # create a topic exchange for the reroute
+ rcvr = self.ssn.receiver("reroute-q; {create: always, delete:receiver," +
+ " node: {type: topic}}")
+
+ # acquire group "A"
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ m1 = c1.fetch(0)
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ # now setup a QMF session, so we can reroute group A
+ self.qmf_session = qmf.console.Session()
+ self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+ queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
+ assert queue
+ msg_filter = { 'filter_type' : 'header_match_str',
+ 'filter_params' : { 'header_key' : "THE-GROUP",
+ 'header_value' : "A" }}
+ assert queue.msgDepth == 6
+ rc = queue.reroute(0, False, "reroute-q", msg_filter)
+ assert rc.status == 0
+ queue.update()
+ queue.msgDepth == 4 # the pending acquired A still counts!
+
+ # verify all other A's removed....
+ s2 = self.setup_session()
+ b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+ count = 0
+ try:
+ while True:
+ m2 = b1.fetch(0)
+ assert m2.properties['THE-GROUP'] != 'A'
+ count += 1
+ except Empty:
+ pass
+ assert count == 3 # only 3 really available
+
+ # and what of reroute-q?
+ count = 0
+ try:
+ while True:
+ m2 = rcvr.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'A'
+ assert m2.content['index'] == 2 or m2.content['index'] == 5
+ count += 1
+ except Empty:
+ pass
+ assert count == 2
+
+ s1.acknowledge() # ack the consumed A-0
+ self.qmf_session.delBroker(self.qmf_broker)
+
+ def test_queue_delete(self):
+ """ Test deleting a queue while consumers are active.
+ """
+
+ ## Create a msg group queue
+
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
+
+ groups = ["A","B","A","B","C"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ ## Queue = A-0, B-1, A-2, b-3, C-4
+ ## Owners= ---, ---, ---, ---, ---
+
+ # create consumers
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ s2 = self.setup_session()
+ c2 = s2.receiver("msg-group-q", options={"capacity":1})
+
+ # C1 should acquire A-0
+ m1 = c1.fetch(0);
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ # c2 acquires B-1
+ m2 = c2.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'B'
+ assert m2.content['index'] == 1
+
+ # with group A and B owned, and C free, delete the
+ # queue
+ snd.close()
+ self.ssn.close()
+
+ def test_default_group_id(self):
+ """ Verify the queue assigns the default group id should a message
+ arrive without a group identifier.
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
+
+ m = Message(content={}, properties={"NO-GROUP-HEADER":"HA-HA"})
+ snd.send(m)
+
+ # now setup a QMF session, so we can call methods
+ self.qmf_session = qmf.console.Session()
+ self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
+ brokers = self.qmf_session.getObjects(_class="broker")
+ assert len(brokers) == 1
+ broker = brokers[0]
+
+ # grab the group state off the queue, and verify the default group is
+ # present ("qpid.no-group" is the broker default)
+ rc = broker.query("queue", "msg-group-q")
+ assert rc.status == 0
+ assert rc.text == "OK"
+ results = rc.outArgs['results']
+ assert 'qpid.message_group_queue' in results
+ q_info = results['qpid.message_group_queue']
+ assert 'group_header_key' in q_info and q_info['group_header_key'] == "THE-GROUP"
+ assert 'group_state' in q_info and len(q_info['group_state']) == 1
+ g_info = q_info['group_state'][0]
+ assert 'group_id' in g_info
+ assert g_info['group_id'] == 'qpid.no-group'
+
+ self.qmf_session.delBroker(self.qmf_broker)
+
+
+class StickyConsumerMsgGroupTests(Base):
+ """
+ Tests for the behavior of sticky-consumer message groups. These tests
+ expect all messages from the same group be consumed by the same clients.
+ See QPID-3347 for details.
+ """
+ pass # TBD
diff --git a/tools/src/py/qpid-config b/tools/src/py/qpid-config
index cd80e26a1e..1ee35da8c3 100755
--- a/tools/src/py/qpid-config
+++ b/tools/src/py/qpid-config
@@ -96,6 +96,8 @@ class Config:
self._flowResumeCount = None
self._flowStopSize = None
self._flowResumeSize = None
+ self._msgGroupHeader = None
+ self._sharedMsgGroup = False
self._extra_arguments = []
self._returnCode = 0
@@ -116,13 +118,16 @@ FLOW_STOP_COUNT = "qpid.flow_stop_count"
FLOW_RESUME_COUNT = "qpid.flow_resume_count"
FLOW_STOP_SIZE = "qpid.flow_stop_size"
FLOW_RESUME_SIZE = "qpid.flow_resume_size"
+MSG_GROUP_HDR_KEY = "qpid.group_header_key"
+SHARED_MSG_GROUP = "qpid.shared_msg_group"
#There are various arguments to declare that have specific program
#options in this utility. However there is now a generic mechanism for
#passing arguments as well. The SPECIAL_ARGS list contains the
#arguments for which there are specific program options defined
#i.e. the arguments for which there is special processing on add and
#list
-SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE]
+SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE,
+ MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP]
class JHelpFormatter(IndentedHelpFormatter):
"""Format usage and description without stripping newlines from usage strings
@@ -182,6 +187,10 @@ def OptionsAndArguments(argv):
help="Turn on sender flow control when the number of queued messages exceeds this value.")
group3.add_option("--flow-resume-count", action="store", type="int", metavar="<n>",
help="Turn off sender flow control when the number of queued messages drops below this value.")
+ group3.add_option("--group-header", action="store", type="string", metavar="<header-name>",
+ help="Enable message groups. Specify name of header that holds group identifier.")
+ group3.add_option("--shared-groups", action="store_true",
+ help="Allow message group consumption across multiple consumers.")
group3.add_option("--argument", dest="extra_arguments", action="append", default=[],
metavar="<NAME=VALUE>", help="Specify a key-value pair to add to queue arguments")
# no option for declaring an exclusive queue - which can only be used by the session that creates it.
@@ -263,6 +272,10 @@ def OptionsAndArguments(argv):
config._flowStopCount = opts.flow_stop_count
if opts.flow_resume_count:
config._flowResumeCount = opts.flow_resume_count
+ if opts.group_header:
+ config._msgGroupHeader = opts.group_header
+ if opts.shared_groups:
+ config._sharedMsgGroup = True
if opts.extra_arguments:
config._extra_arguments = opts.extra_arguments
return args
@@ -442,6 +455,8 @@ class BrokerManager:
if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%s" % args[FLOW_RESUME_SIZE],
if FLOW_STOP_COUNT in args: print "--flow-stop-count=%s" % args[FLOW_STOP_COUNT],
if FLOW_RESUME_COUNT in args: print "--flow-resume-count=%s" % args[FLOW_RESUME_COUNT],
+ if MSG_GROUP_HDR_KEY in args: print "--group-header=%s" % args[MSG_GROUP_HDR_KEY],
+ if SHARED_MSG_GROUP in args and args[SHARED_MSG_GROUP] == 1: print "--shared-groups",
print " ".join(["--argument %s=%s" % (k, v) for k,v in args.iteritems() if not k in SPECIAL_ARGS])
def QueueListRecurse(self, filter):
@@ -534,6 +549,11 @@ class BrokerManager:
if config._flowResumeCount:
declArgs[FLOW_RESUME_COUNT] = config._flowResumeCount
+ if config._msgGroupHeader:
+ declArgs[MSG_GROUP_HDR_KEY] = config._msgGroupHeader
+ if config._sharedMsgGroup:
+ declArgs[SHARED_MSG_GROUP] = 1
+
if config._altern_ex != None:
self.broker.getAmqpSession().queue_declare(queue=qname, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs)
else: