diff options
author | Ken Giusti <kgiusti@apache.org> | 2011-10-07 14:21:48 +0000 |
---|---|---|
committer | Ken Giusti <kgiusti@apache.org> | 2011-10-07 14:21:48 +0000 |
commit | ae4e2dd017d1863b7b0fd9e760f56696a49b414c (patch) | |
tree | 4a54f245efa1c2df1601d648c1fdd41fba08b802 /cpp/src | |
parent | eb9363d2d8eb1b6041c70c3db64297dfaeaae3d4 (diff) | |
download | qpid-python-ae4e2dd017d1863b7b0fd9e760f56696a49b414c.tar.gz |
QPID-3346: move message group feature into trunk.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1180050 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
33 files changed, 2581 insertions, 235 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index c8aedd4c60..da1f539108 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -976,6 +976,8 @@ set (qpidbroker_SOURCES qpid/broker/Queue.cpp qpid/broker/QueueCleaner.cpp qpid/broker/QueueListeners.cpp + qpid/broker/FifoDistributor.cpp + qpid/broker/MessageGroupManager.cpp qpid/broker/PersistableMessage.cpp qpid/broker/Bridge.cpp qpid/broker/Connection.cpp diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 2663987f75..6230a8f6f6 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -671,6 +671,11 @@ libqpidbroker_la_SOURCES = \ qpid/broker/TxPublish.h \ qpid/broker/Vhost.cpp \ qpid/broker/Vhost.h \ + qpid/broker/MessageDistributor.h \ + qpid/broker/FifoDistributor.h \ + qpid/broker/FifoDistributor.cpp \ + qpid/broker/MessageGroupManager.cpp \ + qpid/broker/MessageGroupManager.h \ qpid/management/ManagementAgent.cpp \ qpid/management/ManagementAgent.h \ qpid/management/ManagementDirectExchange.cpp \ diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 598c43b1d8..bd94582d10 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -33,10 +33,12 @@ #include "qpid/broker/Link.h" #include "qpid/broker/ExpiryPolicy.h" #include "qpid/broker/QueueFlowLimit.h" +#include "qpid/broker/MessageGroupManager.h" #include "qmf/org/apache/qpid/broker/Package.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerDelete.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerQuery.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h" @@ -122,7 +124,8 @@ Broker::Options::Options(const std::string& name) : qmf1Support(true), queueFlowStopRatio(80), queueFlowResumeRatio(70), - queueThresholdEventRatio(80) + queueThresholdEventRatio(80), + defaultMsgGroup("qpid.no-group") { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -158,7 +161,8 @@ Broker::Options::Options(const std::string& name) : ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication") ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.") ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.") - ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised"); + ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised") + ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier."); } const std::string empty; @@ -249,6 +253,7 @@ Broker::Broker(const Broker::Options& conf) : Plugin::earlyInitAll(*this); QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio); + MessageGroupManager::setDefaults(conf.defaultMsgGroup); // If no plugin store module registered itself, set up the null store. if (NullMessageStore::isNullStore(store.get())) @@ -453,7 +458,7 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, _qmf::ArgsBrokerQueueMoveMessages& moveArgs= dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args); QPID_LOG (debug, "Broker::queueMoveMessages()"); - if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty)) + if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty, moveArgs.i_filter)) status = Manageable::STATUS_OK; else return Manageable::STATUS_PARAMETER_INVALID; @@ -483,6 +488,13 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, status = Manageable::STATUS_OK; break; } + case _qmf::Broker::METHOD_QUERY : + { + _qmf::ArgsBrokerQuery& a = dynamic_cast<_qmf::ArgsBrokerQuery&>(args); + status = queryObject(a.i_type, a.i_name, a.o_results, getManagementExecutionContext()); + status = Manageable::STATUS_OK; + break; + } default: QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]"); status = Manageable::STATUS_NOT_IMPLEMENTED; @@ -655,6 +667,50 @@ void Broker::deleteObject(const std::string& type, const std::string& name, } +Manageable::status_t Broker::queryObject(const std::string& type, + const std::string& name, + Variant::Map& results, + const ConnectionState* context) +{ + std::string userId; + std::string connectionId; + if (context) { + userId = context->getUserId(); + connectionId = context->getUrl(); + } + QPID_LOG (debug, "Broker::query(" << type << ", " << name << ")"); + + if (type == TYPE_QUEUE) + return queryQueue( name, userId, connectionId, results ); + + if (type == TYPE_EXCHANGE || + type == TYPE_TOPIC || + type == TYPE_BINDING) + return Manageable::STATUS_NOT_IMPLEMENTED; + + throw UnknownObjectType(type); +} + +Manageable::status_t Broker::queryQueue( const std::string& name, + const std::string& userId, + const std::string& /*connectionId*/, + Variant::Map& results ) +{ + (void) results; + if (acl) { + if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUEUE, name, NULL) ) + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue query request from " << userId)); + } + + boost::shared_ptr<Queue> q(queues.find(name)); + if (!q) { + QPID_LOG(error, "Query failed: queue not found, name=" << name); + return Manageable::STATUS_UNKNOWN_OBJECT; + } + q->query( results ); + return Manageable::STATUS_OK;; +} + void Broker::setLogLevel(const std::string& level) { QPID_LOG(notice, "Changing log level to " << level); @@ -724,7 +780,8 @@ void Broker::connect( uint32_t Broker::queueMoveMessages( const std::string& srcQueue, const std::string& destQueue, - uint32_t qty) + uint32_t qty, + const Variant::Map& filter) { Queue::shared_ptr src_queue = queues.find(srcQueue); if (!src_queue) @@ -733,7 +790,7 @@ uint32_t Broker::queueMoveMessages( if (!dest_queue) return 0; - return src_queue->move(dest_queue, qty); + return src_queue->move(dest_queue, qty, &filter); } diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 40f7b6273f..8b347db3c0 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -121,6 +121,7 @@ public: uint queueFlowStopRatio; // producer flow control: on uint queueFlowResumeRatio; // producer flow control: off uint16_t queueThresholdEventRatio; + std::string defaultMsgGroup; private: std::string getHome(); @@ -157,7 +158,12 @@ public: const qpid::types::Variant::Map& properties, bool strict, const ConnectionState* context); void deleteObject(const std::string& type, const std::string& name, const qpid::types::Variant::Map& options, const ConnectionState* context); - + Manageable::status_t queryObject(const std::string& type, const std::string& name, + qpid::types::Variant::Map& results, const ConnectionState* context); + Manageable::status_t queryQueue( const std::string& name, + const std::string& userId, + const std::string& connectionId, + qpid::types::Variant::Map& results); boost::shared_ptr<sys::Poller> poller; sys::Timer timer; std::auto_ptr<sys::Timer> clusterTimer; @@ -258,7 +264,8 @@ public: */ uint32_t queueMoveMessages( const std::string& srcQueue, const std::string& destQueue, - uint32_t qty); + uint32_t qty, + const qpid::types::Variant::Map& filter); boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory(const std::string& name = TCP_TRANSPORT) const; diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h index 317338a8ad..2af9b0c121 100644 --- a/cpp/src/qpid/broker/Consumer.h +++ b/cpp/src/qpid/broker/Consumer.h @@ -36,13 +36,19 @@ class Consumer { // inListeners allows QueueListeners to efficiently track if this instance is registered // for notifications without having to search its containers bool inListeners; - public: - typedef boost::shared_ptr<Consumer> shared_ptr; - + // the name is generated by broker and is unique within broker scope. It is not + // provided or known by the remote Consumer. + const std::string name; + public: + typedef boost::shared_ptr<Consumer> shared_ptr; + framing::SequenceNumber position; - - Consumer(bool preAcquires = true) : acquires(preAcquires), inListeners(false) {} + + Consumer(const std::string& _name, bool preAcquires = true) + : acquires(preAcquires), inListeners(false), name(_name), position(0) {} bool preAcquires() const { return acquires; } + const std::string& getName() const { return name; } + virtual bool deliver(QueuedMessage& msg) = 0; virtual void notify() = 0; virtual bool filter(boost::intrusive_ptr<Message>) { return true; } diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 11970db394..0b8fe95d5e 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -153,7 +153,7 @@ uint32_t DeliveryRecord::getCredit() const } void DeliveryRecord::acquire(DeliveryIds& results) { - if (queue->acquire(msg)) { + if (queue->acquire(msg, tag)) { acquired = true; results.push_back(id); if (!acceptExpected) { diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index 19ab37ac17..5a331357be 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -46,7 +46,7 @@ class DeliveryRecord { QueuedMessage msg; mutable boost::shared_ptr<Queue> queue; - std::string tag; + std::string tag; // name of consumer DeliveryId id; bool acquired : 1; bool acceptExpected : 1; diff --git a/cpp/src/qpid/broker/FifoDistributor.cpp b/cpp/src/qpid/broker/FifoDistributor.cpp new file mode 100644 index 0000000000..cdb32d8c8c --- /dev/null +++ b/cpp/src/qpid/broker/FifoDistributor.cpp @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include "qpid/broker/Queue.h" +#include "qpid/broker/FifoDistributor.h" + +using namespace qpid::broker; + +FifoDistributor::FifoDistributor(Messages& container) + : messages(container) {} + +bool FifoDistributor::nextConsumableMessage( Consumer::shared_ptr&, QueuedMessage& next ) +{ + if (!messages.empty()) { + next = messages.front(); // by default, consume oldest msg + return true; + } + return false; +} + +bool FifoDistributor::allocate(const std::string&, const QueuedMessage& ) +{ + // by default, all messages present on the queue may be allocated as they have yet to + // be acquired. + return true; +} + +bool FifoDistributor::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) +{ + if (!messages.empty() && messages.next(c->position, next)) + return true; + return false; +} + +void FifoDistributor::query(qpid::types::Variant::Map&) const +{ + // nothing to see here.... +} + diff --git a/cpp/src/qpid/broker/FifoDistributor.h b/cpp/src/qpid/broker/FifoDistributor.h new file mode 100644 index 0000000000..245537ed12 --- /dev/null +++ b/cpp/src/qpid/broker/FifoDistributor.h @@ -0,0 +1,58 @@ +#ifndef _broker_FifoDistributor_h +#define _broker_FifoDistributor_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** Simple MessageDistributor for FIFO Queues - the HEAD message is always the next + * available message for consumption. + */ + +#include "qpid/broker/MessageDistributor.h" + +namespace qpid { +namespace broker { + +class Messages; + +class FifoDistributor : public MessageDistributor +{ + public: + FifoDistributor(Messages& container); + + /** Locking Note: all methods assume the caller is holding the Queue::messageLock + * during the method call. + */ + + /** MessageDistributor interface */ + + bool nextConsumableMessage( Consumer::shared_ptr& consumer, QueuedMessage& next ); + bool allocate(const std::string& consumer, const QueuedMessage& target); + bool nextBrowsableMessage( Consumer::shared_ptr& consumer, QueuedMessage& next ); + void query(qpid::types::Variant::Map&) const; + + private: + Messages& messages; +}; + +}} + +#endif diff --git a/cpp/src/qpid/broker/MessageDistributor.h b/cpp/src/qpid/broker/MessageDistributor.h new file mode 100644 index 0000000000..090393c160 --- /dev/null +++ b/cpp/src/qpid/broker/MessageDistributor.h @@ -0,0 +1,76 @@ +#ifndef _broker_MessageDistributor_h +#define _broker_MessageDistributor_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** Abstraction used by Queue to determine the next "most desirable" message to provide to + * a particular consuming client + */ + + +#include "qpid/broker/Consumer.h" + +namespace qpid { +namespace broker { + +struct QueuedMessage; + +class MessageDistributor +{ + public: + virtual ~MessageDistributor() {}; + + /** Locking Note: all methods assume the caller is holding the Queue::messageLock + * during the method call. + */ + + /** Determine the next message available for consumption by the consumer + * @param consumer the consumer that needs a message to consume + * @param next set to the next message that the consumer may consume. + * @return true if message is available and next is set + */ + virtual bool nextConsumableMessage( Consumer::shared_ptr& consumer, + QueuedMessage& next ) = 0; + + /** Allow the comsumer to take ownership of the given message. + * @param consumer the name of the consumer that is attempting to acquire the message + * @param qm the message to be acquired, previously returned from nextConsumableMessage() + * @return true if ownership is permitted, false if ownership cannot be assigned. + */ + virtual bool allocate( const std::string& consumer, + const QueuedMessage& target) = 0; + + /** Determine the next message available for browsing by the consumer + * @param consumer the consumer that is browsing the queue + * @param next set to the next message that the consumer may browse. + * @return true if a message is available and next is returned + */ + virtual bool nextBrowsableMessage( Consumer::shared_ptr& consumer, + QueuedMessage& next ) = 0; + + /** hook to add any interesting management state to the status map */ + virtual void query(qpid::types::Variant::Map&) const = 0; +}; + +}} + +#endif diff --git a/cpp/src/qpid/broker/MessageGroupManager.cpp b/cpp/src/qpid/broker/MessageGroupManager.cpp new file mode 100644 index 0000000000..d4ca6af1d5 --- /dev/null +++ b/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -0,0 +1,443 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/FieldTable.h" +#include "qpid/types/Variant.h" +#include "qpid/log/Statement.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/MessageGroupManager.h" + +using namespace qpid::broker; + +namespace { + const std::string GROUP_QUERY_KEY("qpid.message_group_queue"); + const std::string GROUP_HEADER_KEY("group_header_key"); + const std::string GROUP_STATE_KEY("group_state"); + const std::string GROUP_ID_KEY("group_id"); + const std::string GROUP_MSG_COUNT("msg_count"); + const std::string GROUP_TIMESTAMP("timestamp"); + const std::string GROUP_CONSUMER("consumer"); +} + + +const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key"); +const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group"); +const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp"); + + +const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const +{ + const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders(); + if (!headers) return defaultGroupId; + qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader ); + if (!id || !id->convertsTo<std::string>()) return defaultGroupId; + return id->get<std::string>(); +} + + +void MessageGroupManager::enqueued( const QueuedMessage& qm ) +{ + // @todo KAG optimization - store reference to group state in QueuedMessage + // issue: const-ness?? + std::string group( getGroupId(qm) ); + GroupState &state(messageGroups[group]); + state.members.push_back(qm.position); + uint32_t total = state.members.size(); + QPID_LOG( trace, "group queue " << qName << + ": added message to group id=" << group << " total=" << total ); + if (total == 1) { + // newly created group, no owner + state.group = group; + assert(freeGroups.find(qm.position) == freeGroups.end()); + freeGroups[qm.position] = &state; + } +} + + +void MessageGroupManager::acquired( const QueuedMessage& qm ) +{ + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + // issue: const-ness?? + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + state.acquired += 1; + QPID_LOG( trace, "group queue " << qName << + ": acquired message in group id=" << group << " acquired=" << state.acquired ); +} + + +void MessageGroupManager::requeued( const QueuedMessage& qm ) +{ + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + // issue: const-ness?? + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + assert( state.acquired != 0 ); + state.acquired -= 1; + if (state.acquired == 0 && state.owned()) { + QPID_LOG( trace, "group queue " << qName << + ": consumer name=" << state.owner << " released group id=" << gs->first); + disown(state); + } + QPID_LOG( trace, "group queue " << qName << + ": requeued message to group id=" << group << " acquired=" << state.acquired ); +} + + +void MessageGroupManager::dequeued( const QueuedMessage& qm ) +{ + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + // issue: const-ness?? + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + assert( state.members.size() != 0 ); + assert( state.acquired != 0 ); + state.acquired -= 1; + + // likely to be at or near begin() if dequeued in order + bool reFreeNeeded = false; + if (state.members.front() == qm.position) { + if (!state.owned()) { + // will be on the freeGroups list if mgmt is dequeueing rather than a consumer! + // if on freelist, it is indexed by first member, which is about to be removed! + unFree(state); + reFreeNeeded = true; + } + state.members.pop_front(); + } else { + GroupState::PositionFifo::iterator pos = state.members.begin() + 1; + GroupState::PositionFifo::iterator end = state.members.end(); + while (pos != end) { + if (*pos == qm.position) { + state.members.erase(pos); + break; + } + ++pos; + } + } + + uint32_t total = state.members.size(); + if (total == 0) { + QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << gs->first); + messageGroups.erase( gs ); + } else if (state.acquired == 0 && state.owned()) { + QPID_LOG( trace, "group queue " << qName << + ": consumer name=" << state.owner << " released group id=" << gs->first); + disown(state); + } else if (reFreeNeeded) { + disown(state); + } + QPID_LOG( trace, "group queue " << qName << + ": dequeued message from group id=" << group << " total=" << total ); +} + +void MessageGroupManager::consumerAdded( const Consumer& /*c*/ ) +{ +#if 0 + // allow a re-subscribing consumer + if (consumers.find(c.getName()) == consumers.end()) { + consumers[c.getName()] = 0; // no groups owned yet + QPID_LOG( trace, "group queue " << qName << ": added consumer, name=" << c.getName() ); + } else { + QPID_LOG( trace, "group queue " << qName << ": consumer re-subscribed, name=" << c.getName() ); + } +#endif +} + +void MessageGroupManager::consumerRemoved( const Consumer& /*c*/ ) +{ +#if 0 + const std::string& name(c.getName()); + Consumers::iterator consumer = consumers.find(name); + assert(consumer != consumers.end()); + size_t count = consumer->second; + + for (GroupMap::iterator gs = messageGroups.begin(); + count && gs != messageGroups.end(); ++gs) { + + GroupState& state( gs->second ); + if (state.owner == name) { + if (state.acquired == 0) { + --count; + disown(state); + QPID_LOG( trace, "group queue " << qName << + ": consumer name=" << name << " released group id=" << gs->first); + } + } + } + if (count == 0) { + consumers.erase( consumer ); + QPID_LOG( trace, "group queue " << qName << ": removed consumer name=" << name ); + } else { + // don't release groups with outstanding acquired msgs - consumer may re-subscribe! + QPID_LOG( trace, "group queue " << qName << ": consumer name=" << name << " unsubscribed with outstanding messages."); + } +#endif +} + + +bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) +{ + if (messages.empty()) + return false; + + if (!freeGroups.empty()) { + framing::SequenceNumber nextFree = freeGroups.begin()->first; + if (nextFree < c->position) { // next free group's msg is older than current position + bool ok = messages.find(nextFree, next); + (void) ok; assert( ok ); + } else { + if (!messages.next( c->position, next )) + return false; // shouldn't happen - should find nextFree + } + } else { // no free groups available +#if 0 + if (consumers[c->getName()] == 0) { // and none currently owned + return false; // so nothing available to consume + } +#endif + if (!messages.next( c->position, next )) + return false; + } + + do { + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + std::string group( getGroupId( next ) ); + GroupMap::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + if (!state.owned() || state.owner == c->getName()) { + return true; + } + } while (messages.next( next.position, next )); + return false; +} + + +bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMessage& qm) +{ + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + + if (!state.owned()) { + own( state, consumer ); + QPID_LOG( trace, "group queue " << qName << + ": consumer name=" << consumer << " has acquired group id=" << gs->first); + return true; + } + return state.owner == consumer; +} + +bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) +{ + // browse: allow access to any available msg, regardless of group ownership (?ok?) + if (!messages.empty() && messages.next(c->position, next)) + return true; + return false; +} + +void MessageGroupManager::query(qpid::types::Variant::Map& status) const +{ + /** Add a description of the current state of the message groups for this queue. + FORMAT: + { "qpid.message_group_queue": + { "group_header_key" : "<KEY>", + "group_state" : + [ { "group_id" : "<name>", + "msg_count" : <int>, + "timestamp" : <absTime>, + "consumer" : <consumer name> }, + {...} // one for each known group + ] + } + } + **/ + + assert(status.find(GROUP_QUERY_KEY) == status.end()); + qpid::types::Variant::Map state; + qpid::types::Variant::List groups; + + state[GROUP_HEADER_KEY] = groupIdHeader; + for (GroupMap::const_iterator g = messageGroups.begin(); + g != messageGroups.end(); ++g) { + qpid::types::Variant::Map info; + info[GROUP_ID_KEY] = g->first; + info[GROUP_MSG_COUNT] = g->second.members.size(); + info[GROUP_TIMESTAMP] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */ + info[GROUP_CONSUMER] = g->second.owner; + groups.push_back(info); + } + state[GROUP_STATE_KEY] = groups; + status[GROUP_QUERY_KEY] = state; +} + + +boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( const std::string& qName, + Messages& messages, + const qpid::framing::FieldTable& settings ) +{ + boost::shared_ptr<MessageGroupManager> empty; + + if (settings.isSet(qpidMessageGroupKey)) { + + // @todo: remove once "sticky" consumers are supported - see QPID-3347 + if (!settings.isSet(qpidSharedGroup)) { + QPID_LOG( error, "Only shared groups are supported in this version of the broker. Use '--shared-groups' in qpid-config." ); + return empty; + } + + std::string headerKey = settings.getAsString(qpidMessageGroupKey); + if (headerKey.empty()) { + QPID_LOG( error, "A Message Group header key must be configured, queue=" << qName); + return empty; + } + unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp); + + boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( headerKey, qName, messages, timestamp ) ); + + QPID_LOG( debug, "Configured Queue '" << qName << + "' for message grouping using header key '" << headerKey << "'" << + " (timestamp=" << timestamp << ")"); + return manager; + } + return empty; +} + +std::string MessageGroupManager::defaultGroupId; +void MessageGroupManager::setDefaults(const std::string& groupId) // static +{ + defaultGroupId = groupId; +} + +/** Cluster replication: + + state map format: + + { "group-state": [ {"name": <group-name>, + "owner": <consumer-name>-or-empty, + "acquired-ct": <acquired count>, + "positions": [Seqnumbers, ... ]}, + {...} + ] + } +*/ + +namespace { + const std::string GROUP_NAME("name"); + const std::string GROUP_OWNER("owner"); + const std::string GROUP_ACQUIRED_CT("acquired-ct"); + const std::string GROUP_POSITIONS("positions"); + const std::string GROUP_STATE("group-state"); +} + + +/** Runs on UPDATER to snapshot current state */ +void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const +{ + using namespace qpid::framing; + state.clear(); + framing::Array groupState(TYPE_CODE_MAP); + for (GroupMap::const_iterator g = messageGroups.begin(); + g != messageGroups.end(); ++g) { + + framing::FieldTable group; + group.setString(GROUP_NAME, g->first); + group.setString(GROUP_OWNER, g->second.owner); + group.setInt(GROUP_ACQUIRED_CT, g->second.acquired); + framing::Array positions(TYPE_CODE_UINT32); + for (GroupState::PositionFifo::const_iterator p = g->second.members.begin(); + p != g->second.members.end(); ++p) + positions.push_back(framing::Array::ValuePtr(new IntegerValue( *p ))); + group.setArray(GROUP_POSITIONS, positions); + groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group))); + } + state.setArray(GROUP_STATE, groupState); + + QPID_LOG(debug, "Queue \"" << qName << "\": replicating message group state, key=" << groupIdHeader); +} + + +/** called on UPDATEE to set state from snapshot */ +void MessageGroupManager::setState(const qpid::framing::FieldTable& state) +{ + using namespace qpid::framing; + messageGroups.clear(); + //consumers.clear(); + freeGroups.clear(); + + framing::Array groupState(TYPE_CODE_MAP); + + bool ok = state.getArray(GROUP_STATE, groupState); + if (!ok) { + QPID_LOG(error, "Unable to find message group state information for queue \"" << + qName << "\": cluster inconsistency error!"); + return; + } + + for (framing::Array::const_iterator g = groupState.begin(); + g != groupState.end(); ++g) { + framing::FieldTable group; + ok = framing::getEncodedValue<FieldTable>(*g, group); + if (!ok) { + QPID_LOG(error, "Invalid message group state information for queue \"" << + qName << "\": table encoding error!"); + return; + } + MessageGroupManager::GroupState state; + if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || !group.isSet(GROUP_ACQUIRED_CT)) { + QPID_LOG(error, "Invalid message group state information for queue \"" << + qName << "\": fields missing error!"); + return; + } + state.group = group.getAsString(GROUP_NAME); + state.owner = group.getAsString(GROUP_OWNER); + state.acquired = group.getAsInt(GROUP_ACQUIRED_CT); + framing::Array positions(TYPE_CODE_UINT32); + ok = group.getArray(GROUP_POSITIONS, positions); + if (!ok) { + QPID_LOG(error, "Invalid message group state information for queue \"" << + qName << "\": position encoding error!"); + return; + } + + for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) + state.members.push_back((*p)->getIntegerValue<uint32_t, 4>()); + messageGroups[state.group] = state; + if (state.owned()) + //consumers[state.owner]++; + ; + else { + assert(state.members.size()); + freeGroups[state.members.front()] = &messageGroups[state.group]; + } + } + + QPID_LOG(debug, "Queue \"" << qName << "\": message group state replicated, key =" << groupIdHeader) +} diff --git a/cpp/src/qpid/broker/MessageGroupManager.h b/cpp/src/qpid/broker/MessageGroupManager.h new file mode 100644 index 0000000000..35bdda94d5 --- /dev/null +++ b/cpp/src/qpid/broker/MessageGroupManager.h @@ -0,0 +1,125 @@ +#ifndef _broker_MessageGroupManager_h +#define _broker_MessageGroupManager_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* for managing message grouping on Queues */ + +#include "qpid/broker/StatefulQueueObserver.h" +#include "qpid/broker/MessageDistributor.h" + + +namespace qpid { +namespace broker { + +class QueueObserver; +class MessageDistributor; + +class MessageGroupManager : public StatefulQueueObserver, public MessageDistributor +{ + static std::string defaultGroupId; // assigned if no group id header present + + const std::string groupIdHeader; // msg header holding group identifier + const unsigned int timestamp; // mark messages with timestamp if set + Messages& messages; // parent Queue's in memory message container + const std::string qName; // name of parent queue (for logs) + + struct GroupState { + typedef std::deque<framing::SequenceNumber> PositionFifo; + + std::string group; // group identifier + std::string owner; // consumer with outstanding acquired messages + uint32_t acquired; // count of outstanding acquired messages + //uint32_t total; // count of enqueued messages in this group + PositionFifo members; // msgs belonging to this group + + GroupState() : acquired(0) {} + bool owned() const {return !owner.empty();} + }; + typedef std::map<std::string, struct GroupState> GroupMap; + //typedef std::map<std::string, uint32_t> Consumers; // count of owned groups + typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo; + + // note: update getState()/setState() when changing this object's state implementation + GroupMap messageGroups; // index: group name + GroupFifo freeGroups; // ordered by oldest free msg + //Consumers consumers; // index: consumer name + + static const std::string qpidMessageGroupKey; + static const std::string qpidSharedGroup; // if specified, one group can be consumed by multiple receivers + static const std::string qpidMessageGroupTimestamp; + + const std::string getGroupId( const QueuedMessage& qm ) const; + void unFree( const GroupState& state ) + { + GroupFifo::iterator pos = freeGroups.find( state.members.front() ); + assert( pos != freeGroups.end() && pos->second == &state ); + freeGroups.erase( pos ); + } + void own( GroupState& state, const std::string& owner ) + { + state.owner = owner; + //consumers[state.owner]++; + unFree( state ); + } + void disown( GroupState& state ) + { + //assert(consumers[state.owner]); + //consumers[state.owner]--; + state.owner.clear(); + assert(state.members.size()); + assert(freeGroups.find(state.members.front()) == freeGroups.end()); + freeGroups[state.members.front()] = &state; + } + + public: + + static QPID_BROKER_EXTERN void setDefaults(const std::string& groupId); + static boost::shared_ptr<MessageGroupManager> create( const std::string& qName, + Messages& messages, + const qpid::framing::FieldTable& settings ); + + MessageGroupManager(const std::string& header, const std::string& _qName, + Messages& container, unsigned int _timestamp=0 ) + : StatefulQueueObserver(std::string("MessageGroupManager:") + header), + groupIdHeader( header ), timestamp(_timestamp), messages(container), qName(_qName) {} + void enqueued( const QueuedMessage& qm ); + void acquired( const QueuedMessage& qm ); + void requeued( const QueuedMessage& qm ); + void dequeued( const QueuedMessage& qm ); + void consumerAdded( const Consumer& ); + void consumerRemoved( const Consumer& ); + void getState(qpid::framing::FieldTable& state ) const; + void setState(const qpid::framing::FieldTable&); + + // MessageDistributor iface + bool nextConsumableMessage(Consumer::shared_ptr& c, QueuedMessage& next); + bool allocate(const std::string& c, const QueuedMessage& qm); + bool nextBrowsableMessage(Consumer::shared_ptr& c, QueuedMessage& next); + void query(qpid::types::Variant::Map&) const; + + bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const; +}; + +}} + +#endif diff --git a/cpp/src/qpid/broker/Messages.h b/cpp/src/qpid/broker/Messages.h index c535fd1936..448f17432a 100644 --- a/cpp/src/qpid/broker/Messages.h +++ b/cpp/src/qpid/broker/Messages.h @@ -76,7 +76,6 @@ class Messages * @return true if there is another message, false otherwise. */ virtual bool next(const framing::SequenceNumber&, QueuedMessage&) = 0; - /** * Note: Caller is responsible for ensuring that there is a front * (e.g. empty() returns false) diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index dd3f982699..3d878d02a8 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -33,6 +33,8 @@ #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/QueueFlowLimit.h" #include "qpid/broker/ThresholdAlerts.h" +#include "qpid/broker/FifoDistributor.h" +#include "qpid/broker/MessageGroupManager.h" #include "qpid/StringUtils.h" #include "qpid/log/Statement.h" @@ -42,6 +44,7 @@ #include "qpid/sys/ClusterSafe.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" +#include "qpid/types/Variant.h" #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h" #include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h" @@ -111,7 +114,8 @@ Queue::Queue(const string& _name, bool _autodelete, broker(b), deleted(false), barrier(*this), - autoDeleteTimeout(0) + autoDeleteTimeout(0), + allocator(new FifoDistributor( *messages )) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); @@ -220,6 +224,7 @@ void Queue::requeue(const QueuedMessage& msg){ enqueue(0, payload); } } + observeRequeue(msg, locker); } copy.notify(); } @@ -229,7 +234,7 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, "Attempting to acquire message at " << position); - if (messages->remove(position, message)) { + if (acquire(position, message, locker)) { QPID_LOG(debug, "Acquired message at " << position << " from " << name); return true; } else { @@ -238,9 +243,24 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess } } -bool Queue::acquire(const QueuedMessage& msg) { - QueuedMessage copy = msg; - return acquireMessageAt(msg.position, copy); +bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer) +{ + Mutex::ScopedLock locker(messageLock); + assertClusterSafe(); + QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position); + + if (!allocator->allocate( consumer, msg )) { + QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name); + return false; + } + + QueuedMessage copy(msg); + if (acquire( msg.position, copy, locker)) { + QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name); + return true; + } + QPID_LOG(debug, "Could not acquire message at " << msg.position << " from " << name << "; no message at that position"); + return false; } void Queue::notifyListener() @@ -256,7 +276,7 @@ void Queue::notifyListener() set.notify(); } -bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) +bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) { checkNotDeleted(); if (c->preAcquires()) { @@ -274,46 +294,65 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) } } -Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) +Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) { while (true) { Mutex::ScopedLock locker(messageLock); - if (messages->empty()) { - QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); + QueuedMessage msg; + + if (!allocator->nextConsumableMessage(c, msg)) { // no next available + QPID_LOG(debug, "No messages available to dispatch to consumer " << + c->getName() << " on queue '" << name << "'"); listeners.addListener(c); return NO_MESSAGES; - } else { - QueuedMessage msg = messages->front(); - if (msg.payload->hasExpired()) { - QPID_LOG(debug, "Message expired from queue '" << name << "'"); - popAndDequeue(); - continue; - } + } - if (c->filter(msg.payload)) { - if (c->accept(msg.payload)) { - m = msg; - pop(); - return CONSUMED; - } else { - //message(s) are available but consumer hasn't got enough credit - QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); - return CANT_CONSUME; - } + if (msg.payload->hasExpired()) { + QPID_LOG(debug, "Message expired from queue '" << name << "'"); + c->position = msg.position; + acquire( msg.position, msg, locker); + dequeue( 0, msg ); + continue; + } + + // a message is available for this consumer - can the consumer use it? + + if (c->filter(msg.payload)) { + if (c->accept(msg.payload)) { + bool ok = allocator->allocate( c->getName(), msg ); // inform allocator + (void) ok; assert(ok); + ok = acquire( msg.position, msg, locker); + (void) ok; assert(ok); + m = msg; + c->position = m.position; + return CONSUMED; } else { - //consumer will never want this message - QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); + //message(s) are available but consumer hasn't got enough credit + QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); return CANT_CONSUME; } + } else { + //consumer will never want this message + QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); + c->position = msg.position; + return CANT_CONSUME; } } } - -bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) +bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) { - QueuedMessage msg(this); - while (seek(msg, c)) { + while (true) { + Mutex::ScopedLock locker(messageLock); + QueuedMessage msg; + + if (!allocator->nextBrowsableMessage(c, msg)) { // no next available + QPID_LOG(debug, "No browsable messages available for consumer " << + c->getName() << " on queue '" << name << "'"); + listeners.addListener(c); + return false; + } + if (c->filter(msg.payload) && !msg.payload->hasExpired()) { if (c->accept(msg.payload)) { //consumer wants the message @@ -327,8 +366,8 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) } } else { //consumer will never want this message, continue seeking - c->position = msg.position; QPID_LOG(debug, "Browser skipping message from '" << name << "'"); + c->position = msg.position; } } return false; @@ -358,61 +397,71 @@ bool Queue::dispatch(Consumer::shared_ptr c) } } -// Find the next message -bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { - Mutex::ScopedLock locker(messageLock); - if (messages->next(c->position, msg)) { - return true; - } else { - listeners.addListener(c); - return false; - } -} - -QueuedMessage Queue::find(SequenceNumber pos) const { +bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const { Mutex::ScopedLock locker(messageLock); - QueuedMessage msg; - messages->find(pos, msg); - return msg; + if (messages->find(pos, msg)) + return true; + return false; } void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ assertClusterSafe(); - Mutex::ScopedLock locker(consumerLock); - if(exclusive) { - throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); - } else if(requestExclusive) { - if(consumerCount) { + { + Mutex::ScopedLock locker(consumerLock); + if(exclusive) { throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); - } else { - exclusive = c->getSession(); + QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); + } else if(requestExclusive) { + if(consumerCount) { + throw ResourceLockedException( + QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); + } else { + exclusive = c->getSession(); + } + } + consumerCount++; + if (mgmtObject != 0) + mgmtObject->inc_consumerCount (); + //reset auto deletion timer if necessary + if (autoDeleteTimeout && autoDeleteTask) { + autoDeleteTask->cancel(); } } - consumerCount++; - if (mgmtObject != 0) - mgmtObject->inc_consumerCount (); - //reset auto deletion timer if necessary - if (autoDeleteTimeout && autoDeleteTask) { - autoDeleteTask->cancel(); + Mutex::ScopedLock locker(messageLock); + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->consumerAdded(*c); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what()); + } } } void Queue::cancel(Consumer::shared_ptr c){ removeListener(c); - Mutex::ScopedLock locker(consumerLock); - consumerCount--; - if(exclusive) exclusive = 0; - if (mgmtObject != 0) - mgmtObject->dec_consumerCount (); + { + Mutex::ScopedLock locker(consumerLock); + consumerCount--; + if(exclusive) exclusive = 0; + if (mgmtObject != 0) + mgmtObject->dec_consumerCount (); + } + Mutex::ScopedLock locker(messageLock); + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->consumerRemoved(*c); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what()); + } + } } QueuedMessage Queue::get(){ Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); - messages->pop(msg); + if (messages->pop(msg)) + observeAcquire(msg, locker); return msg; } @@ -443,11 +492,118 @@ void Queue::purgeExpired(qpid::sys::Duration lapse) Mutex::ScopedLock locker(messageLock); messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1)); } - for_each(expired.begin(), expired.end(), - boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); + + for (std::deque<QueuedMessage>::const_iterator i = expired.begin(); + i != expired.end(); ++i) { + { + Mutex::ScopedLock locker(messageLock); + observeAcquire(*i, locker); + } + dequeue( 0, *i ); + } } } + +namespace { + // for use with purge/move below - collect messages that match a given filter + // + class MessageFilter + { + public: + static const std::string typeKey; + static const std::string paramsKey; + static MessageFilter *create( const ::qpid::types::Variant::Map *filter ); + virtual bool match( const QueuedMessage& ) const { return true; } + virtual ~MessageFilter() {} + protected: + MessageFilter() {}; + }; + const std::string MessageFilter::typeKey("filter_type"); + const std::string MessageFilter::paramsKey("filter_params"); + + // filter by message header string value exact match + class HeaderMatchFilter : public MessageFilter + { + public: + /* Config: + { 'filter_type' : 'header_match_str', + 'filter_params' : { 'header_key' : "<header name>", + 'header_value' : "<value to match>" + } + } + */ + static const std::string typeKey; + static const std::string headerKey; + static const std::string valueKey; + HeaderMatchFilter( const std::string& _header, const std::string& _value ) + : MessageFilter (), header(_header), value(_value) {} + bool match( const QueuedMessage& msg ) const + { + const qpid::framing::FieldTable* headers = msg.payload->getApplicationHeaders(); + if (!headers) return false; + FieldTable::ValuePtr h = headers->get(header); + if (!h || !h->convertsTo<std::string>()) return false; + return h->get<std::string>() == value; + } + private: + const std::string header; + const std::string value; + }; + const std::string HeaderMatchFilter::typeKey("header_match_str"); + const std::string HeaderMatchFilter::headerKey("header_key"); + const std::string HeaderMatchFilter::valueKey("header_value"); + + // factory to create correct filter based on map + MessageFilter* MessageFilter::create( const ::qpid::types::Variant::Map *filter ) + { + using namespace qpid::types; + if (filter && !filter->empty()) { + Variant::Map::const_iterator i = filter->find(MessageFilter::typeKey); + if (i != filter->end()) { + + if (i->second.asString() == HeaderMatchFilter::typeKey) { + Variant::Map::const_iterator p = filter->find(MessageFilter::paramsKey); + if (p != filter->end() && p->second.getType() == VAR_MAP) { + Variant::Map::const_iterator k = p->second.asMap().find(HeaderMatchFilter::headerKey); + Variant::Map::const_iterator v = p->second.asMap().find(HeaderMatchFilter::valueKey); + if (k != p->second.asMap().end() && v != p->second.asMap().end()) { + std::string headerKey(k->second.asString()); + std::string value(v->second.asString()); + QPID_LOG(debug, "Message filtering by header value configured. key: " << headerKey << " value: " << value ); + return new HeaderMatchFilter( headerKey, value ); + } + } + } + } + QPID_LOG(error, "Ignoring unrecognized message filter: '" << *filter << "'"); + } + return new MessageFilter(); + } + + // used by removeIf() to collect all messages matching a filter, maximum match count is + // optional. + struct Collector { + const uint32_t maxMatches; + MessageFilter& filter; + std::deque<QueuedMessage> matches; + Collector(MessageFilter& filter, uint32_t max) + : maxMatches(max), filter(filter) {} + bool operator() (QueuedMessage& qm) + { + if (maxMatches == 0 || matches.size() < maxMatches) { + if (filter.match( qm )) { + matches.push_back(qm); + return true; + } + } + return false; + } + }; + +} // end namespace + + /** * purge - for purging all or some messages on a queue * depending on the purge_request @@ -459,62 +615,77 @@ void Queue::purgeExpired(qpid::sys::Duration lapse) * The dest exchange may be supplied to re-route messages through the exchange. * It is safe to re-route messages such that they arrive back on the same queue, * even if the queue is ordered by priority. + * + * An optional filter can be supplied that will be applied against each message. The + * message is purged only if the filter matches. See MessageDistributor for more detail. */ -uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest) +uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest, + const qpid::types::Variant::Map *filter) { - Mutex::ScopedLock locker(messageLock); - uint32_t purge_count = purge_request; // only comes into play if >0 - std::deque<DeliverableMessage> rerouteQueue; + std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter)); + Collector c(*mf.get(), purge_request); - uint32_t count = 0; - // Either purge them all or just the some (purge_count) while the queue isn't empty. - while((!purge_request || purge_count--) && !messages->empty()) { + Mutex::ScopedLock locker(messageLock); + messages->removeIf( boost::bind<bool>(boost::ref(c), _1) ); + for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); + qmsg != c.matches.end(); ++qmsg) { + // Update observers and message state: + observeAcquire(*qmsg, locker); + dequeue(0, *qmsg); + // now reroute if necessary if (dest.get()) { - // - // If there is a destination exchange, stage the messages onto a reroute queue - // so they don't wind up getting purged more than once. - // - DeliverableMessage msg(messages->front().payload); - rerouteQueue.push_back(msg); + assert(qmsg->payload); + DeliverableMessage dmsg(qmsg->payload); + dest->routeWithAlternate(dmsg); } - popAndDequeue(); - count++; - } - - // - // Re-route purged messages into the destination exchange. Note that there's no need - // to test dest.get() here because if it is NULL, the rerouteQueue will be empty. - // - while (!rerouteQueue.empty()) { - DeliverableMessage msg(rerouteQueue.front()); - rerouteQueue.pop_front(); - dest->routeWithAlternate(msg); } - - return count; + return c.matches.size(); } -uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { +uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty, + const qpid::types::Variant::Map *filter) +{ + std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter)); + Collector c(*mf.get(), qty); + Mutex::ScopedLock locker(messageLock); - uint32_t move_count = qty; // only comes into play if qty >0 - uint32_t count = 0; // count how many were moved for returning + messages->removeIf( boost::bind<bool>(boost::ref(c), _1) ); - while((!qty || move_count--) && !messages->empty()) { - QueuedMessage qmsg = messages->front(); - boost::intrusive_ptr<Message> msg = qmsg.payload; - destq->deliver(msg); // deliver message to the destination queue - pop(); - dequeue(0, qmsg); - count++; + for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); + qmsg != c.matches.end(); ++qmsg) { + // Update observers and message state: + observeAcquire(*qmsg, locker); + dequeue(0, *qmsg); + // and move to destination Queue. + assert(qmsg->payload); + destq->deliver(qmsg->payload); } - return count; + return c.matches.size(); } -void Queue::pop() +/** Acquire the front (oldest) message from the in-memory queue. + * assumes messageLock held by caller + */ +void Queue::pop(const Mutex::ScopedLock& locker) { assertClusterSafe(); - messages->pop(); - ++dequeueSincePurge; + QueuedMessage msg; + if (messages->pop(msg)) { + observeAcquire(msg, locker); + ++dequeueSincePurge; + } +} + +/** Acquire the message at the given position, return true and msg if acquire succeeds */ +bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg, + const Mutex::ScopedLock& locker) +{ + if (messages->remove(position, msg)) { + observeAcquire(msg, locker); + ++dequeueSincePurge; + return true; + } + return false; } void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ @@ -528,8 +699,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence); dequeueRequired = messages->push(qm, removed); + if (dequeueRequired) + observeAcquire(removed, locker); listeners.populate(copy); - enqueued(qm); + observeEnqueue(qm, locker); } copy.notify(); if (dequeueRequired) { @@ -663,7 +836,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return false; if (!ctxt) { - dequeued(msg); + observeDequeue(msg, locker); } } // This check prevents messages which have been forced persistent on one queue from dequeuing @@ -683,7 +856,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) void Queue::dequeueCommitted(const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); - dequeued(msg); + observeDequeue(msg, locker); if (mgmtObject != 0) { mgmtObject->inc_msgTxnDequeues(); mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize()); @@ -691,21 +864,23 @@ void Queue::dequeueCommitted(const QueuedMessage& msg) } /** - * Removes a message from the in-memory delivery queue as well - * dequeing it from the logical (and persistent if applicable) queue + * Removes the first (oldest) message from the in-memory delivery queue as well dequeing + * it from the logical (and persistent if applicable) queue */ -void Queue::popAndDequeue() +void Queue::popAndDequeue(const Mutex::ScopedLock& held) { - QueuedMessage msg = messages->front(); - pop(); - dequeue(0, msg); + if (!messages->empty()) { + QueuedMessage msg = messages->front(); + pop(held); + dequeue(0, msg); + } } /** * Updates policy and management when a message has been dequeued, * expects messageLock to be held */ -void Queue::dequeued(const QueuedMessage& msg) +void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) { if (policy.get()) policy->dequeued(msg); mgntDeqStats(msg.payload); @@ -718,6 +893,33 @@ void Queue::dequeued(const QueuedMessage& msg) } } +/** updates queue observers when a message has become unavailable for transfer, + * expects messageLock to be held + */ +void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&) +{ + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->acquired(msg); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of message removal for queue " << getName() << ": " << e.what()); + } + } +} + +/** updates queue observers when a message has become re-available for transfer, + * expects messageLock to be held + */ +void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) +{ + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->requeued(msg); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of message requeue for queue " << getName() << ": " << e.what()); + } + } +} void Queue::create(const FieldTable& _settings) { @@ -788,17 +990,28 @@ void Queue::configureImpl(const FieldTable& _settings) if (lvqKey.size()) { QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey); messages = std::auto_ptr<Messages>(new MessageMap(lvqKey)); + allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages )); } else if (_settings.get(qpidLastValueQueueNoBrowse)) { QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on"); messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker); + allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages )); } else if (_settings.get(qpidLastValueQueue)) { QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue"); messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker); + allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages )); } else { std::auto_ptr<Messages> m = Fairshare::create(_settings); if (m.get()) { messages = m; + allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages )); QPID_LOG(debug, "Configured queue " << getName() << " as priority queue."); + } else { // default (FIFO) queue type + // override default message allocator if message groups configured. + boost::shared_ptr<MessageGroupManager> mgm(MessageGroupManager::create( getName(), *messages, _settings)); + if (mgm) { + allocator = mgm; + addObserver(mgm); + } } } @@ -835,7 +1048,7 @@ void Queue::destroyed() while(!messages->empty()){ DeliverableMessage msg(messages->front().payload); alternateExchange->routeWithAlternate(msg); - popAndDequeue(); + popAndDequeue(locker); } alternateExchange->decAlternateUsers(); } @@ -1057,7 +1270,7 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str case _qmf::Queue::METHOD_PURGE : { _qmf::ArgsQueuePurge& purgeArgs = (_qmf::ArgsQueuePurge&) args; - purge(purgeArgs.i_request); + purge(purgeArgs.i_request, boost::shared_ptr<Exchange>(), &purgeArgs.i_filter); status = Manageable::STATUS_OK; } break; @@ -1078,7 +1291,7 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str } } - purge(rerouteArgs.i_request, dest); + purge(rerouteArgs.i_request, dest, &rerouteArgs.i_filter); status = Manageable::STATUS_OK; } break; @@ -1087,6 +1300,14 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str return status; } + +void Queue::query(qpid::types::Variant::Map& results) const +{ + Mutex::ScopedLock locker(messageLock); + /** @todo add any interesting queue state into results */ + if (allocator) allocator->query(results); +} + void Queue::setPosition(SequenceNumber n) { Mutex::ScopedLock locker(messageLock); sequence = n; @@ -1121,7 +1342,10 @@ void Queue::insertSequenceNumbers(const std::string& key) QPID_LOG(debug, "Inserting sequence numbers as " << key); } -void Queue::enqueued(const QueuedMessage& m) +/** updates queue observers and state when a message has become available for transfer, + * expects messageLock to be held + */ +void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&) { for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) { try { @@ -1144,7 +1368,8 @@ void Queue::updateEnqueued(const QueuedMessage& m) if (policy.get()) { policy->recoverEnqueued(payload); } - enqueued(m); + Mutex::ScopedLock locker(messageLock); + observeEnqueue(m, locker); } else { QPID_LOG(warning, "Queue informed of enqueued message that has no payload"); } @@ -1168,6 +1393,7 @@ void Queue::checkNotDeleted() void Queue::addObserver(boost::shared_ptr<QueueObserver> observer) { + Mutex::ScopedLock locker(messageLock); observers.insert(observer); } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 8435e75cab..59ae41e768 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -59,7 +59,7 @@ class MessageStore; class QueueEvents; class QueueRegistry; class TransactionContext; -class Exchange; +class MessageDistributor; /** * The brokers representation of an amqp queue. Messages are @@ -129,23 +129,32 @@ class Queue : public boost::enable_shared_from_this<Queue>, UsageBarrier barrier; int autoDeleteTimeout; boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask; + boost::shared_ptr<MessageDistributor> allocator; void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); void setPolicy(std::auto_ptr<QueuePolicy> policy); - bool seek(QueuedMessage& msg, Consumer::shared_ptr position); - bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); - ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); - bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); + bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c); + ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c); + bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c); void notifyListener(); void removeListener(Consumer::shared_ptr); bool isExcluded(boost::intrusive_ptr<Message>& msg); - void enqueued(const QueuedMessage& msg); - void dequeued(const QueuedMessage& msg); - void pop(); - void popAndDequeue(); + /** update queue observers, stats, policy, etc when the messages' state changes. Lock + * must be held by caller */ + void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); + void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); + void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); + void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); + + /** modify the Queue's message container - assumes messageLock held */ + void pop(const sys::Mutex::ScopedLock& held); // acquire front msg + void popAndDequeue(const sys::Mutex::ScopedLock& held); // acquire and dequeue front msg + // acquire message @ position, return true and set msg if acquire succeeds + bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg, + const sys::Mutex::ScopedLock& held); void forcePersistent(QueuedMessage& msg); int getEventMode(); @@ -191,8 +200,15 @@ class Queue : public boost::enable_shared_from_this<Queue>, Broker* broker = 0); QPID_BROKER_EXTERN ~Queue(); + /** allow the Consumer to consume or browse the next available message */ QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr); + /** allow the Consumer to acquire a message that it has browsed. + * @param msg - message to be acquired. + * @return false if message is no longer available for acquire. + */ + QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg, const std::string& consumer); + /** * Used to configure a new queue and create a persistent record * for it in store if required. @@ -216,7 +232,11 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool bind(boost::shared_ptr<Exchange> exchange, const std::string& key, const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable()); - QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg); + /** Acquire the message at the given position if it is available for acquire. Not to + * be used by clients, but used by the broker for queue management. + * @param message - set to the acquired message if true returned. + * @return true if the message has been acquired. + */ QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message); /** @@ -245,11 +265,14 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool exclusive = false); QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c); - uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages + uint32_t purge(const uint32_t purge_request=0, //defaults to all messages + boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>(), + const ::qpid::types::Variant::Map *filter=0); QPID_BROKER_EXTERN void purgeExpired(sys::Duration); //move qty # of messages to destination Queue destq - uint32_t move(const Queue::shared_ptr destq, uint32_t qty); + uint32_t move(const Queue::shared_ptr destq, uint32_t qty, + const qpid::types::Variant::Map *filter=0); QPID_BROKER_EXTERN uint32_t getMessageCount() const; QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const; @@ -302,12 +325,12 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool isEnqueued(const QueuedMessage& msg); /** - * Gets the next available message + * Acquires the next available (oldest) message */ QPID_BROKER_EXTERN QueuedMessage get(); - /** Get the message at position pos */ - QPID_BROKER_EXTERN QueuedMessage find(framing::SequenceNumber pos) const; + /** Get the message at position pos, returns true if found and sets msg */ + QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, QueuedMessage& msg ) const; const QueuePolicy* getPolicy(); @@ -336,6 +359,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, management::ManagementObject* GetManagementObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); + void query(::qpid::types::Variant::Map&) const; /** Apply f to each Message on the queue. */ template <class F> void eachMessage(F f) { diff --git a/cpp/src/qpid/broker/QueueEvents.cpp b/cpp/src/qpid/broker/QueueEvents.cpp index 2c540ff1ad..c66bdabf0f 100644 --- a/cpp/src/qpid/broker/QueueEvents.cpp +++ b/cpp/src/qpid/broker/QueueEvents.cpp @@ -129,6 +129,10 @@ class EventGenerator : public QueueObserver { if (!enqueueOnly) manager.dequeued(m); } + + void acquired(const QueuedMessage&) {}; + void requeued(const QueuedMessage&) {}; + private: QueueEvents& manager; const bool enqueueOnly; diff --git a/cpp/src/qpid/broker/QueueFlowLimit.cpp b/cpp/src/qpid/broker/QueueFlowLimit.cpp index fcf8d089f9..f15bb45c01 100644 --- a/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -377,7 +377,8 @@ void QueueFlowLimit::setState(const qpid::framing::FieldTable& state) ++i; fcmsg.add(first, last); for (SequenceNumber seq = first; seq <= last; ++seq) { - QueuedMessage msg(queue->find(seq)); // fyi: msg.payload may be null if msg is delivered & unacked + QueuedMessage msg; + queue->find(seq, msg); // fyi: may not be found if msg is acquired & unacked bool unique; unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second; // Like this to avoid tripping up unused variable warning when NDEBUG set diff --git a/cpp/src/qpid/broker/QueueFlowLimit.h b/cpp/src/qpid/broker/QueueFlowLimit.h index c02e479976..ad8a2720ef 100644 --- a/cpp/src/qpid/broker/QueueFlowLimit.h +++ b/cpp/src/qpid/broker/QueueFlowLimit.h @@ -84,6 +84,9 @@ class Broker; QPID_BROKER_EXTERN void enqueued(const QueuedMessage&); /** the queue has removed QueuedMessage. Returns true if flow state changes */ QPID_BROKER_EXTERN void dequeued(const QueuedMessage&); + /** ignored */ + QPID_BROKER_EXTERN void acquired(const QueuedMessage&) {}; + QPID_BROKER_EXTERN void requeued(const QueuedMessage&) {}; /** for clustering: */ QPID_BROKER_EXTERN void getState(qpid::framing::FieldTable&) const; diff --git a/cpp/src/qpid/broker/QueueObserver.h b/cpp/src/qpid/broker/QueueObserver.h index 3ca01c051e..b58becd2ae 100644 --- a/cpp/src/qpid/broker/QueueObserver.h +++ b/cpp/src/qpid/broker/QueueObserver.h @@ -25,17 +25,51 @@ namespace qpid { namespace broker { struct QueuedMessage; +class Consumer; + /** - * Interface for notifying classes who want to act as 'observers' of a - * queue of particular events. + * Interface for notifying classes who want to act as 'observers' of a queue of particular + * events. + * + * The events that are monitored reflect the relationship between a particular message and + * the queue it has been delivered to. A message can be considered in one of three states + * with respect to the queue: + * + * 1) "Available" - available for transfer to consumers (i.e. for browse or acquire), + * + * 2) "Acquired" - owned by a particular consumer, no longer available to other consumers + * (by either browse or acquire), but still considered on the queue. + * + * 3) "Dequeued" - removed from the queue and no longer available to any consumer. + * + * The queue events that are observable are: + * + * "Enqueued" - the message is "Available" - on the queue for transfer to any consumer + * (e.g. browse or acquire) + * + * "Acquired" - - a consumer has claimed exclusive access to it. It is no longer available + * for other consumers to browse or acquire, but it is not yet considered dequeued as it + * may be requeued by the consumer. + * + * "Requeued" - a previously-acquired message is released by its owner: it is put back on + * the queue at its original position and returns to the "Available" state. + * + * "Dequeued" - a message is no longer queued. At this point, the queue no longer tracks + * the message, and the broker considers the consumer's transaction complete. */ class QueueObserver { public: virtual ~QueueObserver() {} + + // note: the Queue will hold the messageLock while calling these methods! virtual void enqueued(const QueuedMessage&) = 0; virtual void dequeued(const QueuedMessage&) = 0; - private: + virtual void acquired(const QueuedMessage&) = 0; + virtual void requeued(const QueuedMessage&) = 0; + virtual void consumerAdded( const Consumer& ) {}; + virtual void consumerRemoved( const Consumer& ) {}; + private: }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp index 6ae0d53b1a..0c245700af 100644 --- a/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/cpp/src/qpid/broker/QueuePolicy.cpp @@ -269,8 +269,7 @@ bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) do { QueuedMessage oldest = queue.front(); - - if (oldest.queue->acquire(oldest) || !strict) { + if (oldest.queue->acquireMessageAt(oldest.position, oldest) || !strict) { queue.pop_front(); pendingDequeues.push_back(oldest); QPID_LOG(debug, "Ring policy triggered in " << name diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index b4f146e699..94d0cc87f7 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -107,11 +107,18 @@ bool SemanticState::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } +namespace { + const std::string SEPARATOR("::"); +} + void SemanticState::consume(const string& tag, Queue::shared_ptr queue, bool ackRequired, bool acquire, bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments) { - ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments)); + // "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination). + // Create a globally unique name so the broker can identify individual consumers + std::string name = session.getSessionId().str() + SEPARATOR + tag; + ConsumerImpl::shared_ptr c(new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); queue->consume(c, exclusive);//may throw exception consumers[tag] = c; } @@ -267,15 +274,15 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, bool ack, bool _acquire, bool _exclusive, + const string& _tag, const string& _resumeId, uint64_t _resumeTtl, const framing::FieldTable& _arguments ) : - Consumer(_acquire), + Consumer(_name, _acquire), parent(_parent), - name(_name), queue(_queue), ackExpected(ack), acquire(_acquire), @@ -284,6 +291,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, windowActive(false), exclusive(_exclusive), resumeId(_resumeId), + tag(_tag), resumeTtl(_resumeTtl), arguments(_arguments), msgCredit(0), @@ -300,7 +308,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, if (agent != 0) { - mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name, + mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(), !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments)); agent->addObject (mgmtObject); mgmtObject->set_creditMode("WINDOW"); @@ -332,16 +340,15 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { assertClusterSafe(); allocateCredit(msg.payload); - DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing); + DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, windowing); bool sync = syncFrequency && ++deliveryCount >= syncFrequency; if (sync) deliveryCount = 0;//reset parent->deliver(record, sync); - if (!ackExpected && acquire) record.setEnded();//allows message to be released now its been delivered if (windowing || ackExpected || !acquire) { parent->record(record); } - if (acquire && !ackExpected) { - queue->dequeue(0, msg); + if (acquire && !ackExpected) { // auto acquire && auto accept + record.accept( 0 /*no ctxt*/ ); } if (mgmtObject) { mgmtObject->inc_delivered(); } return true; @@ -371,7 +378,7 @@ struct ConsumerName { }; ostream& operator<<(ostream& o, const ConsumerName& pc) { - return o << pc.consumer.getName() << " on " + return o << pc.consumer.getTag() << " on " << pc.consumer.getParent().getSession().getSessionId(); } } @@ -561,50 +568,61 @@ void SemanticState::deliver(DeliveryRecord& msg, bool sync) return deliveryAdapter.deliver(msg, sync); } -SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination) +const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const { - ConsumerImplMap::iterator i = consumers.find(destination); - if (i == consumers.end()) { - throw NotFoundException(QPID_MSG("Unknown destination " << destination)); + ConsumerImpl::shared_ptr consumer; + if (!find(destination, consumer)) { + throw NotFoundException(QPID_MSG("Unknown destination " << destination << " session=" << session.getSessionId())); } else { - return *(i->second); + return consumer; + } +} + +bool SemanticState::find(const std::string& destination, ConsumerImpl::shared_ptr& consumer) const +{ + // @todo KAG gsim: shouldn't the consumers map be locked???? + ConsumerImplMap::const_iterator i = consumers.find(destination); + if (i == consumers.end()) { + return false; } + consumer = i->second; + return true; } void SemanticState::setWindowMode(const std::string& destination) { - find(destination).setWindowMode(); + find(destination)->setWindowMode(); } void SemanticState::setCreditMode(const std::string& destination) { - find(destination).setCreditMode(); + find(destination)->setCreditMode(); } void SemanticState::addByteCredit(const std::string& destination, uint32_t value) { - ConsumerImpl& c = find(destination); - c.addByteCredit(value); - c.requestDispatch(); + ConsumerImpl::shared_ptr c = find(destination); + c->addByteCredit(value); + c->requestDispatch(); } void SemanticState::addMessageCredit(const std::string& destination, uint32_t value) { - ConsumerImpl& c = find(destination); - c.addMessageCredit(value); - c.requestDispatch(); + ConsumerImpl::shared_ptr c = find(destination); + c->addMessageCredit(value); + c->requestDispatch(); } void SemanticState::flush(const std::string& destination) { - find(destination).flush(); + find(destination)->flush(); } void SemanticState::stop(const std::string& destination) { - find(destination).stop(); + find(destination)->stop(); } void SemanticState::ConsumerImpl::setWindowMode() diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 69d980947b..12ccc75f11 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -75,7 +75,6 @@ class SemanticState : private boost::noncopyable { { mutable qpid::sys::Mutex lock; SemanticState* const parent; - const std::string name; const boost::shared_ptr<Queue> queue; const bool ackExpected; const bool acquire; @@ -84,6 +83,7 @@ class SemanticState : private boost::noncopyable { bool windowActive; bool exclusive; std::string resumeId; + const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command uint64_t resumeTtl; framing::FieldTable arguments; uint32_t msgCredit; @@ -103,7 +103,8 @@ class SemanticState : private boost::noncopyable { ConsumerImpl(SemanticState* parent, const std::string& name, boost::shared_ptr<Queue> queue, bool ack, bool acquire, bool exclusive, - const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); + const std::string& tag, const std::string& resumeId, + uint64_t resumeTtl, const framing::FieldTable& arguments); ~ConsumerImpl(); OwnershipToken* getSession(); bool deliver(QueuedMessage& msg); @@ -130,8 +131,6 @@ class SemanticState : private boost::noncopyable { bool doOutput(); - std::string getName() const { return name; } - bool isAckExpected() const { return ackExpected; } bool isAcquire() const { return acquire; } bool isWindowing() const { return windowing; } @@ -139,6 +138,7 @@ class SemanticState : private boost::noncopyable { uint32_t getMsgCredit() const { return msgCredit; } uint32_t getByteCredit() const { return byteCredit; } std::string getResumeId() const { return resumeId; }; + const std::string& getTag() const { return tag; } uint64_t getResumeTtl() const { return resumeTtl; } const framing::FieldTable& getArguments() const { return arguments; } @@ -190,7 +190,8 @@ class SemanticState : private boost::noncopyable { SessionContext& getSession() { return session; } const SessionContext& getSession() const { return session; } - ConsumerImpl& find(const std::string& destination); + const ConsumerImpl::shared_ptr find(const std::string& destination) const; + bool find(const std::string& destination, ConsumerImpl::shared_ptr&) const; /** * Get named queue, never returns 0. diff --git a/cpp/src/qpid/broker/ThresholdAlerts.h b/cpp/src/qpid/broker/ThresholdAlerts.h index c77722e700..2b4a46b736 100644 --- a/cpp/src/qpid/broker/ThresholdAlerts.h +++ b/cpp/src/qpid/broker/ThresholdAlerts.h @@ -50,6 +50,9 @@ class ThresholdAlerts : public QueueObserver const long repeatInterval); void enqueued(const QueuedMessage&); void dequeued(const QueuedMessage&); + void acquired(const QueuedMessage&) {}; + void requeued(const QueuedMessage&) {}; + static void observe(Queue& queue, qpid::management::ManagementAgent& agent, const uint64_t countThreshold, const uint64_t sizeThreshold, diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 015301573e..394749aad2 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -406,11 +406,11 @@ void Connection::shadowSetUser(const std::string& userId) { void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position) { - broker::SemanticState::ConsumerImpl& c = semanticState().find(name); - c.position = position; - c.setBlocked(blocked); - if (notifyEnabled) c.enableNotify(); else c.disableNotify(); - updateIn.consumerNumbering.add(c.shared_from_this()); + broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name); + c->position = position; + c->setBlocked(blocked); + if (notifyEnabled) c->enableNotify(); else c->disableNotify(); + updateIn.consumerNumbering.add(c); } @@ -444,7 +444,7 @@ void Connection::outputTask(uint16_t channel, const std::string& name) { if (!session) throw Exception(QPID_MSG(cluster << " channel not attached " << *this << "[" << channel << "] ")); - OutputTask* task = &session->getSemanticState().find(name); + OutputTask* task = session->getSemanticState().find(name).get(); connection->getOutputTasks().addOutputTask(task); } @@ -547,7 +547,7 @@ void Connection::deliveryRecord(const string& qname, m.position = position; if (enqueued) queue->updateEnqueued(m); //inform queue of the message } else { // Message at original position in original queue - m = queue->find(position); + queue->find(position, m); } // FIXME aconway 2011-08-19: removed: // if (!m.payload) diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 2be1bf1f77..2446c12f2b 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -421,8 +421,8 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) { boost::polymorphic_downcast<const SemanticState::ConsumerImpl*> (task); SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci); uint16_t channel = ci->getParent().getSession().getChannel(); - ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName()); - QPID_LOG(debug, *this << " updating output task " << ci->getName() + ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getTag()); + QPID_LOG(debug, *this << " updating output task " << ci->getTag() << " channel=" << channel); } @@ -521,13 +521,13 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { void UpdateClient::updateConsumer( const broker::SemanticState::ConsumerImpl::shared_ptr& ci) { - QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on " + QPID_LOG(debug, *this << " updating consumer " << ci->getTag() << " on " << shadowSession.getId()); using namespace message; shadowSession.messageSubscribe( arg::queue = ci->getQueue()->getName(), - arg::destination = ci->getName(), + arg::destination = ci->getTag(), arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE, arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED, arg::exclusive = ci->isExclusive(), @@ -535,18 +535,18 @@ void UpdateClient::updateConsumer( arg::resumeTtl = ci->getResumeTtl(), arg::arguments = ci->getArguments() ); - shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); - shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); - shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit()); + shadowSession.messageSetFlowMode(ci->getTag(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); + shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); + shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_BYTE, ci->getByteCredit()); ClusterConnectionProxy(shadowSession).consumerState( - ci->getName(), + ci->getTag(), ci->isBlocked(), ci->isNotifyEnabled(), ci->position ); consumerNumbering.add(ci.get()); - QPID_LOG(debug, *this << " updated consumer " << ci->getName() + QPID_LOG(debug, *this << " updated consumer " << ci->getTag() << " on " << shadowSession.getId()); } diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt index cc33478114..7d781e5eb3 100644 --- a/cpp/src/tests/CMakeLists.txt +++ b/cpp/src/tests/CMakeLists.txt @@ -272,6 +272,11 @@ add_executable (datagen datagen.cpp ${platform_test_additions}) target_link_libraries (datagen qpidclient) remember_location(datagen) +add_executable (msg_group_test msg_group_test.cpp ${platform_test_additions}) +target_link_libraries (msg_group_test qpidmessaging) +remember_location(msg_group_test) + + # qpid-perftest and qpid-latency-test are generally useful so install them install (TARGETS qpid-perftest qpid-latency-test RUNTIME DESTINATION ${QPID_INSTALL_BINDIR}) diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index cd569e901c..78ac6db5f1 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -291,13 +291,19 @@ qpid_stream_INCLUDES=$(PUBLIC_INCLUDES) qpid_stream_SOURCES=qpid-stream.cpp qpid_stream_LDADD=$(lib_messaging) +check_PROGRAMS+=msg_group_test +msg_group_test_INCLUDES=$(PUBLIC_INCLUDES) +msg_group_test_SOURCES=msg_group_test.cpp +msg_group_test_LDADD=$(lib_messaging) + TESTS_ENVIRONMENT = \ VALGRIND=$(VALGRIND) \ LIBTOOL="$(LIBTOOL)" \ QPID_DATA_DIR= \ $(srcdir)/run_test -system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest +system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest \ + run_msg_group_tests TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_federation_sys_tests \ run_acl_tests run_cli_tests replication_test dynamic_log_level_test \ run_queue_flow_limit_tests ipv6_test @@ -342,7 +348,8 @@ EXTRA_DIST += \ start_broker.ps1 \ stop_broker.ps1 \ topictest.ps1 \ - run_queue_flow_limit_tests + run_queue_flow_limit_tests \ + run_msg_group_tests check_LTLIBRARIES += libdlclose_noop.la libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir) @@ -353,7 +360,10 @@ CLEANFILES+=valgrind.out *.log *.vglog* dummy_test qpidd.port $(unit_wrappers) # Longer running stability tests, not run by default check: target. # Not run under valgrind, too slow -LONG_TESTS+=start_broker fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test stop_broker \ +LONG_TESTS+=start_broker \ + fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test \ + run_msg_groups_tests_soak \ + stop_broker \ run_long_federation_sys_tests \ run_failover_soak reliable_replication_test \ federated_cluster_test_with_node_failure @@ -366,7 +376,8 @@ EXTRA_DIST+= \ run_failover_soak \ reliable_replication_test \ federated_cluster_test_with_node_failure \ - sasl_test_setup.sh + sasl_test_setup.sh \ + run_msg_groups_tests_soak check-long: $(MAKE) check TESTS="$(LONG_TESTS)" VALGRIND= diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index d94a5cab7f..7bf061ff54 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -56,12 +56,12 @@ class TestConsumer : public virtual Consumer{ public: typedef boost::shared_ptr<TestConsumer> shared_ptr; - intrusive_ptr<Message> last; + QueuedMessage last; bool received; - TestConsumer(bool acquire = true):Consumer(acquire), received(false) {}; + TestConsumer(std::string name="test", bool acquire = true):Consumer(name, acquire), received(false) {}; virtual bool deliver(QueuedMessage& msg){ - last = msg.payload; + last = msg; received = true; return true; }; @@ -149,16 +149,16 @@ QPID_AUTO_TEST_CASE(testConsumers){ queue->deliver(msg1); BOOST_CHECK(queue->dispatch(c1)); - BOOST_CHECK_EQUAL(msg1.get(), c1->last.get()); + BOOST_CHECK_EQUAL(msg1.get(), c1->last.payload.get()); queue->deliver(msg2); BOOST_CHECK(queue->dispatch(c2)); - BOOST_CHECK_EQUAL(msg2.get(), c2->last.get()); + BOOST_CHECK_EQUAL(msg2.get(), c2->last.payload.get()); c1->received = false; queue->deliver(msg3); BOOST_CHECK(queue->dispatch(c1)); - BOOST_CHECK_EQUAL(msg3.get(), c1->last.get()); + BOOST_CHECK_EQUAL(msg3.get(), c1->last.payload.get()); //Test cancellation: queue->cancel(c1); @@ -214,7 +214,7 @@ QPID_AUTO_TEST_CASE(testDequeue){ if (!consumer->received) sleep(2); - BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get()); + BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get()); BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount()); received = queue->get().payload; @@ -298,14 +298,14 @@ QPID_AUTO_TEST_CASE(testSeek){ queue->deliver(msg2); queue->deliver(msg3); - TestConsumer::shared_ptr consumer(new TestConsumer(false)); + TestConsumer::shared_ptr consumer(new TestConsumer("test", false)); SequenceNumber seq(2); consumer->position = seq; QueuedMessage qm; queue->dispatch(consumer); - BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get()); + BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get()); queue->dispatch(consumer); queue->dispatch(consumer); // make sure over-run is safe @@ -325,14 +325,18 @@ QPID_AUTO_TEST_CASE(testSearch){ queue->deliver(msg3); SequenceNumber seq(2); - QueuedMessage qm = queue->find(seq); + QueuedMessage qm; + TestConsumer::shared_ptr c1(new TestConsumer()); + + BOOST_CHECK(queue->find(seq, qm)); BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue()); - queue->acquire(qm); + queue->acquire(qm, c1->getName()); BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); SequenceNumber seq1(3); - QueuedMessage qm1 = queue->find(seq1); + QueuedMessage qm1; + BOOST_CHECK(queue->find(seq1, qm1)); BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue()); } @@ -552,12 +556,13 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ QueuedMessage qmsg2(queue.get(), msg2, ++sequence); framing::SequenceNumber sequence1(10); QueuedMessage qmsg3(queue.get(), 0, sequence1); + TestConsumer::shared_ptr dummy(new TestConsumer()); - BOOST_CHECK(!queue->acquire(qmsg)); - BOOST_CHECK(queue->acquire(qmsg2)); + BOOST_CHECK(!queue->acquire(qmsg, dummy->getName())); + BOOST_CHECK(queue->acquire(qmsg2, dummy->getName())); // Acquire the massage again to test failure case. - BOOST_CHECK(!queue->acquire(qmsg2)); - BOOST_CHECK(!queue->acquire(qmsg3)); + BOOST_CHECK(!queue->acquire(qmsg2, dummy->getName())); + BOOST_CHECK(!queue->acquire(qmsg3, dummy->getName())); BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); @@ -567,7 +572,7 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ // set mode to no browse and check args.setOrdering(client::LVQ_NO_BROWSE); queue->configure(args); - TestConsumer::shared_ptr c1(new TestConsumer(false)); + TestConsumer::shared_ptr c1(new TestConsumer("test", false)); queue->dispatch(c1); queue->dispatch(c1); @@ -696,6 +701,280 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) { BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u); } + +namespace { + // helper for group tests + void verifyAcquire( Queue::shared_ptr queue, + TestConsumer::shared_ptr c, + std::deque<QueuedMessage>& results, + const std::string& expectedGroup, + const int expectedId ) + { + queue->dispatch(c); + results.push_back(c->last); + std::string group = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID"); + int id = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); + BOOST_CHECK_EQUAL( group, expectedGroup ); + BOOST_CHECK_EQUAL( id, expectedId ); + } +} + +QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { + // + // Verify that consumers of grouped messages own the groups once a message is acquired, + // and release the groups once all acquired messages have been dequeued or requeued + // + FieldTable args; + Queue::shared_ptr queue(new Queue("my_queue", true)); + args.setString("qpid.group_header_key", "GROUP-ID"); + args.setInt("qpid.shared_msg_group", 1); + queue->configure(args); + + std::string groups[] = { std::string("a"), std::string("a"), std::string("a"), + std::string("b"), std::string("b"), std::string("b"), + std::string("c"), std::string("c"), std::string("c") }; + for (int i = 0; i < 9; ++i) { + intrusive_ptr<Message> msg = create_message("e", "A"); + msg->insertCustomProperty("GROUP-ID", groups[i]); + msg->insertCustomProperty("MY-ID", i); + queue->deliver(msg); + } + + // Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... + // Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---, + + BOOST_CHECK_EQUAL(uint32_t(9), queue->getMessageCount()); + + TestConsumer::shared_ptr c1(new TestConsumer("C1")); + TestConsumer::shared_ptr c2(new TestConsumer("C2")); + + queue->consume(c1); + queue->consume(c2); + + std::deque<QueuedMessage> dequeMeC1; + std::deque<QueuedMessage> dequeMeC2; + + + verifyAcquire(queue, c1, dequeMeC1, "a", 0 ); // c1 now owns group "a" (acquire a-0) + verifyAcquire(queue, c2, dequeMeC2, "b", 3 ); // c2 should now own group "b" (acquire b-3) + + // now let c1 complete the 'a-0' message - this should free the 'a' group + queue->dequeue( 0, dequeMeC1.front() ); + dequeMeC1.pop_front(); + + // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... + // Owners= ---, ---, ^C2, ^C2, ^C2, ---, ---, --- + + // now c2 should pick up the next 'a-1', since it is oldest free + verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); // c2 should now own groups "a" and "b" + + // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... + // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ---, ---, --- + + // c1 should only be able to snarf up the first "c" message now... + verifyAcquire(queue, c1, dequeMeC1, "c", 6 ); // should skip to the first "c" + + // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... + // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ^C1, ^C1, ^C1 + + // hmmm... what if c2 now dequeues "b-3"? (now only has a-1 acquired) + queue->dequeue( 0, dequeMeC2.front() ); + dequeMeC2.pop_front(); + + // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8... + // Owners= ^C2, ^C2, ---, ---, ^C1, ^C1, ^C1 + + // b group is free, c is owned by c1 - c1's next get should grab 'b-4' + verifyAcquire(queue, c1, dequeMeC1, "b", 4 ); + + // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8... + // Owners= ^C2, ^C2, ^C1, ^C1, ^C1, ^C1, ^C1 + + // c2 can now only grab a-2, and that's all + verifyAcquire(queue, c2, dequeMeC2, "a", 2 ); + + // now C2 can't get any more, since C1 owns "b" and "c" group... + bool gotOne = queue->dispatch(c2); + BOOST_CHECK( !gotOne ); + + // hmmm... what if c1 now dequeues "c-6"? (now only own's b-4) + queue->dequeue( 0, dequeMeC1.front() ); + dequeMeC1.pop_front(); + + // Queue = a-1, a-2, b-4, b-5, c-7, c-8... + // Owners= ^C2, ^C2, ^C1, ^C1, ---, --- + + // c2 can now grab c-7 + verifyAcquire(queue, c2, dequeMeC2, "c", 7 ); + + // Queue = a-1, a-2, b-4, b-5, c-7, c-8... + // Owners= ^C2, ^C2, ^C1, ^C1, ^C2, ^C2 + + // what happens if C-2 "requeues" a-1 and a-2? + queue->requeue( dequeMeC2.front() ); + dequeMeC2.pop_front(); + queue->requeue( dequeMeC2.front() ); + dequeMeC2.pop_front(); // now just has c-7 acquired + + // Queue = a-1, a-2, b-4, b-5, c-7, c-8... + // Owners= ---, ---, ^C1, ^C1, ^C2, ^C2 + + // now c1 will grab a-1 and a-2... + verifyAcquire(queue, c1, dequeMeC1, "a", 1 ); + verifyAcquire(queue, c1, dequeMeC1, "a", 2 ); + + // Queue = a-1, a-2, b-4, b-5, c-7, c-8... + // Owners= ^C1, ^C1, ^C1, ^C1, ^C2, ^C2 + + // c2 can now acquire c-8 only + verifyAcquire(queue, c2, dequeMeC2, "c", 8 ); + + // and c1 can get b-5 + verifyAcquire(queue, c1, dequeMeC1, "b", 5 ); + + // should be no more acquire-able for anyone now: + gotOne = queue->dispatch(c1); + BOOST_CHECK( !gotOne ); + gotOne = queue->dispatch(c2); + BOOST_CHECK( !gotOne ); + + // requeue all of C1's acquired messages, then cancel C1 + while (!dequeMeC1.empty()) { + queue->requeue(dequeMeC1.front()); + dequeMeC1.pop_front(); + } + queue->cancel(c1); + + // Queue = a-1, a-2, b-4, b-5, c-7, c-8... + // Owners= ---, ---, ---, ---, ^C2, ^C2 + + // b-4, a-1, a-2, b-5 all should be available, right? + verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); + + while (!dequeMeC2.empty()) { + queue->dequeue(0, dequeMeC2.front()); + dequeMeC2.pop_front(); + } + + // Queue = a-2, b-4, b-5 + // Owners= ---, ---, --- + + TestConsumer::shared_ptr c3(new TestConsumer("C3")); + std::deque<QueuedMessage> dequeMeC3; + + verifyAcquire(queue, c3, dequeMeC3, "a", 2 ); + verifyAcquire(queue, c2, dequeMeC2, "b", 4 ); + + // Queue = a-2, b-4, b-5 + // Owners= ^C3, ^C2, ^C2 + + gotOne = queue->dispatch(c3); + BOOST_CHECK( !gotOne ); + + verifyAcquire(queue, c2, dequeMeC2, "b", 5 ); + + while (!dequeMeC2.empty()) { + queue->dequeue(0, dequeMeC2.front()); + dequeMeC2.pop_front(); + } + + // Queue = a-2, + // Owners= ^C3, + + intrusive_ptr<Message> msg = create_message("e", "A"); + msg->insertCustomProperty("GROUP-ID", "a"); + msg->insertCustomProperty("MY-ID", 9); + queue->deliver(msg); + + // Queue = a-2, a-9 + // Owners= ^C3, ^C3 + + gotOne = queue->dispatch(c2); + BOOST_CHECK( !gotOne ); + + msg = create_message("e", "A"); + msg->insertCustomProperty("GROUP-ID", "b"); + msg->insertCustomProperty("MY-ID", 10); + queue->deliver(msg); + + // Queue = a-2, a-9, b-10 + // Owners= ^C3, ^C3, ---- + + verifyAcquire(queue, c2, dequeMeC2, "b", 10 ); + verifyAcquire(queue, c3, dequeMeC3, "a", 9 ); + + gotOne = queue->dispatch(c3); + BOOST_CHECK( !gotOne ); + + queue->cancel(c2); + queue->cancel(c3); +} + + +QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) { + // + // Verify that the same default group name is automatically applied to messages that + // do not specify a group name. + // + FieldTable args; + Queue::shared_ptr queue(new Queue("my_queue", true)); + args.setString("qpid.group_header_key", "GROUP-ID"); + args.setInt("qpid.shared_msg_group", 1); + queue->configure(args); + + for (int i = 0; i < 3; ++i) { + intrusive_ptr<Message> msg = create_message("e", "A"); + // no "GROUP-ID" header + msg->insertCustomProperty("MY-ID", i); + queue->deliver(msg); + } + + // Queue = 0, 1, 2 + + BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount()); + + TestConsumer::shared_ptr c1(new TestConsumer("C1")); + TestConsumer::shared_ptr c2(new TestConsumer("C2")); + + queue->consume(c1); + queue->consume(c2); + + std::deque<QueuedMessage> dequeMeC1; + std::deque<QueuedMessage> dequeMeC2; + + queue->dispatch(c1); // c1 now owns default group (acquired 0) + dequeMeC1.push_back(c1->last); + int id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); + BOOST_CHECK_EQUAL( id, 0 ); + + bool gotOne = queue->dispatch(c2); // c2 should get nothing + BOOST_CHECK( !gotOne ); + + queue->dispatch(c1); // c1 now acquires 1 + dequeMeC1.push_back(c1->last); + id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); + BOOST_CHECK_EQUAL( id, 1 ); + + gotOne = queue->dispatch(c2); // c2 should still get nothing + BOOST_CHECK( !gotOne ); + + while (!dequeMeC1.empty()) { + queue->dequeue(0, dequeMeC1.front()); + dequeMeC1.pop_front(); + } + + // now default group should be available... + queue->dispatch(c2); // c2 now owns default group (acquired 2) + id = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); + BOOST_CHECK_EQUAL( id, 2 ); + + gotOne = queue->dispatch(c1); // c1 should get nothing + BOOST_CHECK( !gotOne ); + + queue->cancel(c1); + queue->cancel(c2); +} + QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ TestMessageStoreOC testStore; diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index 4e3339650b..d217f9fbde 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -1418,6 +1418,76 @@ class LongTests(BrokerTest): if receiver: receiver.connection.detach() logger.setLevel(log_level) + def test_msg_group_failover(self): + """Test fail-over during continuous send-receive of grouped messages. + """ + + class GroupedTrafficGenerator(Thread): + def __init__(self, url, queue, group_key): + Thread.__init__(self) + self.url = url + self.queue = queue + self.group_key = group_key + self.status = -1 + + def run(self): + # generate traffic for approx 10 seconds (2011msgs / 200 per-sec) + cmd = ["msg_group_test", + "--broker=%s" % self.url, + "--address=%s" % self.queue, + "--connection-options={%s}" % (Cluster.CONNECTION_OPTIONS), + "--group-key=%s" % self.group_key, + "--receivers=2", + "--senders=3", + "--messages=2011", + "--send-rate=200", + "--capacity=11", + "--ack-frequency=23", + "--allow-duplicates", + "--group-size=37", + "--randomize-group-size", + "--interleave=13"] + # "--trace"] + self.generator = Popen( cmd ); + self.status = self.generator.wait() + return self.status + + def results(self): + self.join(timeout=30) # 3x assumed duration + if self.isAlive(): return -1 + return self.status + + # Original cluster will all be killed so expect exit with failure + cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["-t"]) + for b in cluster: b.ready() # Wait for brokers to be ready + + # create a queue with rather draconian flow control settings + ssn0 = cluster[0].connect().session() + q_args = "{'qpid.group_header_key':'group-id', 'qpid.shared_msg_group':1}" + s0 = ssn0.sender("test-group-q; {create:always, node:{type:queue, x-declare:{arguments:%s}}}" % q_args) + + # Kill original brokers, start new ones for the duration. + endtime = time.time() + self.duration(); + i = 0 + while time.time() < endtime: + traffic = GroupedTrafficGenerator( cluster[i].host_port(), + "test-group-q", "group-id" ) + traffic.start() + time.sleep(1) + + for x in range(2): + for b in cluster[i:]: b.ready() # Check if any broker crashed. + cluster[i].kill() + i += 1 + b = cluster.start(expect=EXPECT_EXIT_FAIL) + time.sleep(1) + + # wait for traffic to finish, verify success + self.assertEqual(0, traffic.results()) + + for i in range(i, len(cluster)): cluster[i].kill() + + class StoreTests(BrokerTest): """ Cluster tests that can only be run if there is a store available. diff --git a/cpp/src/tests/msg_group_test.cpp b/cpp/src/tests/msg_group_test.cpp new file mode 100644 index 0000000000..6b9d09b89a --- /dev/null +++ b/cpp/src/tests/msg_group_test.cpp @@ -0,0 +1,618 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/messaging/Address.h> +#include <qpid/messaging/Connection.h> +#include <qpid/messaging/Receiver.h> +#include <qpid/messaging/Sender.h> +#include <qpid/messaging/Session.h> +#include <qpid/messaging/Message.h> +#include <qpid/messaging/FailoverUpdates.h> +#include <qpid/Options.h> +#include <qpid/log/Logger.h> +#include <qpid/log/Options.h> +#include "qpid/log/Statement.h" +#include "qpid/sys/Time.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/SystemInfo.h" + +#include <iostream> +#include <memory> +#include <stdlib.h> + +using namespace qpid::messaging; +using namespace qpid::types; +using namespace std; + +namespace qpid { +namespace tests { + +struct Options : public qpid::Options +{ + bool help; + std::string url; + std::string address; + std::string connectionOptions; + uint messages; + uint capacity; + uint ackFrequency; + bool failoverUpdates; + qpid::log::Options log; + uint senders; + uint receivers; + uint groupSize; + bool printReport; + std::string groupKey; + bool durable; + bool allowDuplicates; + bool randomizeSize; + bool stickyConsumer; + uint timeout; + uint interleave; + std::string prefix; + uint sendRate; + + Options(const std::string& argv0=std::string()) + : qpid::Options("Options"), + help(false), + url("amqp:tcp:127.0.0.1"), + messages(10000), + capacity(1000), + ackFrequency(100), + failoverUpdates(false), + log(argv0), + senders(2), + receivers(2), + groupSize(10), + printReport(false), + groupKey("qpid.no_group"), + durable(false), + allowDuplicates(false), + randomizeSize(false), + stickyConsumer(false), + timeout(10), + interleave(1), + sendRate(0) + { + addOptions() + ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)") + ("address,a", qpid::optValue(address, "ADDRESS"), "address to send and receive from") + ("allow-duplicates", qpid::optValue(allowDuplicates), "Ignore the delivery of duplicated messages") + ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") + ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)") + ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection") + ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.") + ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover") + ("group-key", qpid::optValue(groupKey, "KEY"), "Key of the message header containing the group identifier.") + ("group-prefix", qpid::optValue(prefix, "STRING"), "Add 'prefix' to the start of all generated group identifiers.") + ("group-size", qpid::optValue(groupSize, "N"), "Number of messages per a group.") + ("interleave", qpid::optValue(interleave, "N"), "Simultaineously interleave messages from N different groups.") + ("messages,m", qpid::optValue(messages, "N"), "Number of messages to send per each sender.") + ("receivers,r", qpid::optValue(receivers, "N"), "Number of message consumers.") + ("randomize-group-size", qpid::optValue(randomizeSize), "Randomize the number of messages per group to [1...group-size].") + ("send-rate", qpid::optValue(sendRate,"N"), "Send at rate of N messages/second. 0 means send as fast as possible.") + ("senders,s", qpid::optValue(senders, "N"), "Number of message producers.") + ("sticky-consumers", qpid::optValue(stickyConsumer), "If set, verify that all messages in a group are consumed by the same client [TBD].") + ("timeout", qpid::optValue(timeout, "N"), "Fail with a stall error should all consumers remain idle for timeout seconds.") + ("print-report", qpid::optValue(printReport), "Dump message group statistics to stdout.") + ("help", qpid::optValue(help), "print this usage statement"); + add(log); + //("check-redelivered", qpid::optValue(checkRedelivered), "Fails with exception if a duplicate is not marked as redelivered (only relevant when ignore-duplicates is selected)") + //("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)") + //("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)") + } + + bool parse(int argc, char** argv) + { + try { + qpid::Options::parse(argc, argv); + if (address.empty()) throw qpid::Exception("Address must be specified!"); + qpid::log::Logger::instance().configure(log); + if (help) { + std::ostringstream msg; + std::cout << msg << *this << std::endl << std::endl + << "Verifies the behavior of grouped messages." << std::endl; + return false; + } else { + return true; + } + } catch (const std::exception& e) { + std::cerr << *this << std::endl << std::endl << e.what() << std::endl; + return false; + } + } +}; + +const string EOS("eos"); +const string SN("sn"); + + +// class that monitors group state across all publishers and consumers. tracks the next +// expected sequence for each group, and total messages consumed. +class GroupChecker +{ + qpid::sys::Mutex lock; + + const uint totalMsgs; + uint totalMsgsConsumed; + uint totalMsgsPublished; + bool allowDuplicates; + uint duplicateMsgs; + + typedef std::map<std::string, uint> SequenceMap; + SequenceMap sequenceMap; + + // Statistics - for each group, store the names of all clients that consumed messages + // from that group, and the number of messages consumed per client. + typedef std::map<std::string, uint> ClientCounter; + typedef std::map<std::string, ClientCounter> GroupStatistics; + GroupStatistics statistics; + +public: + + GroupChecker( uint t, bool d ) : + totalMsgs(t), totalMsgsConsumed(0), totalMsgsPublished(0), allowDuplicates(d), + duplicateMsgs(0) {} + + bool checkSequence( const std::string& groupId, + uint sequence, const std::string& client ) + { + qpid::sys::Mutex::ScopedLock l(lock); + + QPID_LOG(debug, "Client " << client << " has received " << groupId << ":" << sequence); + + GroupStatistics::iterator gs = statistics.find(groupId); + if (gs == statistics.end()) { + statistics[groupId][client] = 1; + } else { + gs->second[client]++; + } + // now verify + SequenceMap::iterator s = sequenceMap.find(groupId); + if (s == sequenceMap.end()) { + QPID_LOG(debug, "Client " << client << " thinks this is the first message from group " << groupId << ":" << sequence); + // if duplication allowed, it is possible that the last msg(s) of an old sequence are redelivered on reconnect. + // in this case, set the sequence from the first msg. + sequenceMap[groupId] = (allowDuplicates) ? sequence : 0; + s = sequenceMap.find(groupId); + } else if (sequence < s->second) { + duplicateMsgs++; + QPID_LOG(debug, "Client " << client << " thinks this message is a duplicate! " << groupId << ":" << sequence); + return allowDuplicates; + } + totalMsgsConsumed++; + return sequence == s->second++; + } + + void sendingSequence( const std::string& groupId, + uint sequence, bool eos, + const std::string& client ) + { + qpid::sys::Mutex::ScopedLock l(lock); + ++totalMsgsPublished; + + QPID_LOG(debug, "Client " << client << " sending " << groupId << ":" << sequence << + ((eos) ? " (last)" : "")); + } + + bool eraseGroup( const std::string& groupId, const std::string& name ) + { + qpid::sys::Mutex::ScopedLock l(lock); + QPID_LOG(debug, "Deleting group " << groupId << " (by client " << name << ")"); + return sequenceMap.erase( groupId ) == 1; + } + + uint getNextExpectedSequence( const std::string& groupId ) + { + qpid::sys::Mutex::ScopedLock l(lock); + return sequenceMap[groupId]; + } + + bool allMsgsConsumed() // true when done processing msgs + { + qpid::sys::Mutex::ScopedLock l(lock); + return (totalMsgsPublished >= totalMsgs) && + (totalMsgsConsumed >= totalMsgsPublished) && + sequenceMap.size() == 0; + } + + uint getConsumedTotal() + { + qpid::sys::Mutex::ScopedLock l(lock); + return totalMsgsConsumed; + } + + uint getPublishedTotal() + { + qpid::sys::Mutex::ScopedLock l(lock); + return totalMsgsPublished; + } + + ostream& print(ostream& out) + { + qpid::sys::Mutex::ScopedLock l(lock); + out << "Total Published: " << totalMsgsPublished << ", Total Consumed: " << totalMsgsConsumed << + ", Duplicates detected: " << duplicateMsgs << std::endl; + out << "Total Groups: " << statistics.size() << std::endl; + unsigned long consumers = 0; + for (GroupStatistics::iterator gs = statistics.begin(); gs != statistics.end(); ++gs) { + out << " GroupId: " << gs->first; + consumers += gs->second.size(); // # of consumers that processed this group + if (gs->second.size() == 1) + out << " completely consumed by a single client." << std::endl; + else + out << " consumed by " << gs->second.size() << " different clients." << std::endl; + + for (ClientCounter::iterator cc = gs->second.begin(); cc != gs->second.end(); ++cc) { + out << " Client: " << cc->first << " consumed " << cc->second << " messages from the group." << std::endl; + } + } + out << "Average # of consumers per group: " << ((statistics.size() != 0) ? (double(consumers)/statistics.size()) : 0) << std::endl; + return out; + } +}; + + +namespace { + // rand() is not thread safe. Create a singleton obj to hold a lock while calling + // rand() so it can be called safely by multiple concurrent clients. + class Randomizer { + qpid::sys::Mutex lock; + public: + uint operator()(uint max) { + qpid::sys::Mutex::ScopedLock l(lock); + return (rand() % max) + 1; + } + }; + + static Randomizer randomizer; +} + + +// tag each generated message with a group identifer +// +class GroupGenerator { + + const std::string groupPrefix; + const uint groupSize; + const bool randomizeSize; + const uint interleave; + + uint groupSuffix; + uint total; + + struct GroupState { + std::string id; + const uint size; + uint count; + GroupState( const std::string& i, const uint s ) + : id(i), size(s), count(0) {} + }; + typedef std::list<GroupState> GroupList; + GroupList groups; + GroupList::iterator current; + + // add a new group identifier to the list + void newGroup() { + std::ostringstream groupId(groupPrefix, ios_base::out|ios_base::ate); + groupId << std::string(":") << groupSuffix++; + uint size = (randomizeSize) ? randomizer(groupSize) : groupSize; + QPID_LOG(trace, "New group: GROUPID=[" << groupId.str() << "] size=" << size << " this=" << this); + GroupState group( groupId.str(), size ); + groups.push_back( group ); + } + +public: + GroupGenerator( const std::string& prefix, + const uint t, + const uint size, + const bool randomize, + const uint i) + : groupPrefix(prefix), groupSize(size), + randomizeSize(randomize), interleave(i), groupSuffix(0), total(t) + { + QPID_LOG(trace, "New group generator: PREFIX=[" << prefix << "] total=" << total << " size=" << size << " rand=" << randomize << " interleave=" << interleave << " this=" << this); + for (uint i = 0; i < 1 || i < interleave; ++i) { + newGroup(); + } + current = groups.begin(); + } + + bool genGroup(std::string& groupId, uint& seq, bool& eos) + { + if (!total) return false; + --total; + if (current == groups.end()) + current = groups.begin(); + groupId = current->id; + seq = current->count++; + if (current->count == current->size) { + QPID_LOG(trace, "Last msg for " << current->id << ", " << current->count << " this=" << this); + eos = true; + if (total >= interleave) { // need a new group to replace this one + newGroup(); + groups.erase(current++); + } else ++current; + } else { + ++current; + eos = total < interleave; // mark eos on the last message of each group + } + QPID_LOG(trace, "SENDING GROUPID=[" << groupId << "] seq=" << seq << " eos=" << eos << " this=" << this); + return true; + } +}; + + + +class Client : public qpid::sys::Runnable +{ +public: + typedef boost::shared_ptr<Client> shared_ptr; + enum State {ACTIVE, DONE, FAILURE}; + Client( const std::string& n, const Options& o ) : name(n), opts(o), state(ACTIVE), stopped(false) {} + virtual ~Client() {} + State getState() { return state; } + void testFailed( const std::string& reason ) { state = FAILURE; error << "Client '" << name << "' failed: " << reason; } + void clientDone() { if (state == ACTIVE) state = DONE; } + qpid::sys::Thread& getThread() { return thread; } + const std::string getErrorMsg() { return error.str(); } + void stop() {stopped = true;} + const std::string& getName() { return name; } + +protected: + const std::string name; + const Options& opts; + qpid::sys::Thread thread; + ostringstream error; + State state; + bool stopped; +}; + + +class Consumer : public Client +{ + GroupChecker& checker; + +public: + Consumer(const std::string& n, const Options& o, GroupChecker& c ) : Client(n, o), checker(c) {}; + virtual ~Consumer() {}; + + void run() + { + Connection connection; + try { + connection = Connection(opts.url, opts.connectionOptions); + connection.open(); + std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); + Session session = connection.createSession(); + Receiver receiver = session.createReceiver(opts.address); + receiver.setCapacity(opts.capacity); + Message msg; + uint count = 0; + + while (!stopped) { + if (receiver.fetch(msg, Duration::SECOND)) { // msg retrieved + qpid::types::Variant::Map& properties = msg.getProperties(); + std::string groupId = properties[opts.groupKey]; + uint groupSeq = properties[SN]; + bool eof = properties[EOS]; + + QPID_LOG(trace, "RECVING GROUPID=[" << groupId << "] seq=" << groupSeq << " eos=" << eof << " name=" << name); + + qpid::sys::usleep(10); + + if (!checker.checkSequence( groupId, groupSeq, name )) { + ostringstream msg; + msg << "Check sequence failed. Group=" << groupId << " rcvd seq=" << groupSeq << " expected=" << checker.getNextExpectedSequence( groupId ); + testFailed( msg.str() ); + break; + } else if (eof) { + if (!checker.eraseGroup( groupId, name )) { + ostringstream msg; + msg << "Erase group failed. Group=" << groupId << " rcvd seq=" << groupSeq; + testFailed( msg.str() ); + break; + } + } + + ++count; + if (opts.ackFrequency && (count % opts.ackFrequency == 0)) { + session.acknowledge(); + } + // Clear out message properties & content for next iteration. + msg = Message(); // TODO aconway 2010-12-01: should be done by fetch + } else if (checker.allMsgsConsumed()) // timed out, nothing else to do? + break; + } + session.acknowledge(); + session.close(); + connection.close(); + } catch(const std::exception& error) { + ostringstream msg; + msg << "consumer error: " << error.what(); + testFailed( msg.str() ); + connection.close(); + } + clientDone(); + QPID_LOG(trace, "Consuming client " << name << " completed."); + } +}; + + + +class Producer : public Client +{ + GroupChecker& checker; + GroupGenerator generator; + +public: + Producer(const std::string& n, const Options& o, GroupChecker& c) + : Client(n, o), checker(c), + generator( n, o.messages, o.groupSize, o.randomizeSize, o.interleave ) + {}; + virtual ~Producer() {}; + + void run() + { + Connection connection; + try { + connection = Connection(opts.url, opts.connectionOptions); + connection.open(); + std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); + Session session = connection.createSession(); + Sender sender = session.createSender(opts.address); + if (opts.capacity) sender.setCapacity(opts.capacity); + Message msg; + msg.setDurable(opts.durable); + std::string groupId; + uint seq; + bool eos; + uint sent = 0; + + qpid::sys::AbsTime start = qpid::sys::now(); + int64_t interval = 0; + if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate; + + while (!stopped && generator.genGroup(groupId, seq, eos)) { + msg.getProperties()[opts.groupKey] = groupId; + msg.getProperties()[SN] = seq; + msg.getProperties()[EOS] = eos; + checker.sendingSequence( groupId, seq, eos, name ); + + sender.send(msg); + ++sent; + + if (opts.sendRate) { + qpid::sys::AbsTime waitTill(start, sent*interval); + int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill); + if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC); + } + } + session.sync(); + session.close(); + connection.close(); + } catch(const std::exception& error) { + ostringstream msg; + msg << "producer '" << name << "' error: " << error.what(); + testFailed(msg.str()); + connection.close(); + } + clientDone(); + QPID_LOG(trace, "Producing client " << name << " completed."); + } +}; + + +}} // namespace qpid::tests + +using namespace qpid::tests; + +int main(int argc, char ** argv) +{ + int status = 0; + try { + Options opts; + if (opts.parse(argc, argv)) { + + GroupChecker state( opts.senders * opts.messages, + opts.allowDuplicates); + std::vector<Client::shared_ptr> clients; + + if (opts.randomizeSize) srand((unsigned int)qpid::sys::SystemInfo::getProcessId()); + + // fire off the producers && consumers + for (size_t j = 0; j < opts.senders; ++j) { + ostringstream name; + name << opts.prefix << "P_" << j; + clients.push_back(Client::shared_ptr(new Producer( name.str(), opts, state ))); + clients.back()->getThread() = qpid::sys::Thread(*clients.back()); + } + for (size_t j = 0; j < opts.receivers; ++j) { + ostringstream name; + name << opts.prefix << "C_" << j; + clients.push_back(Client::shared_ptr(new Consumer( name.str(), opts, state ))); + clients.back()->getThread() = qpid::sys::Thread(*clients.back()); + } + + // wait for all pubs/subs to finish.... or for consumers to fail or stall. + uint stalledTime = 0; + bool done; + bool clientFailed = false; + do { + uint lastCount = state.getConsumedTotal(); + qpid::sys::usleep( 1000000 ); + + // check each client for status + done = true; + for (std::vector<Client::shared_ptr>::iterator i = clients.begin(); + i != clients.end(); ++i) { + QPID_LOG(debug, "Client " << (*i)->getName() << " state=" << (*i)->getState()); + if ((*i)->getState() == Client::FAILURE) { + QPID_LOG(error, argv[0] << ": test failed with client error: " << (*i)->getErrorMsg()); + clientFailed = true; + done = true; + break; // exit test. + } else if ((*i)->getState() != Client::DONE) { + done = false; + } + } + + if (!done) { + // check that consumers are still receiving messages + if (lastCount == state.getConsumedTotal()) + stalledTime++; + else { + lastCount = state.getConsumedTotal(); + stalledTime = 0; + } + } + + QPID_LOG(debug, "Consumed to date = " << state.getConsumedTotal() << + " Published to date = " << state.getPublishedTotal() << + " total=" << opts.senders * opts.messages ); + + } while (!done && stalledTime < opts.timeout); + + if (clientFailed) { + status = 1; + } else if (stalledTime >= opts.timeout) { + QPID_LOG(error, argv[0] << ": test failed due to stalled consumer." ); + status = 2; + } + + // Wait for started threads. + for (std::vector<Client::shared_ptr>::iterator i = clients.begin(); + i != clients.end(); ++i) { + (*i)->stop(); + (*i)->getThread().join(); + } + + if (opts.printReport && !status) state.print(std::cout); + } else status = 4; + } catch(const std::exception& error) { + QPID_LOG(error, argv[0] << ": " << error.what()); + status = 3; + } + QPID_LOG(trace, "TEST DONE [" << status << "]"); + + return status; +} diff --git a/cpp/src/tests/qpid-send.cpp b/cpp/src/tests/qpid-send.cpp index ef5e98e2a0..b1213a484f 100644 --- a/cpp/src/tests/qpid-send.cpp +++ b/cpp/src/tests/qpid-send.cpp @@ -28,6 +28,7 @@ #include <qpid/messaging/FailoverUpdates.h> #include <qpid/sys/Time.h> #include <qpid/sys/Monitor.h> +#include <qpid/sys/SystemInfo.h> #include "TestOptions.h" #include "Statistics.h" @@ -76,6 +77,11 @@ struct Options : public qpid::Options uint flowControl; bool sequence; bool timestamp; + std::string groupKey; + std::string groupPrefix; + uint groupSize; + bool groupRandSize; + uint groupInterleave; Options(const std::string& argv0=std::string()) : qpid::Options("Options"), @@ -100,7 +106,11 @@ struct Options : public qpid::Options sendRate(0), flowControl(0), sequence(true), - timestamp(true) + timestamp(true), + groupPrefix("GROUP-"), + groupSize(10), + groupRandSize(false), + groupInterleave(1) { addOptions() ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") @@ -111,8 +121,8 @@ struct Options : public qpid::Options ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address") ("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input") ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.") - ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds") - ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)") + ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds") + ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)") ("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property") ("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message") ("user-id", qpid::optValue(userid, "USERID"), "userid for message") @@ -131,6 +141,11 @@ struct Options : public qpid::Options ("flow-control", qpid::optValue(flowControl,"N"), "Do end to end flow control to limit queue depth to 2*N. 0 means no flow control.") ("sequence", qpid::optValue(sequence, "yes|no"), "Add a sequence number messages property (required for duplicate/lost message detection)") ("timestamp", qpid::optValue(timestamp, "yes|no"), "Add a time stamp messages property (required for latency measurement)") + ("group-key", qpid::optValue(groupKey, "KEY"), "Generate groups of messages using message header 'KEY' to hold the group identifier") + ("group-prefix", qpid::optValue(groupPrefix, "STRING"), "Generate group identifers with 'STRING' prefix (if group-key specified)") + ("group-size", qpid::optValue(groupSize, "N"), "Number of messages per a group (if group-key specified)") + ("group-randomize-size", qpid::optValue(groupRandSize), "Randomize the number of messages per group to [1...group-size] (if group-key specified)") + ("group-interleave", qpid::optValue(groupInterleave, "N"), "Simultaineously interleave messages from N different groups (if group-key specified)") ("help", qpid::optValue(help), "print this usage statement"); add(log); } @@ -252,6 +267,68 @@ class MapContentGenerator : public ContentGenerator { const Options& opts; }; +// tag each generated message with a group identifer +// +class GroupGenerator { +public: + GroupGenerator(const std::string& key, + const std::string& prefix, + const uint size, + const bool randomize, + const uint interleave) + : groupKey(key), groupPrefix(prefix), groupSize(size), + randomizeSize(randomize), groupSuffix(0) + { + if (randomize) srand((unsigned int)qpid::sys::SystemInfo::getProcessId()); + + for (uint i = 0; i < 1 || i < interleave; ++i) { + newGroup(); + } + current = groups.begin(); + } + + void setGroupInfo(Message &msg) + { + if (current == groups.end()) + current = groups.begin(); + msg.getProperties()[groupKey] = current->id; + // std::cout << "SENDING GROUPID=[" << current->id << "]" << std::endl; + if (++(current->count) == current->size) { + newGroup(); + groups.erase(current++); + } else + ++current; + } + + private: + const std::string& groupKey; + const std::string& groupPrefix; + const uint groupSize; + const bool randomizeSize; + + uint groupSuffix; + + struct GroupState { + std::string id; + const uint size; + uint count; + GroupState( const std::string& i, const uint s ) + : id(i), size(s), count(0) {} + }; + typedef std::list<GroupState> GroupList; + GroupList groups; + GroupList::iterator current; + + void newGroup() { + std::ostringstream groupId(groupPrefix, ios_base::out|ios_base::ate); + groupId << groupSuffix++; + uint size = (randomizeSize) ? (rand() % groupSize) + 1 : groupSize; + // std::cout << "New group: GROUPID=[" << groupId.str() << "] size=" << size << std::endl; + GroupState group( groupId.str(), size ); + groups.push_back( group ); + } +}; + int main(int argc, char ** argv) { Connection connection; @@ -296,6 +373,14 @@ int main(int argc, char ** argv) else contentGen.reset(new FixedContentGenerator(opts.contentString)); + std::auto_ptr<GroupGenerator> groupGen; + if (!opts.groupKey.empty()) + groupGen.reset(new GroupGenerator(opts.groupKey, + opts.groupPrefix, + opts.groupSize, + opts.groupRandSize, + opts.groupInterleave)); + qpid::sys::AbsTime start = qpid::sys::now(); int64_t interval = 0; if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate; @@ -312,9 +397,6 @@ int main(int argc, char ** argv) ++sent; if (opts.sequence) msg.getProperties()[SN] = sent; - if (opts.timestamp) - msg.getProperties()[TS] = int64_t( - qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); if (opts.flowControl) { if ((sent % opts.flowControl) == 0) { msg.setReplyTo(flowControlAddress); @@ -323,6 +405,12 @@ int main(int argc, char ** argv) else msg.setReplyTo(Address()); // Clear the reply address. } + if (groupGen.get()) + groupGen->setGroupInfo(msg); + + if (opts.timestamp) + msg.getProperties()[TS] = int64_t( + qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); sender.send(msg); reporter.message(msg); diff --git a/cpp/src/tests/run_msg_group_tests b/cpp/src/tests/run_msg_group_tests new file mode 100755 index 0000000000..8423022521 --- /dev/null +++ b/cpp/src/tests/run_msg_group_tests @@ -0,0 +1,66 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#script to run a sequence of message group queue tests via make + +#setup path to find qpid-config and msg_group_test progs +source ./test_env.sh + +export PATH=$PWD:$srcdir:$PYTHON_COMMANDS:$PATH + +#set port to connect to via env var +test -s qpidd.port && QPID_PORT=`cat qpidd.port` + +#trap cleanup INT TERM QUIT + +QUEUE_NAME="group-queue" +GROUP_KEY="My-Group-Id" + +BROKER_URL="${QPID_BROKER:-localhost}:${QPID_PORT:-5672}" + +run_test() { + $@ +} + +##set -x + +declare -i i=0 +declare -a tests +tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --group-header=${GROUP_KEY} --shared-groups" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size --interleave 3" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 7 --randomize-group-size" + "qpid-config -a $BROKER_URL add queue ${QUEUE_NAME}-two --group-header=${GROUP_KEY} --shared-groups" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 7 --ack-frequency 3 --randomize-group-size" + "msg_group_test -b $BROKER_URL -a ${QUEUE_NAME}-two --group-key $GROUP_KEY --messages 103 --group-size 13 --receivers 2 --senders 3 --capacity 3 --ack-frequency 7 --randomize-group-size --interleave 5" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59 --group-size 5 --receivers 2 --senders 3 --capacity 1 --ack-frequency 3 --randomize-group-size" + "qpid-config -a $BROKER_URL del queue ${QUEUE_NAME}-two --force" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 59 --group-size 3 --receivers 2 --senders 3 --capacity 1 --ack-frequency 1 --randomize-group-size" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 211 --group-size 13 --receivers 2 --senders 3 --capacity 47 --ack-frequency 79 --interleave 53" + "qpid-config -a $BROKER_URL del queue $QUEUE_NAME --force") + +while [ -n "${tests[i]}" ]; do + run_test ${tests[i]} + RETCODE=$? + if test x$RETCODE != x0; then + echo "FAILED message group test. Failed command: \"${tests[i]}\""; + exit 1; + fi + i+=1 +done diff --git a/cpp/src/tests/run_msg_group_tests_soak b/cpp/src/tests/run_msg_group_tests_soak new file mode 100755 index 0000000000..5231f74755 --- /dev/null +++ b/cpp/src/tests/run_msg_group_tests_soak @@ -0,0 +1,60 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#script to run a sequence of long-running message group tests via make + +#setup path to find qpid-config and msg_group_test test progs +source ./test_env.sh + +export PATH=$PWD:$srcdir:$PYTHON_COMMANDS:$PATH + +#set port to connect to via env var +test -s qpidd.port && QPID_PORT=`cat qpidd.port` + +#trap cleanup INT TERM QUIT + +QUEUE_NAME="group-queue" +GROUP_KEY="My-Group-Id" + +BROKER_URL="${QPID_BROKER:-localhost}:${QPID_PORT:-5672}" + +run_test() { + $@ +} + +##set -x + +declare -i i=0 +declare -a tests +tests=("qpid-config -a $BROKER_URL add queue $QUEUE_NAME --group-header=${GROUP_KEY} --shared-groups" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 47 --ack-frequency 97" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 79 --ack-frequency 79" + "msg_group_test -b $BROKER_URL -a $QUEUE_NAME --group-key $GROUP_KEY --messages 10007 --receivers 3 --senders 5 --group-size 211 --randomize-group-size --capacity 97 --ack-frequency 47" + "qpid-config -a $BROKER_URL del queue $QUEUE_NAME --force") + +while [ -n "${tests[i]}" ]; do + run_test ${tests[i]} + RETCODE=$? + if test x$RETCODE != x0; then + echo "FAILED message group test. Failed command: \"${tests[i]}\""; + exit 1; + fi + i+=1 +done |