summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorKen Giusti <kgiusti@apache.org>2011-10-07 14:21:48 +0000
committerKen Giusti <kgiusti@apache.org>2011-10-07 14:21:48 +0000
commitae4e2dd017d1863b7b0fd9e760f56696a49b414c (patch)
tree4a54f245efa1c2df1601d648c1fdd41fba08b802 /cpp/src
parenteb9363d2d8eb1b6041c70c3db64297dfaeaae3d4 (diff)
downloadqpid-python-ae4e2dd017d1863b7b0fd9e760f56696a49b414c.tar.gz
QPID-3346: move message group feature into trunk.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1180050 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/CMakeLists.txt2
-rw-r--r--cpp/src/Makefile.am5
-rw-r--r--cpp/src/qpid/broker/Broker.cpp67
-rw-r--r--cpp/src/qpid/broker/Broker.h11
-rw-r--r--cpp/src/qpid/broker/Consumer.h16
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp2
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h2
-rw-r--r--cpp/src/qpid/broker/FifoDistributor.cpp58
-rw-r--r--cpp/src/qpid/broker/FifoDistributor.h58
-rw-r--r--cpp/src/qpid/broker/MessageDistributor.h76
-rw-r--r--cpp/src/qpid/broker/MessageGroupManager.cpp443
-rw-r--r--cpp/src/qpid/broker/MessageGroupManager.h125
-rw-r--r--cpp/src/qpid/broker/Messages.h1
-rw-r--r--cpp/src/qpid/broker/Queue.cpp480
-rw-r--r--cpp/src/qpid/broker/Queue.h54
-rw-r--r--cpp/src/qpid/broker/QueueEvents.cpp4
-rw-r--r--cpp/src/qpid/broker/QueueFlowLimit.cpp3
-rw-r--r--cpp/src/qpid/broker/QueueFlowLimit.h3
-rw-r--r--cpp/src/qpid/broker/QueueObserver.h40
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.cpp3
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp66
-rw-r--r--cpp/src/qpid/broker/SemanticState.h11
-rw-r--r--cpp/src/qpid/broker/ThresholdAlerts.h3
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp14
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp18
-rw-r--r--cpp/src/tests/CMakeLists.txt5
-rw-r--r--cpp/src/tests/Makefile.am19
-rw-r--r--cpp/src/tests/QueueTest.cpp313
-rwxr-xr-xcpp/src/tests/cluster_tests.py70
-rw-r--r--cpp/src/tests/msg_group_test.cpp618
-rw-r--r--cpp/src/tests/qpid-send.cpp100
-rwxr-xr-xcpp/src/tests/run_msg_group_tests66
-rwxr-xr-xcpp/src/tests/run_msg_group_tests_soak60
33 files changed, 2581 insertions, 235 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index c8aedd4c60..da1f539108 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -976,6 +976,8 @@ set (qpidbroker_SOURCES
qpid/broker/Queue.cpp
qpid/broker/QueueCleaner.cpp
qpid/broker/QueueListeners.cpp
+ qpid/broker/FifoDistributor.cpp
+ qpid/broker/MessageGroupManager.cpp
qpid/broker/PersistableMessage.cpp
qpid/broker/Bridge.cpp
qpid/broker/Connection.cpp
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 2663987f75..6230a8f6f6 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -671,6 +671,11 @@ libqpidbroker_la_SOURCES = \
qpid/broker/TxPublish.h \
qpid/broker/Vhost.cpp \
qpid/broker/Vhost.h \
+ qpid/broker/MessageDistributor.h \
+ qpid/broker/FifoDistributor.h \
+ qpid/broker/FifoDistributor.cpp \
+ qpid/broker/MessageGroupManager.cpp \
+ qpid/broker/MessageGroupManager.h \
qpid/management/ManagementAgent.cpp \
qpid/management/ManagementAgent.h \
qpid/management/ManagementDirectExchange.cpp \
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 598c43b1d8..bd94582d10 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -33,10 +33,12 @@
#include "qpid/broker/Link.h"
#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/MessageGroupManager.h"
#include "qmf/org/apache/qpid/broker/Package.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerDelete.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerQuery.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
@@ -122,7 +124,8 @@ Broker::Options::Options(const std::string& name) :
qmf1Support(true),
queueFlowStopRatio(80),
queueFlowResumeRatio(70),
- queueThresholdEventRatio(80)
+ queueThresholdEventRatio(80),
+ defaultMsgGroup("qpid.no-group")
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -158,7 +161,8 @@ Broker::Options::Options(const std::string& name) :
("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication")
("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.")
("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.")
- ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised");
+ ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised")
+ ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.");
}
const std::string empty;
@@ -249,6 +253,7 @@ Broker::Broker(const Broker::Options& conf) :
Plugin::earlyInitAll(*this);
QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio);
+ MessageGroupManager::setDefaults(conf.defaultMsgGroup);
// If no plugin store module registered itself, set up the null store.
if (NullMessageStore::isNullStore(store.get()))
@@ -453,7 +458,7 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
_qmf::ArgsBrokerQueueMoveMessages& moveArgs=
dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args);
QPID_LOG (debug, "Broker::queueMoveMessages()");
- if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty))
+ if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty, moveArgs.i_filter))
status = Manageable::STATUS_OK;
else
return Manageable::STATUS_PARAMETER_INVALID;
@@ -483,6 +488,13 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
status = Manageable::STATUS_OK;
break;
}
+ case _qmf::Broker::METHOD_QUERY :
+ {
+ _qmf::ArgsBrokerQuery& a = dynamic_cast<_qmf::ArgsBrokerQuery&>(args);
+ status = queryObject(a.i_type, a.i_name, a.o_results, getManagementExecutionContext());
+ status = Manageable::STATUS_OK;
+ break;
+ }
default:
QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]");
status = Manageable::STATUS_NOT_IMPLEMENTED;
@@ -655,6 +667,50 @@ void Broker::deleteObject(const std::string& type, const std::string& name,
}
+Manageable::status_t Broker::queryObject(const std::string& type,
+ const std::string& name,
+ Variant::Map& results,
+ const ConnectionState* context)
+{
+ std::string userId;
+ std::string connectionId;
+ if (context) {
+ userId = context->getUserId();
+ connectionId = context->getUrl();
+ }
+ QPID_LOG (debug, "Broker::query(" << type << ", " << name << ")");
+
+ if (type == TYPE_QUEUE)
+ return queryQueue( name, userId, connectionId, results );
+
+ if (type == TYPE_EXCHANGE ||
+ type == TYPE_TOPIC ||
+ type == TYPE_BINDING)
+ return Manageable::STATUS_NOT_IMPLEMENTED;
+
+ throw UnknownObjectType(type);
+}
+
+Manageable::status_t Broker::queryQueue( const std::string& name,
+ const std::string& userId,
+ const std::string& /*connectionId*/,
+ Variant::Map& results )
+{
+ (void) results;
+ if (acl) {
+ if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUEUE, name, NULL) )
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue query request from " << userId));
+ }
+
+ boost::shared_ptr<Queue> q(queues.find(name));
+ if (!q) {
+ QPID_LOG(error, "Query failed: queue not found, name=" << name);
+ return Manageable::STATUS_UNKNOWN_OBJECT;
+ }
+ q->query( results );
+ return Manageable::STATUS_OK;;
+}
+
void Broker::setLogLevel(const std::string& level)
{
QPID_LOG(notice, "Changing log level to " << level);
@@ -724,7 +780,8 @@ void Broker::connect(
uint32_t Broker::queueMoveMessages(
const std::string& srcQueue,
const std::string& destQueue,
- uint32_t qty)
+ uint32_t qty,
+ const Variant::Map& filter)
{
Queue::shared_ptr src_queue = queues.find(srcQueue);
if (!src_queue)
@@ -733,7 +790,7 @@ uint32_t Broker::queueMoveMessages(
if (!dest_queue)
return 0;
- return src_queue->move(dest_queue, qty);
+ return src_queue->move(dest_queue, qty, &filter);
}
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 40f7b6273f..8b347db3c0 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -121,6 +121,7 @@ public:
uint queueFlowStopRatio; // producer flow control: on
uint queueFlowResumeRatio; // producer flow control: off
uint16_t queueThresholdEventRatio;
+ std::string defaultMsgGroup;
private:
std::string getHome();
@@ -157,7 +158,12 @@ public:
const qpid::types::Variant::Map& properties, bool strict, const ConnectionState* context);
void deleteObject(const std::string& type, const std::string& name,
const qpid::types::Variant::Map& options, const ConnectionState* context);
-
+ Manageable::status_t queryObject(const std::string& type, const std::string& name,
+ qpid::types::Variant::Map& results, const ConnectionState* context);
+ Manageable::status_t queryQueue( const std::string& name,
+ const std::string& userId,
+ const std::string& connectionId,
+ qpid::types::Variant::Map& results);
boost::shared_ptr<sys::Poller> poller;
sys::Timer timer;
std::auto_ptr<sys::Timer> clusterTimer;
@@ -258,7 +264,8 @@ public:
*/
uint32_t queueMoveMessages( const std::string& srcQueue,
const std::string& destQueue,
- uint32_t qty);
+ uint32_t qty,
+ const qpid::types::Variant::Map& filter);
boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory(const std::string& name = TCP_TRANSPORT) const;
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h
index 317338a8ad..2af9b0c121 100644
--- a/cpp/src/qpid/broker/Consumer.h
+++ b/cpp/src/qpid/broker/Consumer.h
@@ -36,13 +36,19 @@ class Consumer {
// inListeners allows QueueListeners to efficiently track if this instance is registered
// for notifications without having to search its containers
bool inListeners;
- public:
- typedef boost::shared_ptr<Consumer> shared_ptr;
-
+ // the name is generated by broker and is unique within broker scope. It is not
+ // provided or known by the remote Consumer.
+ const std::string name;
+ public:
+ typedef boost::shared_ptr<Consumer> shared_ptr;
+
framing::SequenceNumber position;
-
- Consumer(bool preAcquires = true) : acquires(preAcquires), inListeners(false) {}
+
+ Consumer(const std::string& _name, bool preAcquires = true)
+ : acquires(preAcquires), inListeners(false), name(_name), position(0) {}
bool preAcquires() const { return acquires; }
+ const std::string& getName() const { return name; }
+
virtual bool deliver(QueuedMessage& msg) = 0;
virtual void notify() = 0;
virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 11970db394..0b8fe95d5e 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -153,7 +153,7 @@ uint32_t DeliveryRecord::getCredit() const
}
void DeliveryRecord::acquire(DeliveryIds& results) {
- if (queue->acquire(msg)) {
+ if (queue->acquire(msg, tag)) {
acquired = true;
results.push_back(id);
if (!acceptExpected) {
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
index 19ab37ac17..5a331357be 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -46,7 +46,7 @@ class DeliveryRecord
{
QueuedMessage msg;
mutable boost::shared_ptr<Queue> queue;
- std::string tag;
+ std::string tag; // name of consumer
DeliveryId id;
bool acquired : 1;
bool acceptExpected : 1;
diff --git a/cpp/src/qpid/broker/FifoDistributor.cpp b/cpp/src/qpid/broker/FifoDistributor.cpp
new file mode 100644
index 0000000000..cdb32d8c8c
--- /dev/null
+++ b/cpp/src/qpid/broker/FifoDistributor.cpp
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/FifoDistributor.h"
+
+using namespace qpid::broker;
+
+FifoDistributor::FifoDistributor(Messages& container)
+ : messages(container) {}
+
+bool FifoDistributor::nextConsumableMessage( Consumer::shared_ptr&, QueuedMessage& next )
+{
+ if (!messages.empty()) {
+ next = messages.front(); // by default, consume oldest msg
+ return true;
+ }
+ return false;
+}
+
+bool FifoDistributor::allocate(const std::string&, const QueuedMessage& )
+{
+ // by default, all messages present on the queue may be allocated as they have yet to
+ // be acquired.
+ return true;
+}
+
+bool FifoDistributor::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
+{
+ if (!messages.empty() && messages.next(c->position, next))
+ return true;
+ return false;
+}
+
+void FifoDistributor::query(qpid::types::Variant::Map&) const
+{
+ // nothing to see here....
+}
+
diff --git a/cpp/src/qpid/broker/FifoDistributor.h b/cpp/src/qpid/broker/FifoDistributor.h
new file mode 100644
index 0000000000..245537ed12
--- /dev/null
+++ b/cpp/src/qpid/broker/FifoDistributor.h
@@ -0,0 +1,58 @@
+#ifndef _broker_FifoDistributor_h
+#define _broker_FifoDistributor_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/** Simple MessageDistributor for FIFO Queues - the HEAD message is always the next
+ * available message for consumption.
+ */
+
+#include "qpid/broker/MessageDistributor.h"
+
+namespace qpid {
+namespace broker {
+
+class Messages;
+
+class FifoDistributor : public MessageDistributor
+{
+ public:
+ FifoDistributor(Messages& container);
+
+ /** Locking Note: all methods assume the caller is holding the Queue::messageLock
+ * during the method call.
+ */
+
+ /** MessageDistributor interface */
+
+ bool nextConsumableMessage( Consumer::shared_ptr& consumer, QueuedMessage& next );
+ bool allocate(const std::string& consumer, const QueuedMessage& target);
+ bool nextBrowsableMessage( Consumer::shared_ptr& consumer, QueuedMessage& next );
+ void query(qpid::types::Variant::Map&) const;
+
+ private:
+ Messages& messages;
+};
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/broker/MessageDistributor.h b/cpp/src/qpid/broker/MessageDistributor.h
new file mode 100644
index 0000000000..090393c160
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageDistributor.h
@@ -0,0 +1,76 @@
+#ifndef _broker_MessageDistributor_h
+#define _broker_MessageDistributor_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/** Abstraction used by Queue to determine the next "most desirable" message to provide to
+ * a particular consuming client
+ */
+
+
+#include "qpid/broker/Consumer.h"
+
+namespace qpid {
+namespace broker {
+
+struct QueuedMessage;
+
+class MessageDistributor
+{
+ public:
+ virtual ~MessageDistributor() {};
+
+ /** Locking Note: all methods assume the caller is holding the Queue::messageLock
+ * during the method call.
+ */
+
+ /** Determine the next message available for consumption by the consumer
+ * @param consumer the consumer that needs a message to consume
+ * @param next set to the next message that the consumer may consume.
+ * @return true if message is available and next is set
+ */
+ virtual bool nextConsumableMessage( Consumer::shared_ptr& consumer,
+ QueuedMessage& next ) = 0;
+
+ /** Allow the comsumer to take ownership of the given message.
+ * @param consumer the name of the consumer that is attempting to acquire the message
+ * @param qm the message to be acquired, previously returned from nextConsumableMessage()
+ * @return true if ownership is permitted, false if ownership cannot be assigned.
+ */
+ virtual bool allocate( const std::string& consumer,
+ const QueuedMessage& target) = 0;
+
+ /** Determine the next message available for browsing by the consumer
+ * @param consumer the consumer that is browsing the queue
+ * @param next set to the next message that the consumer may browse.
+ * @return true if a message is available and next is returned
+ */
+ virtual bool nextBrowsableMessage( Consumer::shared_ptr& consumer,
+ QueuedMessage& next ) = 0;
+
+ /** hook to add any interesting management state to the status map */
+ virtual void query(qpid::types::Variant::Map&) const = 0;
+};
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/broker/MessageGroupManager.cpp b/cpp/src/qpid/broker/MessageGroupManager.cpp
new file mode 100644
index 0000000000..d4ca6af1d5
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -0,0 +1,443 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/FieldTable.h"
+#include "qpid/types/Variant.h"
+#include "qpid/log/Statement.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/MessageGroupManager.h"
+
+using namespace qpid::broker;
+
+namespace {
+ const std::string GROUP_QUERY_KEY("qpid.message_group_queue");
+ const std::string GROUP_HEADER_KEY("group_header_key");
+ const std::string GROUP_STATE_KEY("group_state");
+ const std::string GROUP_ID_KEY("group_id");
+ const std::string GROUP_MSG_COUNT("msg_count");
+ const std::string GROUP_TIMESTAMP("timestamp");
+ const std::string GROUP_CONSUMER("consumer");
+}
+
+
+const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key");
+const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group");
+const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
+
+
+const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const
+{
+ const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders();
+ if (!headers) return defaultGroupId;
+ qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader );
+ if (!id || !id->convertsTo<std::string>()) return defaultGroupId;
+ return id->get<std::string>();
+}
+
+
+void MessageGroupManager::enqueued( const QueuedMessage& qm )
+{
+ // @todo KAG optimization - store reference to group state in QueuedMessage
+ // issue: const-ness??
+ std::string group( getGroupId(qm) );
+ GroupState &state(messageGroups[group]);
+ state.members.push_back(qm.position);
+ uint32_t total = state.members.size();
+ QPID_LOG( trace, "group queue " << qName <<
+ ": added message to group id=" << group << " total=" << total );
+ if (total == 1) {
+ // newly created group, no owner
+ state.group = group;
+ assert(freeGroups.find(qm.position) == freeGroups.end());
+ freeGroups[qm.position] = &state;
+ }
+}
+
+
+void MessageGroupManager::acquired( const QueuedMessage& qm )
+{
+ // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
+ // issue: const-ness??
+ std::string group( getGroupId(qm) );
+ GroupMap::iterator gs = messageGroups.find( group );
+ assert( gs != messageGroups.end() );
+ GroupState& state( gs->second );
+ state.acquired += 1;
+ QPID_LOG( trace, "group queue " << qName <<
+ ": acquired message in group id=" << group << " acquired=" << state.acquired );
+}
+
+
+void MessageGroupManager::requeued( const QueuedMessage& qm )
+{
+ // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
+ // issue: const-ness??
+ std::string group( getGroupId(qm) );
+ GroupMap::iterator gs = messageGroups.find( group );
+ assert( gs != messageGroups.end() );
+ GroupState& state( gs->second );
+ assert( state.acquired != 0 );
+ state.acquired -= 1;
+ if (state.acquired == 0 && state.owned()) {
+ QPID_LOG( trace, "group queue " << qName <<
+ ": consumer name=" << state.owner << " released group id=" << gs->first);
+ disown(state);
+ }
+ QPID_LOG( trace, "group queue " << qName <<
+ ": requeued message to group id=" << group << " acquired=" << state.acquired );
+}
+
+
+void MessageGroupManager::dequeued( const QueuedMessage& qm )
+{
+ // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
+ // issue: const-ness??
+ std::string group( getGroupId(qm) );
+ GroupMap::iterator gs = messageGroups.find( group );
+ assert( gs != messageGroups.end() );
+ GroupState& state( gs->second );
+ assert( state.members.size() != 0 );
+ assert( state.acquired != 0 );
+ state.acquired -= 1;
+
+ // likely to be at or near begin() if dequeued in order
+ bool reFreeNeeded = false;
+ if (state.members.front() == qm.position) {
+ if (!state.owned()) {
+ // will be on the freeGroups list if mgmt is dequeueing rather than a consumer!
+ // if on freelist, it is indexed by first member, which is about to be removed!
+ unFree(state);
+ reFreeNeeded = true;
+ }
+ state.members.pop_front();
+ } else {
+ GroupState::PositionFifo::iterator pos = state.members.begin() + 1;
+ GroupState::PositionFifo::iterator end = state.members.end();
+ while (pos != end) {
+ if (*pos == qm.position) {
+ state.members.erase(pos);
+ break;
+ }
+ ++pos;
+ }
+ }
+
+ uint32_t total = state.members.size();
+ if (total == 0) {
+ QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << gs->first);
+ messageGroups.erase( gs );
+ } else if (state.acquired == 0 && state.owned()) {
+ QPID_LOG( trace, "group queue " << qName <<
+ ": consumer name=" << state.owner << " released group id=" << gs->first);
+ disown(state);
+ } else if (reFreeNeeded) {
+ disown(state);
+ }
+ QPID_LOG( trace, "group queue " << qName <<
+ ": dequeued message from group id=" << group << " total=" << total );
+}
+
+void MessageGroupManager::consumerAdded( const Consumer& /*c*/ )
+{
+#if 0
+ // allow a re-subscribing consumer
+ if (consumers.find(c.getName()) == consumers.end()) {
+ consumers[c.getName()] = 0; // no groups owned yet
+ QPID_LOG( trace, "group queue " << qName << ": added consumer, name=" << c.getName() );
+ } else {
+ QPID_LOG( trace, "group queue " << qName << ": consumer re-subscribed, name=" << c.getName() );
+ }
+#endif
+}
+
+void MessageGroupManager::consumerRemoved( const Consumer& /*c*/ )
+{
+#if 0
+ const std::string& name(c.getName());
+ Consumers::iterator consumer = consumers.find(name);
+ assert(consumer != consumers.end());
+ size_t count = consumer->second;
+
+ for (GroupMap::iterator gs = messageGroups.begin();
+ count && gs != messageGroups.end(); ++gs) {
+
+ GroupState& state( gs->second );
+ if (state.owner == name) {
+ if (state.acquired == 0) {
+ --count;
+ disown(state);
+ QPID_LOG( trace, "group queue " << qName <<
+ ": consumer name=" << name << " released group id=" << gs->first);
+ }
+ }
+ }
+ if (count == 0) {
+ consumers.erase( consumer );
+ QPID_LOG( trace, "group queue " << qName << ": removed consumer name=" << name );
+ } else {
+ // don't release groups with outstanding acquired msgs - consumer may re-subscribe!
+ QPID_LOG( trace, "group queue " << qName << ": consumer name=" << name << " unsubscribed with outstanding messages.");
+ }
+#endif
+}
+
+
+bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
+{
+ if (messages.empty())
+ return false;
+
+ if (!freeGroups.empty()) {
+ framing::SequenceNumber nextFree = freeGroups.begin()->first;
+ if (nextFree < c->position) { // next free group's msg is older than current position
+ bool ok = messages.find(nextFree, next);
+ (void) ok; assert( ok );
+ } else {
+ if (!messages.next( c->position, next ))
+ return false; // shouldn't happen - should find nextFree
+ }
+ } else { // no free groups available
+#if 0
+ if (consumers[c->getName()] == 0) { // and none currently owned
+ return false; // so nothing available to consume
+ }
+#endif
+ if (!messages.next( c->position, next ))
+ return false;
+ }
+
+ do {
+ // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
+ std::string group( getGroupId( next ) );
+ GroupMap::iterator gs = messageGroups.find( group );
+ assert( gs != messageGroups.end() );
+ GroupState& state( gs->second );
+ if (!state.owned() || state.owner == c->getName()) {
+ return true;
+ }
+ } while (messages.next( next.position, next ));
+ return false;
+}
+
+
+bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMessage& qm)
+{
+ // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
+ std::string group( getGroupId(qm) );
+ GroupMap::iterator gs = messageGroups.find( group );
+ assert( gs != messageGroups.end() );
+ GroupState& state( gs->second );
+
+ if (!state.owned()) {
+ own( state, consumer );
+ QPID_LOG( trace, "group queue " << qName <<
+ ": consumer name=" << consumer << " has acquired group id=" << gs->first);
+ return true;
+ }
+ return state.owner == consumer;
+}
+
+bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
+{
+ // browse: allow access to any available msg, regardless of group ownership (?ok?)
+ if (!messages.empty() && messages.next(c->position, next))
+ return true;
+ return false;
+}
+
+void MessageGroupManager::query(qpid::types::Variant::Map& status) const
+{
+ /** Add a description of the current state of the message groups for this queue.
+ FORMAT:
+ { "qpid.message_group_queue":
+ { "group_header_key" : "<KEY>",
+ "group_state" :
+ [ { "group_id" : "<name>",
+ "msg_count" : <int>,
+ "timestamp" : <absTime>,
+ "consumer" : <consumer name> },
+ {...} // one for each known group
+ ]
+ }
+ }
+ **/
+
+ assert(status.find(GROUP_QUERY_KEY) == status.end());
+ qpid::types::Variant::Map state;
+ qpid::types::Variant::List groups;
+
+ state[GROUP_HEADER_KEY] = groupIdHeader;
+ for (GroupMap::const_iterator g = messageGroups.begin();
+ g != messageGroups.end(); ++g) {
+ qpid::types::Variant::Map info;
+ info[GROUP_ID_KEY] = g->first;
+ info[GROUP_MSG_COUNT] = g->second.members.size();
+ info[GROUP_TIMESTAMP] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */
+ info[GROUP_CONSUMER] = g->second.owner;
+ groups.push_back(info);
+ }
+ state[GROUP_STATE_KEY] = groups;
+ status[GROUP_QUERY_KEY] = state;
+}
+
+
+boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( const std::string& qName,
+ Messages& messages,
+ const qpid::framing::FieldTable& settings )
+{
+ boost::shared_ptr<MessageGroupManager> empty;
+
+ if (settings.isSet(qpidMessageGroupKey)) {
+
+ // @todo: remove once "sticky" consumers are supported - see QPID-3347
+ if (!settings.isSet(qpidSharedGroup)) {
+ QPID_LOG( error, "Only shared groups are supported in this version of the broker. Use '--shared-groups' in qpid-config." );
+ return empty;
+ }
+
+ std::string headerKey = settings.getAsString(qpidMessageGroupKey);
+ if (headerKey.empty()) {
+ QPID_LOG( error, "A Message Group header key must be configured, queue=" << qName);
+ return empty;
+ }
+ unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp);
+
+ boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( headerKey, qName, messages, timestamp ) );
+
+ QPID_LOG( debug, "Configured Queue '" << qName <<
+ "' for message grouping using header key '" << headerKey << "'" <<
+ " (timestamp=" << timestamp << ")");
+ return manager;
+ }
+ return empty;
+}
+
+std::string MessageGroupManager::defaultGroupId;
+void MessageGroupManager::setDefaults(const std::string& groupId) // static
+{
+ defaultGroupId = groupId;
+}
+
+/** Cluster replication:
+
+ state map format:
+
+ { "group-state": [ {"name": <group-name>,
+ "owner": <consumer-name>-or-empty,
+ "acquired-ct": <acquired count>,
+ "positions": [Seqnumbers, ... ]},
+ {...}
+ ]
+ }
+*/
+
+namespace {
+ const std::string GROUP_NAME("name");
+ const std::string GROUP_OWNER("owner");
+ const std::string GROUP_ACQUIRED_CT("acquired-ct");
+ const std::string GROUP_POSITIONS("positions");
+ const std::string GROUP_STATE("group-state");
+}
+
+
+/** Runs on UPDATER to snapshot current state */
+void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const
+{
+ using namespace qpid::framing;
+ state.clear();
+ framing::Array groupState(TYPE_CODE_MAP);
+ for (GroupMap::const_iterator g = messageGroups.begin();
+ g != messageGroups.end(); ++g) {
+
+ framing::FieldTable group;
+ group.setString(GROUP_NAME, g->first);
+ group.setString(GROUP_OWNER, g->second.owner);
+ group.setInt(GROUP_ACQUIRED_CT, g->second.acquired);
+ framing::Array positions(TYPE_CODE_UINT32);
+ for (GroupState::PositionFifo::const_iterator p = g->second.members.begin();
+ p != g->second.members.end(); ++p)
+ positions.push_back(framing::Array::ValuePtr(new IntegerValue( *p )));
+ group.setArray(GROUP_POSITIONS, positions);
+ groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group)));
+ }
+ state.setArray(GROUP_STATE, groupState);
+
+ QPID_LOG(debug, "Queue \"" << qName << "\": replicating message group state, key=" << groupIdHeader);
+}
+
+
+/** called on UPDATEE to set state from snapshot */
+void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
+{
+ using namespace qpid::framing;
+ messageGroups.clear();
+ //consumers.clear();
+ freeGroups.clear();
+
+ framing::Array groupState(TYPE_CODE_MAP);
+
+ bool ok = state.getArray(GROUP_STATE, groupState);
+ if (!ok) {
+ QPID_LOG(error, "Unable to find message group state information for queue \"" <<
+ qName << "\": cluster inconsistency error!");
+ return;
+ }
+
+ for (framing::Array::const_iterator g = groupState.begin();
+ g != groupState.end(); ++g) {
+ framing::FieldTable group;
+ ok = framing::getEncodedValue<FieldTable>(*g, group);
+ if (!ok) {
+ QPID_LOG(error, "Invalid message group state information for queue \"" <<
+ qName << "\": table encoding error!");
+ return;
+ }
+ MessageGroupManager::GroupState state;
+ if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || !group.isSet(GROUP_ACQUIRED_CT)) {
+ QPID_LOG(error, "Invalid message group state information for queue \"" <<
+ qName << "\": fields missing error!");
+ return;
+ }
+ state.group = group.getAsString(GROUP_NAME);
+ state.owner = group.getAsString(GROUP_OWNER);
+ state.acquired = group.getAsInt(GROUP_ACQUIRED_CT);
+ framing::Array positions(TYPE_CODE_UINT32);
+ ok = group.getArray(GROUP_POSITIONS, positions);
+ if (!ok) {
+ QPID_LOG(error, "Invalid message group state information for queue \"" <<
+ qName << "\": position encoding error!");
+ return;
+ }
+
+ for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p)
+ state.members.push_back((*p)->getIntegerValue<uint32_t, 4>());
+ messageGroups[state.group] = state;
+ if (state.owned())
+ //consumers[state.owner]++;
+ ;
+ else {
+ assert(state.members.size());
+ freeGroups[state.members.front()] = &messageGroups[state.group];
+ }
+ }
+
+ QPID_LOG(debug, "Queue \"" << qName << "\": message group state replicated, key =" << groupIdHeader)
+}
diff --git a/cpp/src/qpid/broker/MessageGroupManager.h b/cpp/src/qpid/broker/MessageGroupManager.h
new file mode 100644
index 0000000000..35bdda94d5
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageGroupManager.h
@@ -0,0 +1,125 @@
+#ifndef _broker_MessageGroupManager_h
+#define _broker_MessageGroupManager_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/* for managing message grouping on Queues */
+
+#include "qpid/broker/StatefulQueueObserver.h"
+#include "qpid/broker/MessageDistributor.h"
+
+
+namespace qpid {
+namespace broker {
+
+class QueueObserver;
+class MessageDistributor;
+
+class MessageGroupManager : public StatefulQueueObserver, public MessageDistributor
+{
+ static std::string defaultGroupId; // assigned if no group id header present
+
+ const std::string groupIdHeader; // msg header holding group identifier
+ const unsigned int timestamp; // mark messages with timestamp if set
+ Messages& messages; // parent Queue's in memory message container
+ const std::string qName; // name of parent queue (for logs)
+
+ struct GroupState {
+ typedef std::deque<framing::SequenceNumber> PositionFifo;
+
+ std::string group; // group identifier
+ std::string owner; // consumer with outstanding acquired messages
+ uint32_t acquired; // count of outstanding acquired messages
+ //uint32_t total; // count of enqueued messages in this group
+ PositionFifo members; // msgs belonging to this group
+
+ GroupState() : acquired(0) {}
+ bool owned() const {return !owner.empty();}
+ };
+ typedef std::map<std::string, struct GroupState> GroupMap;
+ //typedef std::map<std::string, uint32_t> Consumers; // count of owned groups
+ typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
+
+ // note: update getState()/setState() when changing this object's state implementation
+ GroupMap messageGroups; // index: group name
+ GroupFifo freeGroups; // ordered by oldest free msg
+ //Consumers consumers; // index: consumer name
+
+ static const std::string qpidMessageGroupKey;
+ static const std::string qpidSharedGroup; // if specified, one group can be consumed by multiple receivers
+ static const std::string qpidMessageGroupTimestamp;
+
+ const std::string getGroupId( const QueuedMessage& qm ) const;
+ void unFree( const GroupState& state )
+ {
+ GroupFifo::iterator pos = freeGroups.find( state.members.front() );
+ assert( pos != freeGroups.end() && pos->second == &state );
+ freeGroups.erase( pos );
+ }
+ void own( GroupState& state, const std::string& owner )
+ {
+ state.owner = owner;
+ //consumers[state.owner]++;
+ unFree( state );
+ }
+ void disown( GroupState& state )
+ {
+ //assert(consumers[state.owner]);
+ //consumers[state.owner]--;
+ state.owner.clear();
+ assert(state.members.size());
+ assert(freeGroups.find(state.members.front()) == freeGroups.end());
+ freeGroups[state.members.front()] = &state;
+ }
+
+ public:
+
+ static QPID_BROKER_EXTERN void setDefaults(const std::string& groupId);
+ static boost::shared_ptr<MessageGroupManager> create( const std::string& qName,
+ Messages& messages,
+ const qpid::framing::FieldTable& settings );
+
+ MessageGroupManager(const std::string& header, const std::string& _qName,
+ Messages& container, unsigned int _timestamp=0 )
+ : StatefulQueueObserver(std::string("MessageGroupManager:") + header),
+ groupIdHeader( header ), timestamp(_timestamp), messages(container), qName(_qName) {}
+ void enqueued( const QueuedMessage& qm );
+ void acquired( const QueuedMessage& qm );
+ void requeued( const QueuedMessage& qm );
+ void dequeued( const QueuedMessage& qm );
+ void consumerAdded( const Consumer& );
+ void consumerRemoved( const Consumer& );
+ void getState(qpid::framing::FieldTable& state ) const;
+ void setState(const qpid::framing::FieldTable&);
+
+ // MessageDistributor iface
+ bool nextConsumableMessage(Consumer::shared_ptr& c, QueuedMessage& next);
+ bool allocate(const std::string& c, const QueuedMessage& qm);
+ bool nextBrowsableMessage(Consumer::shared_ptr& c, QueuedMessage& next);
+ void query(qpid::types::Variant::Map&) const;
+
+ bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const;
+};
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/broker/Messages.h b/cpp/src/qpid/broker/Messages.h
index c535fd1936..448f17432a 100644
--- a/cpp/src/qpid/broker/Messages.h
+++ b/cpp/src/qpid/broker/Messages.h
@@ -76,7 +76,6 @@ class Messages
* @return true if there is another message, false otherwise.
*/
virtual bool next(const framing::SequenceNumber&, QueuedMessage&) = 0;
-
/**
* Note: Caller is responsible for ensuring that there is a front
* (e.g. empty() returns false)
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index dd3f982699..3d878d02a8 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -33,6 +33,8 @@
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/broker/ThresholdAlerts.h"
+#include "qpid/broker/FifoDistributor.h"
+#include "qpid/broker/MessageGroupManager.h"
#include "qpid/StringUtils.h"
#include "qpid/log/Statement.h"
@@ -42,6 +44,7 @@
#include "qpid/sys/ClusterSafe.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
+#include "qpid/types/Variant.h"
#include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
#include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h"
@@ -111,7 +114,8 @@ Queue::Queue(const string& _name, bool _autodelete,
broker(b),
deleted(false),
barrier(*this),
- autoDeleteTimeout(0)
+ autoDeleteTimeout(0),
+ allocator(new FifoDistributor( *messages ))
{
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
@@ -220,6 +224,7 @@ void Queue::requeue(const QueuedMessage& msg){
enqueue(0, payload);
}
}
+ observeRequeue(msg, locker);
}
copy.notify();
}
@@ -229,7 +234,7 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
- if (messages->remove(position, message)) {
+ if (acquire(position, message, locker)) {
QPID_LOG(debug, "Acquired message at " << position << " from " << name);
return true;
} else {
@@ -238,9 +243,24 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess
}
}
-bool Queue::acquire(const QueuedMessage& msg) {
- QueuedMessage copy = msg;
- return acquireMessageAt(msg.position, copy);
+bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer)
+{
+ Mutex::ScopedLock locker(messageLock);
+ assertClusterSafe();
+ QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position);
+
+ if (!allocator->allocate( consumer, msg )) {
+ QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name);
+ return false;
+ }
+
+ QueuedMessage copy(msg);
+ if (acquire( msg.position, copy, locker)) {
+ QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name);
+ return true;
+ }
+ QPID_LOG(debug, "Could not acquire message at " << msg.position << " from " << name << "; no message at that position");
+ return false;
}
void Queue::notifyListener()
@@ -256,7 +276,7 @@ void Queue::notifyListener()
set.notify();
}
-bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
+bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
{
checkNotDeleted();
if (c->preAcquires()) {
@@ -274,46 +294,65 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
}
}
-Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
+Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
{
while (true) {
Mutex::ScopedLock locker(messageLock);
- if (messages->empty()) {
- QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+ QueuedMessage msg;
+
+ if (!allocator->nextConsumableMessage(c, msg)) { // no next available
+ QPID_LOG(debug, "No messages available to dispatch to consumer " <<
+ c->getName() << " on queue '" << name << "'");
listeners.addListener(c);
return NO_MESSAGES;
- } else {
- QueuedMessage msg = messages->front();
- if (msg.payload->hasExpired()) {
- QPID_LOG(debug, "Message expired from queue '" << name << "'");
- popAndDequeue();
- continue;
- }
+ }
- if (c->filter(msg.payload)) {
- if (c->accept(msg.payload)) {
- m = msg;
- pop();
- return CONSUMED;
- } else {
- //message(s) are available but consumer hasn't got enough credit
- QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
- return CANT_CONSUME;
- }
+ if (msg.payload->hasExpired()) {
+ QPID_LOG(debug, "Message expired from queue '" << name << "'");
+ c->position = msg.position;
+ acquire( msg.position, msg, locker);
+ dequeue( 0, msg );
+ continue;
+ }
+
+ // a message is available for this consumer - can the consumer use it?
+
+ if (c->filter(msg.payload)) {
+ if (c->accept(msg.payload)) {
+ bool ok = allocator->allocate( c->getName(), msg ); // inform allocator
+ (void) ok; assert(ok);
+ ok = acquire( msg.position, msg, locker);
+ (void) ok; assert(ok);
+ m = msg;
+ c->position = m.position;
+ return CONSUMED;
} else {
- //consumer will never want this message
- QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+ //message(s) are available but consumer hasn't got enough credit
+ QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
return CANT_CONSUME;
}
+ } else {
+ //consumer will never want this message
+ QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+ c->position = msg.position;
+ return CANT_CONSUME;
}
}
}
-
-bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
+bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
{
- QueuedMessage msg(this);
- while (seek(msg, c)) {
+ while (true) {
+ Mutex::ScopedLock locker(messageLock);
+ QueuedMessage msg;
+
+ if (!allocator->nextBrowsableMessage(c, msg)) { // no next available
+ QPID_LOG(debug, "No browsable messages available for consumer " <<
+ c->getName() << " on queue '" << name << "'");
+ listeners.addListener(c);
+ return false;
+ }
+
if (c->filter(msg.payload) && !msg.payload->hasExpired()) {
if (c->accept(msg.payload)) {
//consumer wants the message
@@ -327,8 +366,8 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
}
} else {
//consumer will never want this message, continue seeking
- c->position = msg.position;
QPID_LOG(debug, "Browser skipping message from '" << name << "'");
+ c->position = msg.position;
}
}
return false;
@@ -358,61 +397,71 @@ bool Queue::dispatch(Consumer::shared_ptr c)
}
}
-// Find the next message
-bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
- Mutex::ScopedLock locker(messageLock);
- if (messages->next(c->position, msg)) {
- return true;
- } else {
- listeners.addListener(c);
- return false;
- }
-}
-
-QueuedMessage Queue::find(SequenceNumber pos) const {
+bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const {
Mutex::ScopedLock locker(messageLock);
- QueuedMessage msg;
- messages->find(pos, msg);
- return msg;
+ if (messages->find(pos, msg))
+ return true;
+ return false;
}
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
assertClusterSafe();
- Mutex::ScopedLock locker(consumerLock);
- if(exclusive) {
- throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
- } else if(requestExclusive) {
- if(consumerCount) {
+ {
+ Mutex::ScopedLock locker(consumerLock);
+ if(exclusive) {
throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
- } else {
- exclusive = c->getSession();
+ QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
+ } else if(requestExclusive) {
+ if(consumerCount) {
+ throw ResourceLockedException(
+ QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
+ } else {
+ exclusive = c->getSession();
+ }
+ }
+ consumerCount++;
+ if (mgmtObject != 0)
+ mgmtObject->inc_consumerCount ();
+ //reset auto deletion timer if necessary
+ if (autoDeleteTimeout && autoDeleteTask) {
+ autoDeleteTask->cancel();
}
}
- consumerCount++;
- if (mgmtObject != 0)
- mgmtObject->inc_consumerCount ();
- //reset auto deletion timer if necessary
- if (autoDeleteTimeout && autoDeleteTask) {
- autoDeleteTask->cancel();
+ Mutex::ScopedLock locker(messageLock);
+ for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+ try{
+ (*i)->consumerAdded(*c);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what());
+ }
}
}
void Queue::cancel(Consumer::shared_ptr c){
removeListener(c);
- Mutex::ScopedLock locker(consumerLock);
- consumerCount--;
- if(exclusive) exclusive = 0;
- if (mgmtObject != 0)
- mgmtObject->dec_consumerCount ();
+ {
+ Mutex::ScopedLock locker(consumerLock);
+ consumerCount--;
+ if(exclusive) exclusive = 0;
+ if (mgmtObject != 0)
+ mgmtObject->dec_consumerCount ();
+ }
+ Mutex::ScopedLock locker(messageLock);
+ for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+ try{
+ (*i)->consumerRemoved(*c);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what());
+ }
+ }
}
QueuedMessage Queue::get(){
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
- messages->pop(msg);
+ if (messages->pop(msg))
+ observeAcquire(msg, locker);
return msg;
}
@@ -443,11 +492,118 @@ void Queue::purgeExpired(qpid::sys::Duration lapse)
Mutex::ScopedLock locker(messageLock);
messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
}
- for_each(expired.begin(), expired.end(),
- boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+
+ for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
+ i != expired.end(); ++i) {
+ {
+ Mutex::ScopedLock locker(messageLock);
+ observeAcquire(*i, locker);
+ }
+ dequeue( 0, *i );
+ }
}
}
+
+namespace {
+ // for use with purge/move below - collect messages that match a given filter
+ //
+ class MessageFilter
+ {
+ public:
+ static const std::string typeKey;
+ static const std::string paramsKey;
+ static MessageFilter *create( const ::qpid::types::Variant::Map *filter );
+ virtual bool match( const QueuedMessage& ) const { return true; }
+ virtual ~MessageFilter() {}
+ protected:
+ MessageFilter() {};
+ };
+ const std::string MessageFilter::typeKey("filter_type");
+ const std::string MessageFilter::paramsKey("filter_params");
+
+ // filter by message header string value exact match
+ class HeaderMatchFilter : public MessageFilter
+ {
+ public:
+ /* Config:
+ { 'filter_type' : 'header_match_str',
+ 'filter_params' : { 'header_key' : "<header name>",
+ 'header_value' : "<value to match>"
+ }
+ }
+ */
+ static const std::string typeKey;
+ static const std::string headerKey;
+ static const std::string valueKey;
+ HeaderMatchFilter( const std::string& _header, const std::string& _value )
+ : MessageFilter (), header(_header), value(_value) {}
+ bool match( const QueuedMessage& msg ) const
+ {
+ const qpid::framing::FieldTable* headers = msg.payload->getApplicationHeaders();
+ if (!headers) return false;
+ FieldTable::ValuePtr h = headers->get(header);
+ if (!h || !h->convertsTo<std::string>()) return false;
+ return h->get<std::string>() == value;
+ }
+ private:
+ const std::string header;
+ const std::string value;
+ };
+ const std::string HeaderMatchFilter::typeKey("header_match_str");
+ const std::string HeaderMatchFilter::headerKey("header_key");
+ const std::string HeaderMatchFilter::valueKey("header_value");
+
+ // factory to create correct filter based on map
+ MessageFilter* MessageFilter::create( const ::qpid::types::Variant::Map *filter )
+ {
+ using namespace qpid::types;
+ if (filter && !filter->empty()) {
+ Variant::Map::const_iterator i = filter->find(MessageFilter::typeKey);
+ if (i != filter->end()) {
+
+ if (i->second.asString() == HeaderMatchFilter::typeKey) {
+ Variant::Map::const_iterator p = filter->find(MessageFilter::paramsKey);
+ if (p != filter->end() && p->second.getType() == VAR_MAP) {
+ Variant::Map::const_iterator k = p->second.asMap().find(HeaderMatchFilter::headerKey);
+ Variant::Map::const_iterator v = p->second.asMap().find(HeaderMatchFilter::valueKey);
+ if (k != p->second.asMap().end() && v != p->second.asMap().end()) {
+ std::string headerKey(k->second.asString());
+ std::string value(v->second.asString());
+ QPID_LOG(debug, "Message filtering by header value configured. key: " << headerKey << " value: " << value );
+ return new HeaderMatchFilter( headerKey, value );
+ }
+ }
+ }
+ }
+ QPID_LOG(error, "Ignoring unrecognized message filter: '" << *filter << "'");
+ }
+ return new MessageFilter();
+ }
+
+ // used by removeIf() to collect all messages matching a filter, maximum match count is
+ // optional.
+ struct Collector {
+ const uint32_t maxMatches;
+ MessageFilter& filter;
+ std::deque<QueuedMessage> matches;
+ Collector(MessageFilter& filter, uint32_t max)
+ : maxMatches(max), filter(filter) {}
+ bool operator() (QueuedMessage& qm)
+ {
+ if (maxMatches == 0 || matches.size() < maxMatches) {
+ if (filter.match( qm )) {
+ matches.push_back(qm);
+ return true;
+ }
+ }
+ return false;
+ }
+ };
+
+} // end namespace
+
+
/**
* purge - for purging all or some messages on a queue
* depending on the purge_request
@@ -459,62 +615,77 @@ void Queue::purgeExpired(qpid::sys::Duration lapse)
* The dest exchange may be supplied to re-route messages through the exchange.
* It is safe to re-route messages such that they arrive back on the same queue,
* even if the queue is ordered by priority.
+ *
+ * An optional filter can be supplied that will be applied against each message. The
+ * message is purged only if the filter matches. See MessageDistributor for more detail.
*/
-uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest)
+uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest,
+ const qpid::types::Variant::Map *filter)
{
- Mutex::ScopedLock locker(messageLock);
- uint32_t purge_count = purge_request; // only comes into play if >0
- std::deque<DeliverableMessage> rerouteQueue;
+ std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
+ Collector c(*mf.get(), purge_request);
- uint32_t count = 0;
- // Either purge them all or just the some (purge_count) while the queue isn't empty.
- while((!purge_request || purge_count--) && !messages->empty()) {
+ Mutex::ScopedLock locker(messageLock);
+ messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+ for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
+ qmsg != c.matches.end(); ++qmsg) {
+ // Update observers and message state:
+ observeAcquire(*qmsg, locker);
+ dequeue(0, *qmsg);
+ // now reroute if necessary
if (dest.get()) {
- //
- // If there is a destination exchange, stage the messages onto a reroute queue
- // so they don't wind up getting purged more than once.
- //
- DeliverableMessage msg(messages->front().payload);
- rerouteQueue.push_back(msg);
+ assert(qmsg->payload);
+ DeliverableMessage dmsg(qmsg->payload);
+ dest->routeWithAlternate(dmsg);
}
- popAndDequeue();
- count++;
- }
-
- //
- // Re-route purged messages into the destination exchange. Note that there's no need
- // to test dest.get() here because if it is NULL, the rerouteQueue will be empty.
- //
- while (!rerouteQueue.empty()) {
- DeliverableMessage msg(rerouteQueue.front());
- rerouteQueue.pop_front();
- dest->routeWithAlternate(msg);
}
-
- return count;
+ return c.matches.size();
}
-uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
+uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
+ const qpid::types::Variant::Map *filter)
+{
+ std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
+ Collector c(*mf.get(), qty);
+
Mutex::ScopedLock locker(messageLock);
- uint32_t move_count = qty; // only comes into play if qty >0
- uint32_t count = 0; // count how many were moved for returning
+ messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
- while((!qty || move_count--) && !messages->empty()) {
- QueuedMessage qmsg = messages->front();
- boost::intrusive_ptr<Message> msg = qmsg.payload;
- destq->deliver(msg); // deliver message to the destination queue
- pop();
- dequeue(0, qmsg);
- count++;
+ for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
+ qmsg != c.matches.end(); ++qmsg) {
+ // Update observers and message state:
+ observeAcquire(*qmsg, locker);
+ dequeue(0, *qmsg);
+ // and move to destination Queue.
+ assert(qmsg->payload);
+ destq->deliver(qmsg->payload);
}
- return count;
+ return c.matches.size();
}
-void Queue::pop()
+/** Acquire the front (oldest) message from the in-memory queue.
+ * assumes messageLock held by caller
+ */
+void Queue::pop(const Mutex::ScopedLock& locker)
{
assertClusterSafe();
- messages->pop();
- ++dequeueSincePurge;
+ QueuedMessage msg;
+ if (messages->pop(msg)) {
+ observeAcquire(msg, locker);
+ ++dequeueSincePurge;
+ }
+}
+
+/** Acquire the message at the given position, return true and msg if acquire succeeds */
+bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
+ const Mutex::ScopedLock& locker)
+{
+ if (messages->remove(position, msg)) {
+ observeAcquire(msg, locker);
+ ++dequeueSincePurge;
+ return true;
+ }
+ return false;
}
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
@@ -528,8 +699,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence);
dequeueRequired = messages->push(qm, removed);
+ if (dequeueRequired)
+ observeAcquire(removed, locker);
listeners.populate(copy);
- enqueued(qm);
+ observeEnqueue(qm, locker);
}
copy.notify();
if (dequeueRequired) {
@@ -663,7 +836,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return false;
if (!ctxt) {
- dequeued(msg);
+ observeDequeue(msg, locker);
}
}
// This check prevents messages which have been forced persistent on one queue from dequeuing
@@ -683,7 +856,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
void Queue::dequeueCommitted(const QueuedMessage& msg)
{
Mutex::ScopedLock locker(messageLock);
- dequeued(msg);
+ observeDequeue(msg, locker);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
@@ -691,21 +864,23 @@ void Queue::dequeueCommitted(const QueuedMessage& msg)
}
/**
- * Removes a message from the in-memory delivery queue as well
- * dequeing it from the logical (and persistent if applicable) queue
+ * Removes the first (oldest) message from the in-memory delivery queue as well dequeing
+ * it from the logical (and persistent if applicable) queue
*/
-void Queue::popAndDequeue()
+void Queue::popAndDequeue(const Mutex::ScopedLock& held)
{
- QueuedMessage msg = messages->front();
- pop();
- dequeue(0, msg);
+ if (!messages->empty()) {
+ QueuedMessage msg = messages->front();
+ pop(held);
+ dequeue(0, msg);
+ }
}
/**
* Updates policy and management when a message has been dequeued,
* expects messageLock to be held
*/
-void Queue::dequeued(const QueuedMessage& msg)
+void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
{
if (policy.get()) policy->dequeued(msg);
mgntDeqStats(msg.payload);
@@ -718,6 +893,33 @@ void Queue::dequeued(const QueuedMessage& msg)
}
}
+/** updates queue observers when a message has become unavailable for transfer,
+ * expects messageLock to be held
+ */
+void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&)
+{
+ for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+ try{
+ (*i)->acquired(msg);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of message removal for queue " << getName() << ": " << e.what());
+ }
+ }
+}
+
+/** updates queue observers when a message has become re-available for transfer,
+ * expects messageLock to be held
+ */
+void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
+{
+ for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+ try{
+ (*i)->requeued(msg);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of message requeue for queue " << getName() << ": " << e.what());
+ }
+ }
+}
void Queue::create(const FieldTable& _settings)
{
@@ -788,17 +990,28 @@ void Queue::configureImpl(const FieldTable& _settings)
if (lvqKey.size()) {
QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey);
messages = std::auto_ptr<Messages>(new MessageMap(lvqKey));
+ allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
} else if (_settings.get(qpidLastValueQueueNoBrowse)) {
QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on");
messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker);
+ allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
} else if (_settings.get(qpidLastValueQueue)) {
QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue");
messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker);
+ allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
} else {
std::auto_ptr<Messages> m = Fairshare::create(_settings);
if (m.get()) {
messages = m;
+ allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages ));
QPID_LOG(debug, "Configured queue " << getName() << " as priority queue.");
+ } else { // default (FIFO) queue type
+ // override default message allocator if message groups configured.
+ boost::shared_ptr<MessageGroupManager> mgm(MessageGroupManager::create( getName(), *messages, _settings));
+ if (mgm) {
+ allocator = mgm;
+ addObserver(mgm);
+ }
}
}
@@ -835,7 +1048,7 @@ void Queue::destroyed()
while(!messages->empty()){
DeliverableMessage msg(messages->front().payload);
alternateExchange->routeWithAlternate(msg);
- popAndDequeue();
+ popAndDequeue(locker);
}
alternateExchange->decAlternateUsers();
}
@@ -1057,7 +1270,7 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str
case _qmf::Queue::METHOD_PURGE :
{
_qmf::ArgsQueuePurge& purgeArgs = (_qmf::ArgsQueuePurge&) args;
- purge(purgeArgs.i_request);
+ purge(purgeArgs.i_request, boost::shared_ptr<Exchange>(), &purgeArgs.i_filter);
status = Manageable::STATUS_OK;
}
break;
@@ -1078,7 +1291,7 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str
}
}
- purge(rerouteArgs.i_request, dest);
+ purge(rerouteArgs.i_request, dest, &rerouteArgs.i_filter);
status = Manageable::STATUS_OK;
}
break;
@@ -1087,6 +1300,14 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str
return status;
}
+
+void Queue::query(qpid::types::Variant::Map& results) const
+{
+ Mutex::ScopedLock locker(messageLock);
+ /** @todo add any interesting queue state into results */
+ if (allocator) allocator->query(results);
+}
+
void Queue::setPosition(SequenceNumber n) {
Mutex::ScopedLock locker(messageLock);
sequence = n;
@@ -1121,7 +1342,10 @@ void Queue::insertSequenceNumbers(const std::string& key)
QPID_LOG(debug, "Inserting sequence numbers as " << key);
}
-void Queue::enqueued(const QueuedMessage& m)
+/** updates queue observers and state when a message has become available for transfer,
+ * expects messageLock to be held
+ */
+void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&)
{
for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
try {
@@ -1144,7 +1368,8 @@ void Queue::updateEnqueued(const QueuedMessage& m)
if (policy.get()) {
policy->recoverEnqueued(payload);
}
- enqueued(m);
+ Mutex::ScopedLock locker(messageLock);
+ observeEnqueue(m, locker);
} else {
QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
}
@@ -1168,6 +1393,7 @@ void Queue::checkNotDeleted()
void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
{
+ Mutex::ScopedLock locker(messageLock);
observers.insert(observer);
}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 8435e75cab..59ae41e768 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -59,7 +59,7 @@ class MessageStore;
class QueueEvents;
class QueueRegistry;
class TransactionContext;
-class Exchange;
+class MessageDistributor;
/**
* The brokers representation of an amqp queue. Messages are
@@ -129,23 +129,32 @@ class Queue : public boost::enable_shared_from_this<Queue>,
UsageBarrier barrier;
int autoDeleteTimeout;
boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
+ boost::shared_ptr<MessageDistributor> allocator;
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
- bool seek(QueuedMessage& msg, Consumer::shared_ptr position);
- bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
- ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
- bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+ bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
+ ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
+ bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c);
void notifyListener();
void removeListener(Consumer::shared_ptr);
bool isExcluded(boost::intrusive_ptr<Message>& msg);
- void enqueued(const QueuedMessage& msg);
- void dequeued(const QueuedMessage& msg);
- void pop();
- void popAndDequeue();
+ /** update queue observers, stats, policy, etc when the messages' state changes. Lock
+ * must be held by caller */
+ void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
+ void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
+ void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
+ void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
+
+ /** modify the Queue's message container - assumes messageLock held */
+ void pop(const sys::Mutex::ScopedLock& held); // acquire front msg
+ void popAndDequeue(const sys::Mutex::ScopedLock& held); // acquire and dequeue front msg
+ // acquire message @ position, return true and set msg if acquire succeeds
+ bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
+ const sys::Mutex::ScopedLock& held);
void forcePersistent(QueuedMessage& msg);
int getEventMode();
@@ -191,8 +200,15 @@ class Queue : public boost::enable_shared_from_this<Queue>,
Broker* broker = 0);
QPID_BROKER_EXTERN ~Queue();
+ /** allow the Consumer to consume or browse the next available message */
QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr);
+ /** allow the Consumer to acquire a message that it has browsed.
+ * @param msg - message to be acquired.
+ * @return false if message is no longer available for acquire.
+ */
+ QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg, const std::string& consumer);
+
/**
* Used to configure a new queue and create a persistent record
* for it in store if required.
@@ -216,7 +232,11 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable());
- QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg);
+ /** Acquire the message at the given position if it is available for acquire. Not to
+ * be used by clients, but used by the broker for queue management.
+ * @param message - set to the acquired message if true returned.
+ * @return true if the message has been acquired.
+ */
QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message);
/**
@@ -245,11 +265,14 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool exclusive = false);
QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
- uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages
+ uint32_t purge(const uint32_t purge_request=0, //defaults to all messages
+ boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>(),
+ const ::qpid::types::Variant::Map *filter=0);
QPID_BROKER_EXTERN void purgeExpired(sys::Duration);
//move qty # of messages to destination Queue destq
- uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
+ uint32_t move(const Queue::shared_ptr destq, uint32_t qty,
+ const qpid::types::Variant::Map *filter=0);
QPID_BROKER_EXTERN uint32_t getMessageCount() const;
QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
@@ -302,12 +325,12 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool isEnqueued(const QueuedMessage& msg);
/**
- * Gets the next available message
+ * Acquires the next available (oldest) message
*/
QPID_BROKER_EXTERN QueuedMessage get();
- /** Get the message at position pos */
- QPID_BROKER_EXTERN QueuedMessage find(framing::SequenceNumber pos) const;
+ /** Get the message at position pos, returns true if found and sets msg */
+ QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, QueuedMessage& msg ) const;
const QueuePolicy* getPolicy();
@@ -336,6 +359,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
management::ManagementObject* GetManagementObject (void) const;
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
+ void query(::qpid::types::Variant::Map&) const;
/** Apply f to each Message on the queue. */
template <class F> void eachMessage(F f) {
diff --git a/cpp/src/qpid/broker/QueueEvents.cpp b/cpp/src/qpid/broker/QueueEvents.cpp
index 2c540ff1ad..c66bdabf0f 100644
--- a/cpp/src/qpid/broker/QueueEvents.cpp
+++ b/cpp/src/qpid/broker/QueueEvents.cpp
@@ -129,6 +129,10 @@ class EventGenerator : public QueueObserver
{
if (!enqueueOnly) manager.dequeued(m);
}
+
+ void acquired(const QueuedMessage&) {};
+ void requeued(const QueuedMessage&) {};
+
private:
QueueEvents& manager;
const bool enqueueOnly;
diff --git a/cpp/src/qpid/broker/QueueFlowLimit.cpp b/cpp/src/qpid/broker/QueueFlowLimit.cpp
index fcf8d089f9..f15bb45c01 100644
--- a/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -377,7 +377,8 @@ void QueueFlowLimit::setState(const qpid::framing::FieldTable& state)
++i;
fcmsg.add(first, last);
for (SequenceNumber seq = first; seq <= last; ++seq) {
- QueuedMessage msg(queue->find(seq)); // fyi: msg.payload may be null if msg is delivered & unacked
+ QueuedMessage msg;
+ queue->find(seq, msg); // fyi: may not be found if msg is acquired & unacked
bool unique;
unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second;
// Like this to avoid tripping up unused variable warning when NDEBUG set
diff --git a/cpp/src/qpid/broker/QueueFlowLimit.h b/cpp/src/qpid/broker/QueueFlowLimit.h
index c02e479976..ad8a2720ef 100644
--- a/cpp/src/qpid/broker/QueueFlowLimit.h
+++ b/cpp/src/qpid/broker/QueueFlowLimit.h
@@ -84,6 +84,9 @@ class Broker;
QPID_BROKER_EXTERN void enqueued(const QueuedMessage&);
/** the queue has removed QueuedMessage. Returns true if flow state changes */
QPID_BROKER_EXTERN void dequeued(const QueuedMessage&);
+ /** ignored */
+ QPID_BROKER_EXTERN void acquired(const QueuedMessage&) {};
+ QPID_BROKER_EXTERN void requeued(const QueuedMessage&) {};
/** for clustering: */
QPID_BROKER_EXTERN void getState(qpid::framing::FieldTable&) const;
diff --git a/cpp/src/qpid/broker/QueueObserver.h b/cpp/src/qpid/broker/QueueObserver.h
index 3ca01c051e..b58becd2ae 100644
--- a/cpp/src/qpid/broker/QueueObserver.h
+++ b/cpp/src/qpid/broker/QueueObserver.h
@@ -25,17 +25,51 @@ namespace qpid {
namespace broker {
struct QueuedMessage;
+class Consumer;
+
/**
- * Interface for notifying classes who want to act as 'observers' of a
- * queue of particular events.
+ * Interface for notifying classes who want to act as 'observers' of a queue of particular
+ * events.
+ *
+ * The events that are monitored reflect the relationship between a particular message and
+ * the queue it has been delivered to. A message can be considered in one of three states
+ * with respect to the queue:
+ *
+ * 1) "Available" - available for transfer to consumers (i.e. for browse or acquire),
+ *
+ * 2) "Acquired" - owned by a particular consumer, no longer available to other consumers
+ * (by either browse or acquire), but still considered on the queue.
+ *
+ * 3) "Dequeued" - removed from the queue and no longer available to any consumer.
+ *
+ * The queue events that are observable are:
+ *
+ * "Enqueued" - the message is "Available" - on the queue for transfer to any consumer
+ * (e.g. browse or acquire)
+ *
+ * "Acquired" - - a consumer has claimed exclusive access to it. It is no longer available
+ * for other consumers to browse or acquire, but it is not yet considered dequeued as it
+ * may be requeued by the consumer.
+ *
+ * "Requeued" - a previously-acquired message is released by its owner: it is put back on
+ * the queue at its original position and returns to the "Available" state.
+ *
+ * "Dequeued" - a message is no longer queued. At this point, the queue no longer tracks
+ * the message, and the broker considers the consumer's transaction complete.
*/
class QueueObserver
{
public:
virtual ~QueueObserver() {}
+
+ // note: the Queue will hold the messageLock while calling these methods!
virtual void enqueued(const QueuedMessage&) = 0;
virtual void dequeued(const QueuedMessage&) = 0;
- private:
+ virtual void acquired(const QueuedMessage&) = 0;
+ virtual void requeued(const QueuedMessage&) = 0;
+ virtual void consumerAdded( const Consumer& ) {};
+ virtual void consumerRemoved( const Consumer& ) {};
+ private:
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp
index 6ae0d53b1a..0c245700af 100644
--- a/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -269,8 +269,7 @@ bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
do {
QueuedMessage oldest = queue.front();
-
- if (oldest.queue->acquire(oldest) || !strict) {
+ if (oldest.queue->acquireMessageAt(oldest.position, oldest) || !strict) {
queue.pop_front();
pendingDequeues.push_back(oldest);
QPID_LOG(debug, "Ring policy triggered in " << name
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index b4f146e699..94d0cc87f7 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -107,11 +107,18 @@ bool SemanticState::exists(const string& consumerTag){
return consumers.find(consumerTag) != consumers.end();
}
+namespace {
+ const std::string SEPARATOR("::");
+}
+
void SemanticState::consume(const string& tag,
Queue::shared_ptr queue, bool ackRequired, bool acquire,
bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments)
{
- ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments));
+ // "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination).
+ // Create a globally unique name so the broker can identify individual consumers
+ std::string name = session.getSessionId().str() + SEPARATOR + tag;
+ ConsumerImpl::shared_ptr c(new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
queue->consume(c, exclusive);//may throw exception
consumers[tag] = c;
}
@@ -267,15 +274,15 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
bool ack,
bool _acquire,
bool _exclusive,
+ const string& _tag,
const string& _resumeId,
uint64_t _resumeTtl,
const framing::FieldTable& _arguments
) :
- Consumer(_acquire),
+ Consumer(_name, _acquire),
parent(_parent),
- name(_name),
queue(_queue),
ackExpected(ack),
acquire(_acquire),
@@ -284,6 +291,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
windowActive(false),
exclusive(_exclusive),
resumeId(_resumeId),
+ tag(_tag),
resumeTtl(_resumeTtl),
arguments(_arguments),
msgCredit(0),
@@ -300,7 +308,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
if (agent != 0)
{
- mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name,
+ mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(),
!acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments));
agent->addObject (mgmtObject);
mgmtObject->set_creditMode("WINDOW");
@@ -332,16 +340,15 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
assertClusterSafe();
allocateCredit(msg.payload);
- DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing);
+ DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, windowing);
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
parent->deliver(record, sync);
- if (!ackExpected && acquire) record.setEnded();//allows message to be released now its been delivered
if (windowing || ackExpected || !acquire) {
parent->record(record);
}
- if (acquire && !ackExpected) {
- queue->dequeue(0, msg);
+ if (acquire && !ackExpected) { // auto acquire && auto accept
+ record.accept( 0 /*no ctxt*/ );
}
if (mgmtObject) { mgmtObject->inc_delivered(); }
return true;
@@ -371,7 +378,7 @@ struct ConsumerName {
};
ostream& operator<<(ostream& o, const ConsumerName& pc) {
- return o << pc.consumer.getName() << " on "
+ return o << pc.consumer.getTag() << " on "
<< pc.consumer.getParent().getSession().getSessionId();
}
}
@@ -561,50 +568,61 @@ void SemanticState::deliver(DeliveryRecord& msg, bool sync)
return deliveryAdapter.deliver(msg, sync);
}
-SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)
+const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const
{
- ConsumerImplMap::iterator i = consumers.find(destination);
- if (i == consumers.end()) {
- throw NotFoundException(QPID_MSG("Unknown destination " << destination));
+ ConsumerImpl::shared_ptr consumer;
+ if (!find(destination, consumer)) {
+ throw NotFoundException(QPID_MSG("Unknown destination " << destination << " session=" << session.getSessionId()));
} else {
- return *(i->second);
+ return consumer;
+ }
+}
+
+bool SemanticState::find(const std::string& destination, ConsumerImpl::shared_ptr& consumer) const
+{
+ // @todo KAG gsim: shouldn't the consumers map be locked????
+ ConsumerImplMap::const_iterator i = consumers.find(destination);
+ if (i == consumers.end()) {
+ return false;
}
+ consumer = i->second;
+ return true;
}
void SemanticState::setWindowMode(const std::string& destination)
{
- find(destination).setWindowMode();
+ find(destination)->setWindowMode();
}
void SemanticState::setCreditMode(const std::string& destination)
{
- find(destination).setCreditMode();
+ find(destination)->setCreditMode();
}
void SemanticState::addByteCredit(const std::string& destination, uint32_t value)
{
- ConsumerImpl& c = find(destination);
- c.addByteCredit(value);
- c.requestDispatch();
+ ConsumerImpl::shared_ptr c = find(destination);
+ c->addByteCredit(value);
+ c->requestDispatch();
}
void SemanticState::addMessageCredit(const std::string& destination, uint32_t value)
{
- ConsumerImpl& c = find(destination);
- c.addMessageCredit(value);
- c.requestDispatch();
+ ConsumerImpl::shared_ptr c = find(destination);
+ c->addMessageCredit(value);
+ c->requestDispatch();
}
void SemanticState::flush(const std::string& destination)
{
- find(destination).flush();
+ find(destination)->flush();
}
void SemanticState::stop(const std::string& destination)
{
- find(destination).stop();
+ find(destination)->stop();
}
void SemanticState::ConsumerImpl::setWindowMode()
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 69d980947b..12ccc75f11 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -75,7 +75,6 @@ class SemanticState : private boost::noncopyable {
{
mutable qpid::sys::Mutex lock;
SemanticState* const parent;
- const std::string name;
const boost::shared_ptr<Queue> queue;
const bool ackExpected;
const bool acquire;
@@ -84,6 +83,7 @@ class SemanticState : private boost::noncopyable {
bool windowActive;
bool exclusive;
std::string resumeId;
+ const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command
uint64_t resumeTtl;
framing::FieldTable arguments;
uint32_t msgCredit;
@@ -103,7 +103,8 @@ class SemanticState : private boost::noncopyable {
ConsumerImpl(SemanticState* parent,
const std::string& name, boost::shared_ptr<Queue> queue,
bool ack, bool acquire, bool exclusive,
- const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
+ const std::string& tag, const std::string& resumeId,
+ uint64_t resumeTtl, const framing::FieldTable& arguments);
~ConsumerImpl();
OwnershipToken* getSession();
bool deliver(QueuedMessage& msg);
@@ -130,8 +131,6 @@ class SemanticState : private boost::noncopyable {
bool doOutput();
- std::string getName() const { return name; }
-
bool isAckExpected() const { return ackExpected; }
bool isAcquire() const { return acquire; }
bool isWindowing() const { return windowing; }
@@ -139,6 +138,7 @@ class SemanticState : private boost::noncopyable {
uint32_t getMsgCredit() const { return msgCredit; }
uint32_t getByteCredit() const { return byteCredit; }
std::string getResumeId() const { return resumeId; };
+ const std::string& getTag() const { return tag; }
uint64_t getResumeTtl() const { return resumeTtl; }
const framing::FieldTable& getArguments() const { return arguments; }
@@ -190,7 +190,8 @@ class SemanticState : private boost::noncopyable {
SessionContext& getSession() { return session; }
const SessionContext& getSession() const { return session; }
- ConsumerImpl& find(const std::string& destination);
+ const ConsumerImpl::shared_ptr find(const std::string& destination) const;
+ bool find(const std::string& destination, ConsumerImpl::shared_ptr&) const;
/**
* Get named queue, never returns 0.
diff --git a/cpp/src/qpid/broker/ThresholdAlerts.h b/cpp/src/qpid/broker/ThresholdAlerts.h
index c77722e700..2b4a46b736 100644
--- a/cpp/src/qpid/broker/ThresholdAlerts.h
+++ b/cpp/src/qpid/broker/ThresholdAlerts.h
@@ -50,6 +50,9 @@ class ThresholdAlerts : public QueueObserver
const long repeatInterval);
void enqueued(const QueuedMessage&);
void dequeued(const QueuedMessage&);
+ void acquired(const QueuedMessage&) {};
+ void requeued(const QueuedMessage&) {};
+
static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
const uint64_t countThreshold,
const uint64_t sizeThreshold,
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 015301573e..394749aad2 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -406,11 +406,11 @@ void Connection::shadowSetUser(const std::string& userId) {
void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position)
{
- broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
- c.position = position;
- c.setBlocked(blocked);
- if (notifyEnabled) c.enableNotify(); else c.disableNotify();
- updateIn.consumerNumbering.add(c.shared_from_this());
+ broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name);
+ c->position = position;
+ c->setBlocked(blocked);
+ if (notifyEnabled) c->enableNotify(); else c->disableNotify();
+ updateIn.consumerNumbering.add(c);
}
@@ -444,7 +444,7 @@ void Connection::outputTask(uint16_t channel, const std::string& name) {
if (!session)
throw Exception(QPID_MSG(cluster << " channel not attached " << *this
<< "[" << channel << "] "));
- OutputTask* task = &session->getSemanticState().find(name);
+ OutputTask* task = session->getSemanticState().find(name).get();
connection->getOutputTasks().addOutputTask(task);
}
@@ -547,7 +547,7 @@ void Connection::deliveryRecord(const string& qname,
m.position = position;
if (enqueued) queue->updateEnqueued(m); //inform queue of the message
} else { // Message at original position in original queue
- m = queue->find(position);
+ queue->find(position, m);
}
// FIXME aconway 2011-08-19: removed:
// if (!m.payload)
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 2be1bf1f77..2446c12f2b 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -421,8 +421,8 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) {
boost::polymorphic_downcast<const SemanticState::ConsumerImpl*> (task);
SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci);
uint16_t channel = ci->getParent().getSession().getChannel();
- ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName());
- QPID_LOG(debug, *this << " updating output task " << ci->getName()
+ ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getTag());
+ QPID_LOG(debug, *this << " updating output task " << ci->getTag()
<< " channel=" << channel);
}
@@ -521,13 +521,13 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
void UpdateClient::updateConsumer(
const broker::SemanticState::ConsumerImpl::shared_ptr& ci)
{
- QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on "
+ QPID_LOG(debug, *this << " updating consumer " << ci->getTag() << " on "
<< shadowSession.getId());
using namespace message;
shadowSession.messageSubscribe(
arg::queue = ci->getQueue()->getName(),
- arg::destination = ci->getName(),
+ arg::destination = ci->getTag(),
arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE,
arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED,
arg::exclusive = ci->isExclusive(),
@@ -535,18 +535,18 @@ void UpdateClient::updateConsumer(
arg::resumeTtl = ci->getResumeTtl(),
arg::arguments = ci->getArguments()
);
- shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
- shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
- shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit());
+ shadowSession.messageSetFlowMode(ci->getTag(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
+ shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
+ shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_BYTE, ci->getByteCredit());
ClusterConnectionProxy(shadowSession).consumerState(
- ci->getName(),
+ ci->getTag(),
ci->isBlocked(),
ci->isNotifyEnabled(),
ci->position
);
consumerNumbering.add(ci.get());
- QPID_LOG(debug, *this << " updated consumer " << ci->getName()
+ QPID_LOG(debug, *this << " updated consumer " << ci->getTag()
<< " on " << shadowSession.getId());
}
diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt
index cc33478114..7d781e5eb3 100644
--- a/cpp/src/tests/CMakeLists.txt
+++ b/cpp/src/tests/CMakeLists.txt
@@ -272,6 +272,11 @@ add_executable (datagen datagen.cpp ${platform_test_additions})
target_link_libraries (datagen qpidclient)
remember_location(datagen)
+add_executable (msg_group_test msg_group_test.cpp ${platform_test_additions})
+target_link_libraries (msg_group_test qpidmessaging)
+remember_location(msg_group_test)
+
+
# qpid-perftest and qpid-latency-test are generally useful so install them
install (TARGETS qpid-perftest qpid-latency-test RUNTIME
DESTINATION ${QPID_INSTALL_BINDIR})
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index cd569e901c..78ac6db5f1 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -291,13 +291,19 @@ qpid_stream_INCLUDES=$(PUBLIC_INCLUDES)
qpid_stream_SOURCES=qpid-stream.cpp
qpid_stream_LDADD=$(lib_messaging)
+check_PROGRAMS+=msg_group_test
+msg_group_test_INCLUDES=$(PUBLIC_INCLUDES)
+msg_group_test_SOURCES=msg_group_test.cpp
+msg_group_test_LDADD=$(lib_messaging)
+
TESTS_ENVIRONMENT = \
VALGRIND=$(VALGRIND) \
LIBTOOL="$(LIBTOOL)" \
QPID_DATA_DIR= \
$(srcdir)/run_test
-system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest
+system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest \
+ run_msg_group_tests
TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_federation_sys_tests \
run_acl_tests run_cli_tests replication_test dynamic_log_level_test \
run_queue_flow_limit_tests ipv6_test
@@ -342,7 +348,8 @@ EXTRA_DIST += \
start_broker.ps1 \
stop_broker.ps1 \
topictest.ps1 \
- run_queue_flow_limit_tests
+ run_queue_flow_limit_tests \
+ run_msg_group_tests
check_LTLIBRARIES += libdlclose_noop.la
libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir)
@@ -353,7 +360,10 @@ CLEANFILES+=valgrind.out *.log *.vglog* dummy_test qpidd.port $(unit_wrappers)
# Longer running stability tests, not run by default check: target.
# Not run under valgrind, too slow
-LONG_TESTS+=start_broker fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test stop_broker \
+LONG_TESTS+=start_broker \
+ fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test \
+ run_msg_groups_tests_soak \
+ stop_broker \
run_long_federation_sys_tests \
run_failover_soak reliable_replication_test \
federated_cluster_test_with_node_failure
@@ -366,7 +376,8 @@ EXTRA_DIST+= \
run_failover_soak \
reliable_replication_test \
federated_cluster_test_with_node_failure \
- sasl_test_setup.sh
+ sasl_test_setup.sh \
+ run_msg_groups_tests_soak
check-long:
$(MAKE) check TESTS="$(LONG_TESTS)" VALGRIND=
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index d94a5cab7f..7bf061ff54 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -56,12 +56,12 @@ class TestConsumer : public virtual Consumer{
public:
typedef boost::shared_ptr<TestConsumer> shared_ptr;
- intrusive_ptr<Message> last;
+ QueuedMessage last;
bool received;
- TestConsumer(bool acquire = true):Consumer(acquire), received(false) {};
+ TestConsumer(std::string name="test", bool acquire = true):Consumer(name, acquire), received(false) {};
virtual bool deliver(QueuedMessage& msg){
- last = msg.payload;
+ last = msg;
received = true;
return true;
};
@@ -149,16 +149,16 @@ QPID_AUTO_TEST_CASE(testConsumers){
queue->deliver(msg1);
BOOST_CHECK(queue->dispatch(c1));
- BOOST_CHECK_EQUAL(msg1.get(), c1->last.get());
+ BOOST_CHECK_EQUAL(msg1.get(), c1->last.payload.get());
queue->deliver(msg2);
BOOST_CHECK(queue->dispatch(c2));
- BOOST_CHECK_EQUAL(msg2.get(), c2->last.get());
+ BOOST_CHECK_EQUAL(msg2.get(), c2->last.payload.get());
c1->received = false;
queue->deliver(msg3);
BOOST_CHECK(queue->dispatch(c1));
- BOOST_CHECK_EQUAL(msg3.get(), c1->last.get());
+ BOOST_CHECK_EQUAL(msg3.get(), c1->last.payload.get());
//Test cancellation:
queue->cancel(c1);
@@ -214,7 +214,7 @@ QPID_AUTO_TEST_CASE(testDequeue){
if (!consumer->received)
sleep(2);
- BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get());
+ BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get());
BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
received = queue->get().payload;
@@ -298,14 +298,14 @@ QPID_AUTO_TEST_CASE(testSeek){
queue->deliver(msg2);
queue->deliver(msg3);
- TestConsumer::shared_ptr consumer(new TestConsumer(false));
+ TestConsumer::shared_ptr consumer(new TestConsumer("test", false));
SequenceNumber seq(2);
consumer->position = seq;
QueuedMessage qm;
queue->dispatch(consumer);
- BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get());
+ BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get());
queue->dispatch(consumer);
queue->dispatch(consumer); // make sure over-run is safe
@@ -325,14 +325,18 @@ QPID_AUTO_TEST_CASE(testSearch){
queue->deliver(msg3);
SequenceNumber seq(2);
- QueuedMessage qm = queue->find(seq);
+ QueuedMessage qm;
+ TestConsumer::shared_ptr c1(new TestConsumer());
+
+ BOOST_CHECK(queue->find(seq, qm));
BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue());
- queue->acquire(qm);
+ queue->acquire(qm, c1->getName());
BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
SequenceNumber seq1(3);
- QueuedMessage qm1 = queue->find(seq1);
+ QueuedMessage qm1;
+ BOOST_CHECK(queue->find(seq1, qm1));
BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue());
}
@@ -552,12 +556,13 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
QueuedMessage qmsg2(queue.get(), msg2, ++sequence);
framing::SequenceNumber sequence1(10);
QueuedMessage qmsg3(queue.get(), 0, sequence1);
+ TestConsumer::shared_ptr dummy(new TestConsumer());
- BOOST_CHECK(!queue->acquire(qmsg));
- BOOST_CHECK(queue->acquire(qmsg2));
+ BOOST_CHECK(!queue->acquire(qmsg, dummy->getName()));
+ BOOST_CHECK(queue->acquire(qmsg2, dummy->getName()));
// Acquire the massage again to test failure case.
- BOOST_CHECK(!queue->acquire(qmsg2));
- BOOST_CHECK(!queue->acquire(qmsg3));
+ BOOST_CHECK(!queue->acquire(qmsg2, dummy->getName()));
+ BOOST_CHECK(!queue->acquire(qmsg3, dummy->getName()));
BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
@@ -567,7 +572,7 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
// set mode to no browse and check
args.setOrdering(client::LVQ_NO_BROWSE);
queue->configure(args);
- TestConsumer::shared_ptr c1(new TestConsumer(false));
+ TestConsumer::shared_ptr c1(new TestConsumer("test", false));
queue->dispatch(c1);
queue->dispatch(c1);
@@ -696,6 +701,280 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);
}
+
+namespace {
+ // helper for group tests
+ void verifyAcquire( Queue::shared_ptr queue,
+ TestConsumer::shared_ptr c,
+ std::deque<QueuedMessage>& results,
+ const std::string& expectedGroup,
+ const int expectedId )
+ {
+ queue->dispatch(c);
+ results.push_back(c->last);
+ std::string group = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
+ int id = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+ BOOST_CHECK_EQUAL( group, expectedGroup );
+ BOOST_CHECK_EQUAL( id, expectedId );
+ }
+}
+
+QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
+ //
+ // Verify that consumers of grouped messages own the groups once a message is acquired,
+ // and release the groups once all acquired messages have been dequeued or requeued
+ //
+ FieldTable args;
+ Queue::shared_ptr queue(new Queue("my_queue", true));
+ args.setString("qpid.group_header_key", "GROUP-ID");
+ args.setInt("qpid.shared_msg_group", 1);
+ queue->configure(args);
+
+ std::string groups[] = { std::string("a"), std::string("a"), std::string("a"),
+ std::string("b"), std::string("b"), std::string("b"),
+ std::string("c"), std::string("c"), std::string("c") };
+ for (int i = 0; i < 9; ++i) {
+ intrusive_ptr<Message> msg = create_message("e", "A");
+ msg->insertCustomProperty("GROUP-ID", groups[i]);
+ msg->insertCustomProperty("MY-ID", i);
+ queue->deliver(msg);
+ }
+
+ // Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---,
+
+ BOOST_CHECK_EQUAL(uint32_t(9), queue->getMessageCount());
+
+ TestConsumer::shared_ptr c1(new TestConsumer("C1"));
+ TestConsumer::shared_ptr c2(new TestConsumer("C2"));
+
+ queue->consume(c1);
+ queue->consume(c2);
+
+ std::deque<QueuedMessage> dequeMeC1;
+ std::deque<QueuedMessage> dequeMeC2;
+
+
+ verifyAcquire(queue, c1, dequeMeC1, "a", 0 ); // c1 now owns group "a" (acquire a-0)
+ verifyAcquire(queue, c2, dequeMeC2, "b", 3 ); // c2 should now own group "b" (acquire b-3)
+
+ // now let c1 complete the 'a-0' message - this should free the 'a' group
+ queue->dequeue( 0, dequeMeC1.front() );
+ dequeMeC1.pop_front();
+
+ // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ---, ---, ^C2, ^C2, ^C2, ---, ---, ---
+
+ // now c2 should pick up the next 'a-1', since it is oldest free
+ verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); // c2 should now own groups "a" and "b"
+
+ // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ---, ---, ---
+
+ // c1 should only be able to snarf up the first "c" message now...
+ verifyAcquire(queue, c1, dequeMeC1, "c", 6 ); // should skip to the first "c"
+
+ // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ^C1, ^C1, ^C1
+
+ // hmmm... what if c2 now dequeues "b-3"? (now only has a-1 acquired)
+ queue->dequeue( 0, dequeMeC2.front() );
+ dequeMeC2.pop_front();
+
+ // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ^C2, ^C2, ---, ---, ^C1, ^C1, ^C1
+
+ // b group is free, c is owned by c1 - c1's next get should grab 'b-4'
+ verifyAcquire(queue, c1, dequeMeC1, "b", 4 );
+
+ // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ^C2, ^C2, ^C1, ^C1, ^C1, ^C1, ^C1
+
+ // c2 can now only grab a-2, and that's all
+ verifyAcquire(queue, c2, dequeMeC2, "a", 2 );
+
+ // now C2 can't get any more, since C1 owns "b" and "c" group...
+ bool gotOne = queue->dispatch(c2);
+ BOOST_CHECK( !gotOne );
+
+ // hmmm... what if c1 now dequeues "c-6"? (now only own's b-4)
+ queue->dequeue( 0, dequeMeC1.front() );
+ dequeMeC1.pop_front();
+
+ // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+ // Owners= ^C2, ^C2, ^C1, ^C1, ---, ---
+
+ // c2 can now grab c-7
+ verifyAcquire(queue, c2, dequeMeC2, "c", 7 );
+
+ // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+ // Owners= ^C2, ^C2, ^C1, ^C1, ^C2, ^C2
+
+ // what happens if C-2 "requeues" a-1 and a-2?
+ queue->requeue( dequeMeC2.front() );
+ dequeMeC2.pop_front();
+ queue->requeue( dequeMeC2.front() );
+ dequeMeC2.pop_front(); // now just has c-7 acquired
+
+ // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+ // Owners= ---, ---, ^C1, ^C1, ^C2, ^C2
+
+ // now c1 will grab a-1 and a-2...
+ verifyAcquire(queue, c1, dequeMeC1, "a", 1 );
+ verifyAcquire(queue, c1, dequeMeC1, "a", 2 );
+
+ // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+ // Owners= ^C1, ^C1, ^C1, ^C1, ^C2, ^C2
+
+ // c2 can now acquire c-8 only
+ verifyAcquire(queue, c2, dequeMeC2, "c", 8 );
+
+ // and c1 can get b-5
+ verifyAcquire(queue, c1, dequeMeC1, "b", 5 );
+
+ // should be no more acquire-able for anyone now:
+ gotOne = queue->dispatch(c1);
+ BOOST_CHECK( !gotOne );
+ gotOne = queue->dispatch(c2);
+ BOOST_CHECK( !gotOne );
+
+ // requeue all of C1's acquired messages, then cancel C1
+ while (!dequeMeC1.empty()) {
+ queue->requeue(dequeMeC1.front());
+ dequeMeC1.pop_front();
+ }
+ queue->cancel(c1);
+
+ // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+ // Owners= ---, ---, ---, ---, ^C2, ^C2
+
+ // b-4, a-1, a-2, b-5 all should be available, right?
+ verifyAcquire(queue, c2, dequeMeC2, "a", 1 );
+
+ while (!dequeMeC2.empty()) {
+ queue->dequeue(0, dequeMeC2.front());
+ dequeMeC2.pop_front();
+ }
+
+ // Queue = a-2, b-4, b-5
+ // Owners= ---, ---, ---
+
+ TestConsumer::shared_ptr c3(new TestConsumer("C3"));
+ std::deque<QueuedMessage> dequeMeC3;
+
+ verifyAcquire(queue, c3, dequeMeC3, "a", 2 );
+ verifyAcquire(queue, c2, dequeMeC2, "b", 4 );
+
+ // Queue = a-2, b-4, b-5
+ // Owners= ^C3, ^C2, ^C2
+
+ gotOne = queue->dispatch(c3);
+ BOOST_CHECK( !gotOne );
+
+ verifyAcquire(queue, c2, dequeMeC2, "b", 5 );
+
+ while (!dequeMeC2.empty()) {
+ queue->dequeue(0, dequeMeC2.front());
+ dequeMeC2.pop_front();
+ }
+
+ // Queue = a-2,
+ // Owners= ^C3,
+
+ intrusive_ptr<Message> msg = create_message("e", "A");
+ msg->insertCustomProperty("GROUP-ID", "a");
+ msg->insertCustomProperty("MY-ID", 9);
+ queue->deliver(msg);
+
+ // Queue = a-2, a-9
+ // Owners= ^C3, ^C3
+
+ gotOne = queue->dispatch(c2);
+ BOOST_CHECK( !gotOne );
+
+ msg = create_message("e", "A");
+ msg->insertCustomProperty("GROUP-ID", "b");
+ msg->insertCustomProperty("MY-ID", 10);
+ queue->deliver(msg);
+
+ // Queue = a-2, a-9, b-10
+ // Owners= ^C3, ^C3, ----
+
+ verifyAcquire(queue, c2, dequeMeC2, "b", 10 );
+ verifyAcquire(queue, c3, dequeMeC3, "a", 9 );
+
+ gotOne = queue->dispatch(c3);
+ BOOST_CHECK( !gotOne );
+
+ queue->cancel(c2);
+ queue->cancel(c3);
+}
+
+
+QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) {
+ //
+ // Verify that the same default group name is automatically applied to messages that
+ // do not specify a group name.
+ //
+ FieldTable args;
+ Queue::shared_ptr queue(new Queue("my_queue", true));
+ args.setString("qpid.group_header_key", "GROUP-ID");
+ args.setInt("qpid.shared_msg_group", 1);
+ queue->configure(args);
+
+ for (int i = 0; i < 3; ++i) {
+ intrusive_ptr<Message> msg = create_message("e", "A");
+ // no "GROUP-ID" header
+ msg->insertCustomProperty("MY-ID", i);
+ queue->deliver(msg);
+ }
+
+ // Queue = 0, 1, 2
+
+ BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount());
+
+ TestConsumer::shared_ptr c1(new TestConsumer("C1"));
+ TestConsumer::shared_ptr c2(new TestConsumer("C2"));
+
+ queue->consume(c1);
+ queue->consume(c2);
+
+ std::deque<QueuedMessage> dequeMeC1;
+ std::deque<QueuedMessage> dequeMeC2;
+
+ queue->dispatch(c1); // c1 now owns default group (acquired 0)
+ dequeMeC1.push_back(c1->last);
+ int id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+ BOOST_CHECK_EQUAL( id, 0 );
+
+ bool gotOne = queue->dispatch(c2); // c2 should get nothing
+ BOOST_CHECK( !gotOne );
+
+ queue->dispatch(c1); // c1 now acquires 1
+ dequeMeC1.push_back(c1->last);
+ id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+ BOOST_CHECK_EQUAL( id, 1 );
+
+ gotOne = queue->dispatch(c2); // c2 should still get nothing
+ BOOST_CHECK( !gotOne );
+
+ while (!dequeMeC1.empty()) {
+ queue->dequeue(0, dequeMeC1.front());
+ dequeMeC1.pop_front();
+ }
+
+ // now default group should be available...
+ queue->dispatch(c2); // c2 now owns default group (acquired 2)
+ id = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+ BOOST_CHECK_EQUAL( id, 2 );
+
+ gotOne = queue->dispatch(c1); // c1 should get nothing
+ BOOST_CHECK( !gotOne );
+
+ queue->cancel(c1);
+ queue->cancel(c2);
+}
+
QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
TestMessageStoreOC testStore;
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index 4e3339650b..d217f9fbde 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -1418,6 +1418,76 @@ class LongTests(BrokerTest):
if receiver: receiver.connection.detach()
logger.setLevel(log_level)
+ def test_msg_group_failover(self):
+ """Test fail-over during continuous send-receive of grouped messages.
+ """
+
+ class GroupedTrafficGenerator(Thread):
+ def __init__(self, url, queue, group_key):
+ Thread.__init__(self)
+ self.url = url
+ self.queue = queue
+ self.group_key = group_key
+ self.status = -1
+
+ def run(self):
+ # generate traffic for approx 10 seconds (2011msgs / 200 per-sec)
+ cmd = ["msg_group_test",
+ "--broker=%s" % self.url,
+ "--address=%s" % self.queue,
+ "--connection-options={%s}" % (Cluster.CONNECTION_OPTIONS),
+ "--group-key=%s" % self.group_key,
+ "--receivers=2",
+ "--senders=3",
+ "--messages=2011",
+ "--send-rate=200",
+ "--capacity=11",
+ "--ack-frequency=23",
+ "--allow-duplicates",
+ "--group-size=37",
+ "--randomize-group-size",
+ "--interleave=13"]
+ # "--trace"]
+ self.generator = Popen( cmd );
+ self.status = self.generator.wait()
+ return self.status
+
+ def results(self):
+ self.join(timeout=30) # 3x assumed duration
+ if self.isAlive(): return -1
+ return self.status
+
+ # Original cluster will all be killed so expect exit with failure
+ cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["-t"])
+ for b in cluster: b.ready() # Wait for brokers to be ready
+
+ # create a queue with rather draconian flow control settings
+ ssn0 = cluster[0].connect().session()
+ q_args = "{'qpid.group_header_key':'group-id', 'qpid.shared_msg_group':1}"
+ s0 = ssn0.sender("test-group-q; {create:always, node:{type:queue, x-declare:{arguments:%s}}}" % q_args)
+
+ # Kill original brokers, start new ones for the duration.
+ endtime = time.time() + self.duration();
+ i = 0
+ while time.time() < endtime:
+ traffic = GroupedTrafficGenerator( cluster[i].host_port(),
+ "test-group-q", "group-id" )
+ traffic.start()
+ time.sleep(1)
+
+ for x in range(2):
+ for b in cluster[i:]: b.ready() # Check if any broker crashed.
+ cluster[i].kill()
+ i += 1
+ b = cluster.start(expect=EXPECT_EXIT_FAIL)
+ time.sleep(1)
+
+ # wait for traffic to finish, verify success
+ self.assertEqual(0, traffic.results())
+
+ for i in range(i, len(cluster)): cluster[i].kill()
+
+
class StoreTests(BrokerTest):
"""
Cluster tests that can only be run if there is a store available.
diff --git a/cpp/src/tests/msg_group_test.cpp b/cpp/src/tests/msg_group_test.cpp
new file mode 100644
index 0000000000..6b9d09b89a
--- /dev/null
+++ b/cpp/src/tests/msg_group_test.cpp
@@ -0,0 +1,618 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/messaging/Address.h>
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Session.h>
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/FailoverUpdates.h>
+#include <qpid/Options.h>
+#include <qpid/log/Logger.h>
+#include <qpid/log/Options.h>
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Time.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/SystemInfo.h"
+
+#include <iostream>
+#include <memory>
+#include <stdlib.h>
+
+using namespace qpid::messaging;
+using namespace qpid::types;
+using namespace std;
+
+namespace qpid {
+namespace tests {
+
+struct Options : public qpid::Options
+{
+ bool help;
+ std::string url;
+ std::string address;
+ std::string connectionOptions;
+ uint messages;
+ uint capacity;
+ uint ackFrequency;
+ bool failoverUpdates;
+ qpid::log::Options log;
+ uint senders;
+ uint receivers;
+ uint groupSize;
+ bool printReport;
+ std::string groupKey;
+ bool durable;
+ bool allowDuplicates;
+ bool randomizeSize;
+ bool stickyConsumer;
+ uint timeout;
+ uint interleave;
+ std::string prefix;
+ uint sendRate;
+
+ Options(const std::string& argv0=std::string())
+ : qpid::Options("Options"),
+ help(false),
+ url("amqp:tcp:127.0.0.1"),
+ messages(10000),
+ capacity(1000),
+ ackFrequency(100),
+ failoverUpdates(false),
+ log(argv0),
+ senders(2),
+ receivers(2),
+ groupSize(10),
+ printReport(false),
+ groupKey("qpid.no_group"),
+ durable(false),
+ allowDuplicates(false),
+ randomizeSize(false),
+ stickyConsumer(false),
+ timeout(10),
+ interleave(1),
+ sendRate(0)
+ {
+ addOptions()
+ ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)")
+ ("address,a", qpid::optValue(address, "ADDRESS"), "address to send and receive from")
+ ("allow-duplicates", qpid::optValue(allowDuplicates), "Ignore the delivery of duplicated messages")
+ ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
+ ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)")
+ ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
+ ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.")
+ ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover")
+ ("group-key", qpid::optValue(groupKey, "KEY"), "Key of the message header containing the group identifier.")
+ ("group-prefix", qpid::optValue(prefix, "STRING"), "Add 'prefix' to the start of all generated group identifiers.")
+ ("group-size", qpid::optValue(groupSize, "N"), "Number of messages per a group.")
+ ("interleave", qpid::optValue(interleave, "N"), "Simultaineously interleave messages from N different groups.")
+ ("messages,m", qpid::optValue(messages, "N"), "Number of messages to send per each sender.")
+ ("receivers,r", qpid::optValue(receivers, "N"), "Number of message consumers.")
+ ("randomize-group-size", qpid::optValue(randomizeSize), "Randomize the number of messages per group to [1...group-size].")
+ ("send-rate", qpid::optValue(sendRate,"N"), "Send at rate of N messages/second. 0 means send as fast as possible.")
+ ("senders,s", qpid::optValue(senders, "N"), "Number of message producers.")
+ ("sticky-consumers", qpid::optValue(stickyConsumer), "If set, verify that all messages in a group are consumed by the same client [TBD].")
+ ("timeout", qpid::optValue(timeout, "N"), "Fail with a stall error should all consumers remain idle for timeout seconds.")
+ ("print-report", qpid::optValue(printReport), "Dump message group statistics to stdout.")
+ ("help", qpid::optValue(help), "print this usage statement");
+ add(log);
+ //("check-redelivered", qpid::optValue(checkRedelivered), "Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)")
+ //("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
+ //("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
+ }
+
+ bool parse(int argc, char** argv)
+ {
+ try {
+ qpid::Options::parse(argc, argv);
+ if (address.empty()) throw qpid::Exception("Address must be specified!");
+ qpid::log::Logger::instance().configure(log);
+ if (help) {
+ std::ostringstream msg;
+ std::cout << msg << *this << std::endl << std::endl
+ << "Verifies the behavior of grouped messages." << std::endl;
+ return false;
+ } else {
+ return true;
+ }
+ } catch (const std::exception& e) {
+ std::cerr << *this << std::endl << std::endl << e.what() << std::endl;
+ return false;
+ }
+ }
+};
+
+const string EOS("eos");
+const string SN("sn");
+
+
+// class that monitors group state across all publishers and consumers. tracks the next
+// expected sequence for each group, and total messages consumed.
+class GroupChecker
+{
+ qpid::sys::Mutex lock;
+
+ const uint totalMsgs;
+ uint totalMsgsConsumed;
+ uint totalMsgsPublished;
+ bool allowDuplicates;
+ uint duplicateMsgs;
+
+ typedef std::map<std::string, uint> SequenceMap;
+ SequenceMap sequenceMap;
+
+ // Statistics - for each group, store the names of all clients that consumed messages
+ // from that group, and the number of messages consumed per client.
+ typedef std::map<std::string, uint> ClientCounter;
+ typedef std::map<std::string, ClientCounter> GroupStatistics;
+ GroupStatistics statistics;
+
+public:
+
+ GroupChecker( uint t, bool d ) :
+ totalMsgs(t), totalMsgsConsumed(0), totalMsgsPublished(0), allowDuplicates(d),
+ duplicateMsgs(0) {}
+
+ bool checkSequence( const std::string& groupId,
+ uint sequence, const std::string& client )
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+
+ QPID_LOG(debug, "Client " << client << " has received " << groupId << ":" << sequence);
+
+ GroupStatistics::iterator gs = statistics.find(groupId);
+ if (gs == statistics.end()) {
+ statistics[groupId][client] = 1;
+ } else {
+ gs->second[client]++;
+ }
+ // now verify
+ SequenceMap::iterator s = sequenceMap.find(groupId);
+ if (s == sequenceMap.end()) {
+ QPID_LOG(debug, "Client " << client << " thinks this is the first message from group " << groupId << ":" << sequence);
+ // if duplication allowed, it is possible that the last msg(s) of an old sequence are redelivered on reconnect.
+ // in this case, set the sequence from the first msg.
+ sequenceMap[groupId] = (allowDuplicates) ? sequence : 0;
+ s = sequenceMap.find(groupId);
+ } else if (sequence < s->second) {
+ duplicateMsgs++;
+ QPID_LOG(debug, "Client " << client << " thinks this message is a duplicate! " << groupId << ":" << sequence);
+ return allowDuplicates;
+ }
+ totalMsgsConsumed++;
+ return sequence == s->second++;
+ }
+
+ void sendingSequence( const std::string& groupId,
+ uint sequence, bool eos,
+ const std::string& client )
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ ++totalMsgsPublished;
+
+ QPID_LOG(debug, "Client " << client << " sending " << groupId << ":" << sequence <<
+ ((eos) ? " (last)" : ""));
+ }
+
+ bool eraseGroup( const std::string& groupId, const std::string& name )
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, "Deleting group " << groupId << " (by client " << name << ")");
+ return sequenceMap.erase( groupId ) == 1;
+ }
+
+ uint getNextExpectedSequence( const std::string& groupId )
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return sequenceMap[groupId];
+ }
+
+ bool allMsgsConsumed() // true when done processing msgs
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return (totalMsgsPublished >= totalMsgs) &&
+ (totalMsgsConsumed >= totalMsgsPublished) &&
+ sequenceMap.size() == 0;
+ }
+
+ uint getConsumedTotal()
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return totalMsgsConsumed;
+ }
+
+ uint getPublishedTotal()
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return totalMsgsPublished;
+ }
+
+ ostream& print(ostream& out)
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ out << "Total Published: " << totalMsgsPublished << ", Total Consumed: " << totalMsgsConsumed <<
+ ", Duplicates detected: " << duplicateMsgs << std::endl;
+ out << "Total Groups: " << statistics.size() << std::endl;
+ unsigned long consumers = 0;
+ for (GroupStatistics::iterator gs = statistics.begin(); gs != statistics.end(); ++gs) {
+ out << " GroupId: " << gs->first;
+ consumers += gs->second.size(); // # of consumers that processed this group
+ if (gs->second.size() == 1)
+ out << " completely consumed by a single client." << std::endl;
+ else
+ out << " consumed by " << gs->second.size() << " different clients." << std::endl;
+
+ for (ClientCounter::iterator cc = gs->second.begin(); cc != gs->second.end(); ++cc) {
+ out << " Client: " << cc->first << " consumed " << cc->second << " messages from the group." << std::endl;
+ }
+ }
+ out << "Average # of consumers per group: " << ((statistics.size() != 0) ? (double(consumers)/statistics.size()) : 0) << std::endl;
+ return out;
+ }
+};
+
+
+namespace {
+ // rand() is not thread safe. Create a singleton obj to hold a lock while calling
+ // rand() so it can be called safely by multiple concurrent clients.
+ class Randomizer {
+ qpid::sys::Mutex lock;
+ public:
+ uint operator()(uint max) {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return (rand() % max) + 1;
+ }
+ };
+
+ static Randomizer randomizer;
+}
+
+
+// tag each generated message with a group identifer
+//
+class GroupGenerator {
+
+ const std::string groupPrefix;
+ const uint groupSize;
+ const bool randomizeSize;
+ const uint interleave;
+
+ uint groupSuffix;
+ uint total;
+
+ struct GroupState {
+ std::string id;
+ const uint size;
+ uint count;
+ GroupState( const std::string& i, const uint s )
+ : id(i), size(s), count(0) {}
+ };
+ typedef std::list<GroupState> GroupList;
+ GroupList groups;
+ GroupList::iterator current;
+
+ // add a new group identifier to the list
+ void newGroup() {
+ std::ostringstream groupId(groupPrefix, ios_base::out|ios_base::ate);
+ groupId << std::string(":") << groupSuffix++;
+ uint size = (randomizeSize) ? randomizer(groupSize) : groupSize;
+ QPID_LOG(trace, "New group: GROUPID=[" << groupId.str() << "] size=" << size << " this=" << this);
+ GroupState group( groupId.str(), size );
+ groups.push_back( group );
+ }
+
+public:
+ GroupGenerator( const std::string& prefix,
+ const uint t,
+ const uint size,
+ const bool randomize,
+ const uint i)
+ : groupPrefix(prefix), groupSize(size),
+ randomizeSize(randomize), interleave(i), groupSuffix(0), total(t)
+ {
+ QPID_LOG(trace, "New group generator: PREFIX=[" << prefix << "] total=" << total << " size=" << size << " rand=" << randomize << " interleave=" << interleave << " this=" << this);
+ for (uint i = 0; i < 1 || i < interleave; ++i) {
+ newGroup();
+ }
+ current = groups.begin();
+ }
+
+ bool genGroup(std::string& groupId, uint& seq, bool& eos)
+ {
+ if (!total) return false;
+ --total;
+ if (current == groups.end())
+ current = groups.begin();
+ groupId = current->id;
+ seq = current->count++;
+ if (current->count == current->size) {
+ QPID_LOG(trace, "Last msg for " << current->id << ", " << current->count << " this=" << this);
+ eos = true;
+ if (total >= interleave) { // need a new group to replace this one
+ newGroup();
+ groups.erase(current++);
+ } else ++current;
+ } else {
+ ++current;
+ eos = total < interleave; // mark eos on the last message of each group
+ }
+ QPID_LOG(trace, "SENDING GROUPID=[" << groupId << "] seq=" << seq << " eos=" << eos << " this=" << this);
+ return true;
+ }
+};
+
+
+
+class Client : public qpid::sys::Runnable
+{
+public:
+ typedef boost::shared_ptr<Client> shared_ptr;
+ enum State {ACTIVE, DONE, FAILURE};
+ Client( const std::string& n, const Options& o ) : name(n), opts(o), state(ACTIVE), stopped(false) {}
+ virtual ~Client() {}
+ State getState() { return state; }
+ void testFailed( const std::string& reason ) { state = FAILURE; error << "Client '" << name << "' failed: " << reason; }
+ void clientDone() { if (state == ACTIVE) state = DONE; }
+ qpid::sys::Thread& getThread() { return thread; }
+ const std::string getErrorMsg() { return error.str(); }
+ void stop() {stopped = true;}
+ const std::string& getName() { return name; }
+
+protected:
+ const std::string name;
+ const Options& opts;
+ qpid::sys::Thread thread;
+ ostringstream error;
+ State state;
+ bool stopped;
+};
+
+
+class Consumer : public Client
+{
+ GroupChecker& checker;
+
+public:
+ Consumer(const std::string& n, const Options& o, GroupChecker& c ) : Client(n, o), checker(c) {};
+ virtual ~Consumer() {};
+
+ void run()
+ {
+ Connection connection;
+ try {
+ connection = Connection(opts.url, opts.connectionOptions);
+ connection.open();
+ std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
+ Session session = connection.createSession();
+ Receiver receiver = session.createReceiver(opts.address);
+ receiver.setCapacity(opts.capacity);
+ Message msg;
+ uint count = 0;
+
+ while (!stopped) {
+ if (receiver.fetch(msg, Duration::SECOND)) { // msg retrieved
+ qpid::types::Variant::Map& properties = msg.getProperties();
+ std::string groupId = properties[opts.groupKey];
+ uint groupSeq = properties[SN];
+ bool eof = properties[EOS];
+
+ QPID_LOG(trace, "RECVING GROUPID=[" << groupId << "] seq=" << groupSeq << " eos=" << eof << " name=" << name);
+
+ qpid::sys::usleep(10);
+
+ if (!checker.checkSequence( groupId, groupSeq, name )) {
+ ostringstream msg;
+ msg << "Check sequence failed. Group=" << groupId << " rcvd seq=" << groupSeq << " expected=" << checker.getNextExpectedSequence( groupId );
+ testFailed( msg.str() );
+ break;
+ } else if (eof) {
+ if (!checker.eraseGroup( groupId, name )) {
+ ostringstream msg;
+ msg << "Erase group failed. Group=" << groupId << " rcvd seq=" << groupSeq;
+ testFailed( msg.str() );
+ break;
+ }
+ }
+
+ ++count;
+ if (opts.ackFrequency && (count % opts.ackFrequency == 0)) {
+ session.acknowledge();
+ }
+ // Clear out message properties & content for next iteration.
+ msg = Message(); // TODO aconway 2010-12-01: should be done by fetch
+ } else if (checker.allMsgsConsumed()) // timed out, nothing else to do?
+ break;
+ }
+ session.acknowledge();
+ session.close();
+ connection.close();
+ } catch(const std::exception& error) {
+ ostringstream msg;
+ msg << "consumer error: " << error.what();
+ testFailed( msg.str() );
+ connection.close();
+ }
+ clientDone();
+ QPID_LOG(trace, "Consuming client " << name << " completed.");
+ }
+};
+
+
+
+class Producer : public Client
+{
+ GroupChecker& checker;
+ GroupGenerator generator;
+
+public:
+ Producer(const std::string& n, const Options& o, GroupChecker& c)
+ : Client(n, o), checker(c),
+ generator( n, o.messages, o.groupSize, o.randomizeSize, o.interleave )
+ {};
+ virtual ~Producer() {};
+
+ void run()
+ {
+ Connection connection;
+ try {
+ connection = Connection(opts.url, opts.connectionOptions);
+ connection.open();
+ std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
+ Session session = connection.createSession();
+ Sender sender = session.createSender(opts.address);
+ if (opts.capacity) sender.setCapacity(opts.capacity);
+ Message msg;
+ msg.setDurable(opts.durable);
+ std::string groupId;
+ uint seq;
+ bool eos;
+ uint sent = 0;
+
+ qpid::sys::AbsTime start = qpid::sys::now();
+ int64_t interval = 0;
+ if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate;
+
+ while (!stopped && generator.genGroup(groupId, seq, eos)) {
+ msg.getProperties()[opts.groupKey] = groupId;
+ msg.getProperties()[SN] = seq;
+ msg.getProperties()[EOS] = eos;
+ checker.sendingSequence( groupId, seq, eos, name );
+
+ sender.send(msg);
+ ++sent;
+
+ if (opts.sendRate) {
+ qpid::sys::AbsTime waitTill(start, sent*interval);
+ int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill);
+ if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
+ }
+ }
+ session.sync();
+ session.close();
+ connection.close();
+ } catch(const std::exception& error) {
+ ostringstream msg;
+ msg << "producer '" << name << "' error: " << error.what();
+ testFailed(msg.str());
+ connection.close();
+ }
+ clientDone();
+ QPID_LOG(trace, "Producing client " << name << " completed.");
+ }
+};
+
+
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
+int main(int argc, char ** argv)
+{
+ int status = 0;
+ try {
+ Options opts;
+ if (opts.parse(argc, argv)) {
+
+ GroupChecker state( opts.senders * opts.messages,
+ opts.allowDuplicates);
+ std::vector<Client::shared_ptr> clients;
+
+ if (opts.randomizeSize) srand((unsigned int)qpid::sys::SystemInfo::getProcessId());
+
+ // fire off the producers && consumers
+ for (size_t j = 0; j < opts.senders; ++j) {
+ ostringstream name;
+ name << opts.prefix << "P_" << j;
+ clients.push_back(Client::shared_ptr(new Producer( name.str(), opts, state )));
+ clients.back()->getThread() = qpid::sys::Thread(*clients.back());
+ }
+ for (size_t j = 0; j < opts.receivers; ++j) {
+ ostringstream name;
+ name << opts.prefix << "C_" << j;
+ clients.push_back(Client::shared_ptr(new Consumer( name.str(), opts, state )));
+ clients.back()->getThread() = qpid::sys::Thread(*clients.back());
+ }
+
+ // wait for all pubs/subs to finish.... or for consumers to fail or stall.
+ uint stalledTime = 0;
+ bool done;
+ bool clientFailed = false;
+ do {
+ uint lastCount = state.getConsumedTotal();
+ qpid::sys::usleep( 1000000 );
+
+ // check each client for status
+ done = true;
+ for (std::vector<Client::shared_ptr>::iterator i = clients.begin();
+ i != clients.end(); ++i) {
+ QPID_LOG(debug, "Client " << (*i)->getName() << " state=" << (*i)->getState());
+ if ((*i)->getState() == Client::FAILURE) {
+ QPID_LOG(error, argv[0] << ": test failed with client error: " << (*i)->getErrorMsg());
+ clientFailed = true;
+ done = true;
+ break; // exit test.
+ } else if ((*i)->getState() != Client::DONE) {
+ done = false;
+ }
+ }
+
+ if (!done) {
+ // check that consumers are still receiving messages
+ if (lastCount == state.getConsumedTotal())
+ stalledTime++;
+ else {
+ lastCount = state.getConsumedTotal();
+ stalledTime = 0;
+ }
+ }
+
+ QPID_LOG(debug, "Consumed to date = " << state.getConsumedTotal() <<
+ " Published to date = " << state.getPublishedTotal() <<
+ " total=" << opts.senders * opts.messages );
+
+ } while (!done && stalledTime < opts.timeout);
+
+ if (clientFailed) {
+ status = 1;
+ } else if (stalledTime >= opts.timeout) {
+ QPID_LOG(error, argv[0] << ": test failed due to stalled consumer." );
+ status = 2;
+ }
+
+ // Wait for started threads.
+ for (std::vector<Client::shared_ptr>::iterator i = clients.begin();
+ i != clients.end(); ++i) {
+ (*i)->stop();
+ (*i)->getThread().join();
+ }
+
+ if (opts.printReport && !status) state.print(std::cout);
+ } else status = 4;
+ } catch(const std::exception& error) {
+ QPID_LOG(error, argv[0] << ": " << error.what());
+ status = 3;
+ }
+ QPID_LOG(trace, "TEST DONE [" << status << "]");
+
+ return status;
+}
diff --git a/cpp/src/tests/qpid-send.cpp b/cpp/src/tests/qpid-send.cpp
index ef5e98e2a0..b1213a484f 100644
--- a/cpp/src/tests/qpid-send.cpp
+++ b/cpp/src/tests/qpid-send.cpp
@@ -28,6 +28,7 @@
#include <qpid/messaging/FailoverUpdates.h>
#include <qpid/sys/Time.h>
#include <qpid/sys/Monitor.h>
+#include <qpid/sys/SystemInfo.h>
#include "TestOptions.h"
#include "Statistics.h"
@@ -76,6 +77,11 @@ struct Options : public qpid::Options
uint flowControl;
bool sequence;
bool timestamp;
+ std::string groupKey;
+ std::string groupPrefix;
+ uint groupSize;
+ bool groupRandSize;
+ uint groupInterleave;
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
@@ -100,7 +106,11 @@ struct Options : public qpid::Options
sendRate(0),
flowControl(0),
sequence(true),
- timestamp(true)
+ timestamp(true),
+ groupPrefix("GROUP-"),
+ groupSize(10),
+ groupRandSize(false),
+ groupInterleave(1)
{
addOptions()
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
@@ -111,8 +121,8 @@ struct Options : public qpid::Options
("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address")
("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input")
("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.")
- ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")
- ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)")
+ ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")
+ ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)")
("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property")
("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message")
("user-id", qpid::optValue(userid, "USERID"), "userid for message")
@@ -131,6 +141,11 @@ struct Options : public qpid::Options
("flow-control", qpid::optValue(flowControl,"N"), "Do end to end flow control to limit queue depth to 2*N. 0 means no flow control.")
("sequence", qpid::optValue(sequence, "yes|no"), "Add a sequence number messages property (required for duplicate/lost message detection)")
("timestamp", qpid::optValue(timestamp, "yes|no"), "Add a time stamp messages property (required for latency measurement)")
+ ("group-key", qpid::optValue(groupKey, "KEY"), "Generate groups of messages using message header 'KEY' to hold the group identifier")
+ ("group-prefix", qpid::optValue(groupPrefix, "STRING"), "Generate group identifers with 'STRING' prefix (if group-key specified)")
+ ("group-size", qpid::optValue(groupSize, "N"), "Number of messages per a group (if group-key specified)")
+ ("group-randomize-size", qpid::optValue(groupRandSize), "Randomize the number of messages per group to [1...group-size] (if group-key specified)")
+ ("group-interleave", qpid::optValue(groupInterleave, "N"), "Simultaineously interleave messages from N different groups (if group-key specified)")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -252,6 +267,68 @@ class MapContentGenerator : public ContentGenerator {
const Options& opts;
};
+// tag each generated message with a group identifer
+//
+class GroupGenerator {
+public:
+ GroupGenerator(const std::string& key,
+ const std::string& prefix,
+ const uint size,
+ const bool randomize,
+ const uint interleave)
+ : groupKey(key), groupPrefix(prefix), groupSize(size),
+ randomizeSize(randomize), groupSuffix(0)
+ {
+ if (randomize) srand((unsigned int)qpid::sys::SystemInfo::getProcessId());
+
+ for (uint i = 0; i < 1 || i < interleave; ++i) {
+ newGroup();
+ }
+ current = groups.begin();
+ }
+
+ void setGroupInfo(Message &msg)
+ {
+ if (current == groups.end())
+ current = groups.begin();
+ msg.getProperties()[groupKey] = current->id;
+ // std::cout << "SENDING GROUPID=[" << current->id << "]" << std::endl;
+ if (++(current->count) == current->size) {
+ newGroup();
+ groups.erase(current++);
+ } else
+ ++current;
+ }
+
+ private:
+ const std::string& groupKey;
+ const std::string& groupPrefix;
+ const uint groupSize;
+ const bool randomizeSize;
+
+ uint groupSuffix;
+
+ struct GroupState {
+ std::string id;
+ const uint size;
+ uint count;
+ GroupState( const std::string& i, const uint s )
+ : id(i), size(s), count(0) {}
+ };
+ typedef std::list<GroupState> GroupList;
+ GroupList groups;
+ GroupList::iterator current;
+
+ void newGroup() {
+ std::ostringstream groupId(groupPrefix, ios_base::out|ios_base::ate);
+ groupId << groupSuffix++;
+ uint size = (randomizeSize) ? (rand() % groupSize) + 1 : groupSize;
+ // std::cout << "New group: GROUPID=[" << groupId.str() << "] size=" << size << std::endl;
+ GroupState group( groupId.str(), size );
+ groups.push_back( group );
+ }
+};
+
int main(int argc, char ** argv)
{
Connection connection;
@@ -296,6 +373,14 @@ int main(int argc, char ** argv)
else
contentGen.reset(new FixedContentGenerator(opts.contentString));
+ std::auto_ptr<GroupGenerator> groupGen;
+ if (!opts.groupKey.empty())
+ groupGen.reset(new GroupGenerator(opts.groupKey,
+ opts.groupPrefix,
+ opts.groupSize,
+ opts.groupRandSize,
+ opts.groupInterleave));
+
qpid::sys::AbsTime start = qpid::sys::now();
int64_t interval = 0;
if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate;
@@ -312,9 +397,6 @@ int main(int argc, char ** argv)
++sent;
if (opts.sequence)
msg.getProperties()[SN] = sent;
- if (opts.timestamp)
- msg.getProperties()[TS] = int64_t(
- qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
if (opts.flowControl) {
if ((sent % opts.flowControl) == 0) {
msg.setReplyTo(flowControlAddress);
@@ -323,6 +405,12 @@ int main(int argc, char ** argv)
else
msg.setReplyTo(Address()); // Clear the reply address.
}
+ if (groupGen.get())
+ groupGen->setGroupInfo(msg);
+
+ if (opts.timestamp)
+ msg.getProperties()[TS] = int64_t(
+ qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
sender.send(msg);
reporter.message(msg);
diff --git a/cpp/src/tests/run_msg_group_tests b/cpp/src/tests/run_msg_group_tests
new file mode 100755
index 0000000000..8423022521
--- /dev/null
+++ b/cpp/src/tests/run_msg_group_tests
@@ -0,0 +1,66 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#script to run a sequence of message group queue tests via make
+
+#setup path to find qpid-config and msg_group_test progs
+source ./test_env.sh
+
+export PATH=$PWD:$srcdir:$PYTHON_COMMANDS:$PATH
+
+#set port to connect to via env var
+test -s qpidd.port && QPID_PORT=`cat qpidd.port`
+
+#trap cleanup INT TERM QUIT
+
+QUEUE_NAME="group-queue"
+GROUP_KEY="My-Group-Id"
+
+BROKER_URL="${QPID_BROKER:-localhost}:${QPID_PORT:-5672}"
+
+run_test() {
+ $@
+}
+
+##set -x
+
+declare -i i=0
+declare -a tests
+tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --group-header=${GROUP_KEY} --shared-groups"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size --interleave 3"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 7 --randomize-group-size"
+ "qpid-config -a $BROKER_URL add queue ${QUEUE_NAME}-two --group-header=${GROUP_KEY} --shared-groups"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 3 --randomize-group-size"
+ "msg_group_test -b $BROKER_URL -a ${QUEUE_NAME}-two --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size --interleave 5"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59 --group-size 5 --receivers 2 --senders 3 --capacity 1 --ack-frequency 3 --randomize-group-size"
+ "qpid-config -a $BROKER_URL del queue ${QUEUE_NAME}-two --force"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59 --group-size 3 --receivers 2 --senders 3 --capacity 1 --ack-frequency 1 --randomize-group-size"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 211 --group-size 13 --receivers 2 --senders 3 --capacity 47 --ack-frequency 79 --interleave 53"
+ "qpid-config -a $BROKER_URL del queue $QUEUE_NAME --force")
+
+while [ -n "${tests[i]}" ]; do
+ run_test ${tests[i]}
+ RETCODE=$?
+ if test x$RETCODE != x0; then
+ echo "FAILED message group test. Failed command: \"${tests[i]}\"";
+ exit 1;
+ fi
+ i+=1
+done
diff --git a/cpp/src/tests/run_msg_group_tests_soak b/cpp/src/tests/run_msg_group_tests_soak
new file mode 100755
index 0000000000..5231f74755
--- /dev/null
+++ b/cpp/src/tests/run_msg_group_tests_soak
@@ -0,0 +1,60 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#script to run a sequence of long-running message group tests via make
+
+#setup path to find qpid-config and msg_group_test test progs
+source ./test_env.sh
+
+export PATH=$PWD:$srcdir:$PYTHON_COMMANDS:$PATH
+
+#set port to connect to via env var
+test -s qpidd.port && QPID_PORT=`cat qpidd.port`
+
+#trap cleanup INT TERM QUIT
+
+QUEUE_NAME="group-queue"
+GROUP_KEY="My-Group-Id"
+
+BROKER_URL="${QPID_BROKER:-localhost}:${QPID_PORT:-5672}"
+
+run_test() {
+ $@
+}
+
+##set -x
+
+declare -i i=0
+declare -a tests
+tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --group-header=${GROUP_KEY} --shared-groups"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 47 --ack-frequency 97"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 79 --ack-frequency 79"
+ "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 97 --ack-frequency 47"
+ "qpid-config -a $BROKER_URL del queue $QUEUE_NAME --force")
+
+while [ -n "${tests[i]}" ]; do
+ run_test ${tests[i]}
+ RETCODE=$?
+ if test x$RETCODE != x0; then
+ echo "FAILED message group test. Failed command: \"${tests[i]}\"";
+ exit 1;
+ fi
+ i+=1
+done