From 9a5a8e164ef91abdcf5be637035ddb3f25444b71 Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Wed, 7 Sep 2011 18:23:01 +0000 Subject: QPID-3346: move message group code into its own files. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1166299 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/Makefile.am | 4 + qpid/cpp/src/qpid/broker/MessageAllocator.cpp | 60 ++++ qpid/cpp/src/qpid/broker/MessageAllocator.h | 79 ++++ qpid/cpp/src/qpid/broker/MessageGroupManager.cpp | 307 ++++++++++++++++ qpid/cpp/src/qpid/broker/MessageGroupManager.h | 116 ++++++ qpid/cpp/src/qpid/broker/Queue.cpp | 438 +---------------------- 6 files changed, 571 insertions(+), 433 deletions(-) create mode 100644 qpid/cpp/src/qpid/broker/MessageAllocator.cpp create mode 100644 qpid/cpp/src/qpid/broker/MessageAllocator.h create mode 100644 qpid/cpp/src/qpid/broker/MessageGroupManager.cpp create mode 100644 qpid/cpp/src/qpid/broker/MessageGroupManager.h diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 4e13e9ad9c..9533053473 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -671,6 +671,10 @@ libqpidbroker_la_SOURCES = \ qpid/broker/TxPublish.h \ qpid/broker/Vhost.cpp \ qpid/broker/Vhost.h \ + qpid/broker/MessageAllocator.h \ + qpid/broker/MessageAllocator.cpp \ + qpid/broker/MessageGroupManager.cpp \ + qpid/broker/MessageGroupManager.h \ qpid/management/ManagementAgent.cpp \ qpid/management/ManagementAgent.h \ qpid/management/ManagementDirectExchange.cpp \ diff --git a/qpid/cpp/src/qpid/broker/MessageAllocator.cpp b/qpid/cpp/src/qpid/broker/MessageAllocator.cpp new file mode 100644 index 0000000000..62d5402b0a --- /dev/null +++ b/qpid/cpp/src/qpid/broker/MessageAllocator.cpp @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* Used by queues to allocate the next "most desirable" message to a consuming client */ + +#include "qpid/broker/Queue.h" +#include "qpid/broker/MessageAllocator.h" + +using namespace qpid::broker; + +bool MessageAllocator::nextConsumableMessage( Consumer::shared_ptr&, QueuedMessage& next, + const sys::Mutex::ScopedLock&) +{ + Messages& messages(queue->getMessages()); + if (!messages.empty()) { + next = messages.front(); // by default, consume oldest msg + return true; + } + return false; +} + +bool MessageAllocator::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next, + const sys::Mutex::ScopedLock&) +{ + Messages& messages(queue->getMessages()); + if (!messages.empty() && messages.next(c->position, next)) + return true; + return false; +} + + +bool MessageAllocator::acquirable( const std::string&, + const QueuedMessage&, + const sys::Mutex::ScopedLock&) +{ + return true; +} + +void MessageAllocator::query(qpid::types::Variant::Map&, const sys::Mutex::ScopedLock&) const +{ +} + diff --git a/qpid/cpp/src/qpid/broker/MessageAllocator.h b/qpid/cpp/src/qpid/broker/MessageAllocator.h new file mode 100644 index 0000000000..4daf7f2174 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/MessageAllocator.h @@ -0,0 +1,79 @@ +#ifndef _broker_MessageAllocator_h +#define _broker_MessageAllocator_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* Used by queues to allocate the next "most desirable" message to a consuming client */ + + +#include "qpid/broker/Consumer.h" + +namespace qpid { +namespace broker { + +class Queue; +class QueuedMessage; + +class MessageAllocator +{ + protected: + Queue *queue; + public: + MessageAllocator( Queue *q ) : queue(q) {} + virtual ~MessageAllocator() {}; + + // Note: all methods taking a mutex assume the caller is holding the + // Queue::messageLock during the method call. + + /** Determine the next message available for consumption by the consumer + * @param next set to the next message that the consumer may acquire. + * @return true if message is available + */ + virtual bool nextConsumableMessage( Consumer::shared_ptr& consumer, + QueuedMessage& next, + const sys::Mutex::ScopedLock& lock); + + /** Determine the next message available for browsing by the consumer + * @param next set to the next message that the consumer may browse. + * @return true if a message is available + */ + virtual bool nextBrowsableMessage( Consumer::shared_ptr& consumer, + QueuedMessage& next, + const sys::Mutex::ScopedLock& lock); + + /** check if a message previously returned via next*Message() may be acquired. + * @param consumer name of consumer that is attempting to acquire the message + * @param qm the message to be acquired + * @param messageLock - ensures caller is holding it! + * @return true if acquire is permitted, false if acquire is no longer permitted. + */ + virtual bool acquirable( const std::string&, + const QueuedMessage&, + const sys::Mutex::ScopedLock&); + + /** hook to add any interesting management state to the status map */ + virtual void query(qpid::types::Variant::Map&, const sys::Mutex::ScopedLock&) const; +}; + +}} + +#endif diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp new file mode 100644 index 0000000000..4fc142f553 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -0,0 +1,307 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/FieldTable.h" +#include "qpid/types/Variant.h" +#include "qpid/log/Statement.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/MessageGroupManager.h" + +using namespace qpid::broker; + +namespace { + const std::string GroupQueryKey("qpid.message_group_queue"); + const std::string GroupHeaderKey("group_header_key"); + const std::string GroupStateKey("group_state"); + const std::string GroupIdKey("group_id"); + const std::string GroupMsgCount("msg_count"); + const std::string GroupTimestamp("timestamp"); + const std::string GroupConsumer("consumer"); +} + + +const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key"); +const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp"); +const std::string MessageGroupManager::qpidMessageGroupDefault("qpid.no_group"); /** @todo KAG: make configurable in Broker options */ + + +const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const +{ + const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders(); + if (!headers) return qpidMessageGroupDefault; + qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader ); + if (!id || !id->convertsTo()) return qpidMessageGroupDefault; + return id->get(); +} + + +void MessageGroupManager::enqueued( const QueuedMessage& qm ) +{ + // @todo KAG optimization - store reference to group state in QueuedMessage + // issue: const-ness?? + std::string group( getGroupId(qm) ); + GroupState &state(messageGroups[group]); + state.members.push_back(qm.position); + uint32_t total = state.members.size(); + QPID_LOG( trace, "group queue " << queue->getName() << + ": added message to group id=" << group << " total=" << total ); + if (total == 1) { + // newly created group, no owner + state.group = group; +#ifdef NDEBUG + freeGroups[qm.position] = &state; +#else + bool unique = freeGroups.insert(GroupFifo::value_type(qm.position, &state)).second; + (void) unique; assert(unique); +#endif + } +} + + +void MessageGroupManager::acquired( const QueuedMessage& qm ) +{ + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + // issue: const-ness?? + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + state.acquired += 1; + QPID_LOG( trace, "group queue " << queue->getName() << + ": acquired message in group id=" << group << " acquired=" << state.acquired ); +} + + +void MessageGroupManager::requeued( const QueuedMessage& qm ) +{ + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + // issue: const-ness?? + // @todo KAG BUG - how to ensure requeue happens in the correct order? + // @todo KAG BUG - if requeue is not in correct order - what do we do? throw? + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + assert( state.acquired != 0 ); + state.acquired -= 1; + if (state.acquired == 0 && state.owned()) { + QPID_LOG( trace, "group queue " << queue->getName() << + ": consumer name=" << state.owner << " released group id=" << gs->first); + disown(state); + } + QPID_LOG( trace, "group queue " << queue->getName() << + ": requeued message to group id=" << group << " acquired=" << state.acquired ); +} + + +void MessageGroupManager::dequeued( const QueuedMessage& qm ) +{ + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + // issue: const-ness?? + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + assert( state.members.size() != 0 ); + + // likely to be at or near begin() if dequeued in order + { + GroupState::PositionFifo::iterator pos = state.members.begin(); + GroupState::PositionFifo::iterator end = state.members.end(); + while (pos != end) { + if (*pos == qm.position) { + state.members.erase(pos); + break; + } + ++pos; + } + } + + assert( state.acquired != 0 ); + state.acquired -= 1; + uint32_t total = state.members.size(); + if (total == 0) { + if (!state.owned()) { // unlikely, but need to remove from the free list before erase + unFree( state ); + } + QPID_LOG( trace, "group queue " << queue->getName() << ": deleting group id=" << gs->first); + messageGroups.erase( gs ); + } else { + if (state.acquired == 0 && state.owned()) { + QPID_LOG( trace, "group queue " << queue->getName() << + ": consumer name=" << state.owner << " released group id=" << gs->first); + disown(state); + } + } + QPID_LOG( trace, "group queue " << queue->getName() << + ": dequeued message from group id=" << group << " total=" << total ); +} + +void MessageGroupManager::consumerAdded( const Consumer& c ) +{ + assert(consumers.find(c.getName()) == consumers.end()); + consumers[c.getName()] = 0; // no groups owned yet + QPID_LOG( trace, "group queue " << queue->getName() << ": added consumer, name=" << c.getName() ); +} + +void MessageGroupManager::consumerRemoved( const Consumer& c ) +{ + const std::string& name(c.getName()); + Consumers::iterator consumer = consumers.find(name); + assert(consumer != consumers.end()); + size_t count = consumer->second; + + for (GroupMap::iterator gs = messageGroups.begin(); + count && gs != messageGroups.end(); ++gs) { + + GroupState& state( gs->second ); + if (state.owner == name) { + --count; + disown(state); + QPID_LOG( trace, "group queue " << queue->getName() << + ": consumer name=" << name << " released group id=" << gs->first); + } + } + consumers.erase( consumer ); + QPID_LOG( trace, "group queue " << queue->getName() << ": removed consumer name=" << name ); +} + + +bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next, + const qpid::sys::Mutex::ScopedLock& ) +{ + Messages& messages(queue->getMessages()); + + if (messages.empty()) + return false; + + if (!freeGroups.empty()) { + framing::SequenceNumber nextFree = freeGroups.begin()->first; + if (nextFree < c->position) { // next free group's msg is older than current position + bool ok = messages.find(nextFree, next); + (void) ok; assert( ok ); + } else { + if (!messages.next( c->position, next )) + return false; // shouldn't happen - should find nextFree + } + } else { // no free groups available + if (consumers[c->getName()] == 0) { // and none currently owned + return false; // so nothing available to consume + } + if (!messages.next( c->position, next )) + return false; + } + + do { + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + std::string group( getGroupId( next ) ); + GroupMap::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + if (!state.owned() || state.owner == c->getName()) { + return true; + } + } while (messages.next( next.position, next )); + return false; +} + + +bool MessageGroupManager::acquirable(const std::string& consumer, const QueuedMessage& qm, + const qpid::sys::Mutex::ScopedLock&) +{ + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + + if (!state.owned()) { + own( state, consumer ); + QPID_LOG( trace, "group queue " << queue->getName() << + ": consumer name=" << consumer << " has acquired group id=" << gs->first); + return true; + } + return state.owner == consumer; +} + + +void MessageGroupManager::query(qpid::types::Variant::Map& status, + const qpid::sys::Mutex::ScopedLock&) const +{ + /** Add a description of the current state of the message groups for this queue. + FORMAT: + { "qpid.message_group_queue": + { "group_header_key" : "", + "group_state" : + [ { "group_id" : "", + "msg_count" : , + "timestamp" : , + "consumer" : }, + {...} // one for each known group + ] + } + } + **/ + + assert(status.find(GroupQueryKey) == status.end()); + qpid::types::Variant::Map state; + qpid::types::Variant::List groups; + + state[GroupHeaderKey] = groupIdHeader; + for (GroupMap::const_iterator g = messageGroups.begin(); + g != messageGroups.end(); ++g) { + qpid::types::Variant::Map info; + info[GroupIdKey] = g->first; + info[GroupMsgCount] = g->second.members.size(); + info[GroupTimestamp] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */ + info[GroupConsumer] = g->second.owner; + groups.push_back(info); + } + state[GroupStateKey] = groups; + status[GroupQueryKey] = state; +} + + +boost::shared_ptr MessageGroupManager::create( Queue *q, + const qpid::framing::FieldTable& settings ) +{ + boost::shared_ptr empty; + + if (settings.isSet(qpidMessageGroupKey)) { + + std::string headerKey = settings.getAsString(qpidMessageGroupKey); + if (headerKey.empty()) { + QPID_LOG( error, "A Message Group header key must be configured, queue=" << q->getName()); + return empty; + } + unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp); + + boost::shared_ptr manager( new MessageGroupManager( headerKey, q, timestamp ) ); + + q->addObserver( boost::static_pointer_cast(manager) ); + + QPID_LOG( debug, "Configured Queue '" << q->getName() << + "' for message grouping using header key '" << headerKey << "'" << + " (timestamp=" << timestamp << ")"); + return manager; + } + return empty; +} diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h new file mode 100644 index 0000000000..0a1551f3ba --- /dev/null +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h @@ -0,0 +1,116 @@ +#ifndef _broker_MessageGroupManager_h +#define _broker_MessageGroupManager_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* for managing message grouping on Queues */ + +#include "qpid/broker/QueueObserver.h" +#include "qpid/broker/MessageAllocator.h" + + +namespace qpid { +namespace broker { + +class QueueObserver; +class MessageAllocator; + +class MessageGroupManager : public QueueObserver, public MessageAllocator +{ + const std::string groupIdHeader; // msg header holding group identifier + const unsigned int timestamp; // mark messages with timestamp if set + + struct GroupState { + typedef std::list PositionFifo; + + std::string group; // group identifier + std::string owner; // consumer with outstanding acquired messages + uint32_t acquired; // count of outstanding acquired messages + //uint32_t total; // count of enqueued messages in this group + PositionFifo members; // msgs belonging to this group + + GroupState() : acquired(0) {} + bool owned() const {return !owner.empty();} + }; + typedef std::map GroupMap; + typedef std::map Consumers; // count of owned groups + typedef std::map GroupFifo; + + GroupMap messageGroups; // index: group name + GroupFifo freeGroups; // ordered by oldest free msg + Consumers consumers; // index: consumer name + + static const std::string qpidMessageGroupKey; + static const std::string qpidMessageGroupTimestamp; + static const std::string qpidMessageGroupDefault; + + const std::string getGroupId( const QueuedMessage& qm ) const; + void unFree( const GroupState& state ) + { + GroupFifo::iterator pos = freeGroups.find( state.members.front() ); + assert( pos != freeGroups.end() && pos->second == &state ); + freeGroups.erase( pos ); + } + void own( GroupState& state, const std::string& owner ) + { + state.owner = owner; + consumers[state.owner]++; + unFree( state ); + } + void disown( GroupState& state ) + { + assert(consumers[state.owner]); + consumers[state.owner]--; + state.owner.clear(); + assert(state.members.size()); +#ifdef NDEBUG + freeGroups[state.members.front()] = &state; +#else + bool unique = freeGroups.insert(GroupFifo::value_type(state.members.front(), &state)).second; + (void) unique; assert(unique); +#endif + } + + public: + + static boost::shared_ptr create( Queue *q, const qpid::framing::FieldTable& settings ); + + MessageGroupManager(const std::string& header, Queue *q, unsigned int _timestamp=0 ) + : QueueObserver(), MessageAllocator(q), groupIdHeader( header ), timestamp(_timestamp) {} + void enqueued( const QueuedMessage& qm ); + void acquired( const QueuedMessage& qm ); + void requeued( const QueuedMessage& qm ); + void dequeued( const QueuedMessage& qm ); + void consumerAdded( const Consumer& ); + void consumerRemoved( const Consumer& ); + bool nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next, + const sys::Mutex::ScopedLock&); + // uses default nextBrowsableMessage() + bool acquirable(const std::string& consumer, const QueuedMessage& msg, + const sys::Mutex::ScopedLock&); + void query(qpid::types::Variant::Map&, const sys::Mutex::ScopedLock&) const; + bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const; +}; + +}} + +#endif diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 12ecfa882e..224446adb2 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -33,6 +33,8 @@ #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/QueueFlowLimit.h" #include "qpid/broker/ThresholdAlerts.h" +#include "qpid/broker/MessageAllocator.h" +#include "qpid/broker/MessageGroupManager.h" #include "qpid/StringUtils.h" #include "qpid/log/Statement.h" @@ -88,152 +90,6 @@ const int ENQUEUE_ONLY=1; const int ENQUEUE_AND_DEQUEUE=2; } - -// KAG TBD: find me a home.... -namespace qpid { -namespace broker { - -class MessageAllocator -{ - protected: - Queue *queue; - public: - MessageAllocator( Queue *q ) : queue(q) {} - virtual ~MessageAllocator() {}; - - // Note: all methods taking a mutex assume the caller is holding the - // Queue::messageLock during the method call. - - /** Determine the next message available for consumption by the consumer - * @param next set to the next message that the consumer may acquire. - * @return true if message is available - */ - virtual bool nextConsumableMessage( Consumer::shared_ptr&, QueuedMessage& next, - const Mutex::ScopedLock&) - { - Messages& messages(queue->getMessages()); - if (!messages.empty()) { - next = messages.front(); // by default, consume oldest msg - return true; - } - return false; - } - /** Determine the next message available for browsing by the consumer - * @param next set to the next message that the consumer may browse. - * @return true if a message is available - */ - virtual bool nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next, - const Mutex::ScopedLock&) - { - Messages& messages(queue->getMessages()); - if (!messages.empty() && messages.next(c->position, next)) - return true; - return false; - } - /** acquire a message previously returned via next*Message(). - * @param consumer name of consumer that is attempting to acquire the message - * @param qm the message to be acquired - * @param messageLock - ensures caller is holding it! - * @returns true if acquire is successful, false if acquire failed. - */ - virtual bool acquireMessage( const std::string&, const QueuedMessage&, - const Mutex::ScopedLock&) - { - return true; - } - - /** hook to add any interesting management state to the status map */ - virtual void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const {}; -}; - - - -class MessageGroupManager : public QueueObserver, public MessageAllocator -{ - const std::string groupIdHeader; // msg header holding group identifier - const unsigned int timestamp; // mark messages with timestamp if set - - struct GroupState { - typedef std::list PositionFifo; - - std::string group; // group identifier - std::string owner; // consumer with outstanding acquired messages - uint32_t acquired; // count of outstanding acquired messages - //uint32_t total; // count of enqueued messages in this group - PositionFifo members; // msgs belonging to this group - - GroupState() : acquired(0) {} - bool owned() const {return !owner.empty();} - }; - typedef std::map GroupMap; - typedef std::map Consumers; // count of owned groups - typedef std::map GroupFifo; - - GroupMap messageGroups; // index: group name - GroupFifo freeGroups; // ordered by oldest free msg - Consumers consumers; // index: consumer name - - static const std::string qpidMessageGroupKey; - static const std::string qpidMessageGroupTimestamp; - static const std::string qpidMessageGroupDefault; - - const std::string getGroupId( const QueuedMessage& qm ) const; - void unFree( const GroupState& state ) - { - GroupFifo::iterator pos = freeGroups.find( state.members.front() ); - assert( pos != freeGroups.end() && pos->second == &state ); - freeGroups.erase( pos ); - } - void own( GroupState& state, const std::string& owner ) - { - state.owner = owner; - consumers[state.owner]++; - unFree( state ); - } - void disown( GroupState& state ) - { - assert(consumers[state.owner]); - consumers[state.owner]--; - state.owner.clear(); - assert(state.members.size()); -#ifdef NDEBUG - freeGroups[state.members.front()] = &state; -#else - bool unique = freeGroups.insert(GroupFifo::value_type(state.members.front(), &state)).second; - (void) unique; assert(unique); -#endif - } - - public: - - static boost::shared_ptr create( Queue *q, const qpid::framing::FieldTable& settings ); - - MessageGroupManager(const std::string& header, Queue *q, unsigned int _timestamp=0 ) - : QueueObserver(), MessageAllocator(q), groupIdHeader( header ), timestamp(_timestamp) {} - void enqueued( const QueuedMessage& qm ); - void acquired( const QueuedMessage& qm ); - void requeued( const QueuedMessage& qm ); - void dequeued( const QueuedMessage& qm ); - void consumerAdded( const Consumer& ); - void consumerRemoved( const Consumer& ); - bool nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next, - const Mutex::ScopedLock&); - // uses default nextBrowsableMessage() - bool acquireMessage(const std::string& consumer, const QueuedMessage& msg, - const Mutex::ScopedLock&); - void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const; - bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const; -}; - -const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key"); -const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp"); -const std::string MessageGroupManager::qpidMessageGroupDefault("qpid.no_group"); /** @todo KAG: make configurable in Broker options */ - -}} -// KAG TBD: END find me a home.... - - - Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, const OwnershipToken* const _owner, @@ -400,7 +256,7 @@ bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer) assertClusterSafe(); QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position); - if (!allocator->acquireMessage( consumer, msg, locker )) { + if (!allocator->acquirable( consumer, msg, locker )) { QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name); return false; } @@ -447,7 +303,6 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) { - while (true) { Mutex::ScopedLock locker(messageLock); QueuedMessage msg; @@ -471,7 +326,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ if (c->filter(msg.payload)) { if (c->accept(msg.payload)) { - bool ok = allocator->acquireMessage( c->getName(), msg, locker ); // inform allocator + bool ok = allocator->acquirable( c->getName(), msg, locker ); // inform allocator (void) ok; assert(ok); ok = acquire( msg.position, msg ); (void) ok; assert(ok); @@ -1071,6 +926,7 @@ void Queue::create(const FieldTable& _settings) configureImpl(_settings); } + int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key) { qpid::framing::FieldTable::ValuePtr v = settings.get(key); @@ -1584,287 +1440,3 @@ void Queue::UsageBarrier::destroy() parent.deleted = true; while (count) parent.messageLock.wait(); } - - -// KAG TBD: flesh out... - - - - -const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const -{ - const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders(); - if (!headers) return qpidMessageGroupDefault; - FieldTable::ValuePtr id = headers->get( groupIdHeader ); - if (!id || !id->convertsTo()) return qpidMessageGroupDefault; - return id->get(); -} - - -void MessageGroupManager::enqueued( const QueuedMessage& qm ) -{ - // @todo KAG optimization - store reference to group state in QueuedMessage - // issue: const-ness?? - std::string group( getGroupId(qm) ); - GroupState &state(messageGroups[group]); - state.members.push_back(qm.position); - uint32_t total = state.members.size(); - QPID_LOG( trace, "group queue " << queue->getName() << - ": added message to group id=" << group << " total=" << total ); - if (total == 1) { - // newly created group, no owner - state.group = group; -#ifdef NDEBUG - freeGroups[qm.position] = &state; -#else - bool unique = freeGroups.insert(GroupFifo::value_type(qm.position, &state)).second; - (void) unique; assert(unique); -#endif - } -} - - -void MessageGroupManager::acquired( const QueuedMessage& qm ) -{ - // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage - // issue: const-ness?? - std::string group( getGroupId(qm) ); - GroupMap::iterator gs = messageGroups.find( group ); - assert( gs != messageGroups.end() ); - GroupState& state( gs->second ); - state.acquired += 1; - QPID_LOG( trace, "group queue " << queue->getName() << - ": acquired message in group id=" << group << " acquired=" << state.acquired ); -} - - -void MessageGroupManager::requeued( const QueuedMessage& qm ) -{ - // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage - // issue: const-ness?? - // @todo KAG BUG - how to ensure requeue happens in the correct order? - // @todo KAG BUG - if requeue is not in correct order - what do we do? throw? - std::string group( getGroupId(qm) ); - GroupMap::iterator gs = messageGroups.find( group ); - assert( gs != messageGroups.end() ); - GroupState& state( gs->second ); - assert( state.acquired != 0 ); - state.acquired -= 1; - if (state.acquired == 0 && state.owned()) { - QPID_LOG( trace, "group queue " << queue->getName() << - ": consumer name=" << state.owner << " released group id=" << gs->first); - disown(state); - } - QPID_LOG( trace, "group queue " << queue->getName() << - ": requeued message to group id=" << group << " acquired=" << state.acquired ); -} - - -void MessageGroupManager::dequeued( const QueuedMessage& qm ) -{ - // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage - // issue: const-ness?? - std::string group( getGroupId(qm) ); - GroupMap::iterator gs = messageGroups.find( group ); - assert( gs != messageGroups.end() ); - GroupState& state( gs->second ); - assert( state.members.size() != 0 ); - - // likely to be at or near begin() if dequeued in order - { - GroupState::PositionFifo::iterator pos = state.members.begin(); - GroupState::PositionFifo::iterator end = state.members.end(); - while (pos != end) { - if (*pos == qm.position) { - state.members.erase(pos); - break; - } - ++pos; - } - } - - assert( state.acquired != 0 ); - state.acquired -= 1; - uint32_t total = state.members.size(); - if (total == 0) { - if (!state.owned()) { // unlikely, but need to remove from the free list before erase - unFree( state ); - } - QPID_LOG( trace, "group queue " << queue->getName() << ": deleting group id=" << gs->first); - messageGroups.erase( gs ); - } else { - if (state.acquired == 0 && state.owned()) { - QPID_LOG( trace, "group queue " << queue->getName() << - ": consumer name=" << state.owner << " released group id=" << gs->first); - disown(state); - } - } - QPID_LOG( trace, "group queue " << queue->getName() << - ": dequeued message from group id=" << group << " total=" << total ); -} - -void MessageGroupManager::consumerAdded( const Consumer& c ) -{ - assert(consumers.find(c.getName()) == consumers.end()); - consumers[c.getName()] = 0; // no groups owned yet - QPID_LOG( trace, "group queue " << queue->getName() << ": added consumer, name=" << c.getName() ); -} - -void MessageGroupManager::consumerRemoved( const Consumer& c ) -{ - const std::string& name(c.getName()); - Consumers::iterator consumer = consumers.find(name); - assert(consumer != consumers.end()); - size_t count = consumer->second; - - for (GroupMap::iterator gs = messageGroups.begin(); - count && gs != messageGroups.end(); ++gs) { - - GroupState& state( gs->second ); - if (state.owner == name) { - --count; - disown(state); - QPID_LOG( trace, "group queue " << queue->getName() << - ": consumer name=" << name << " released group id=" << gs->first); - } - } - consumers.erase( consumer ); - QPID_LOG( trace, "group queue " << queue->getName() << ": removed consumer name=" << name ); -} - - -bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next, - const Mutex::ScopedLock& ) -{ - Messages& messages(queue->getMessages()); - - if (messages.empty()) - return false; - - if (!freeGroups.empty()) { - framing::SequenceNumber nextFree = freeGroups.begin()->first; - if (nextFree < c->position) { // next free group's msg is older than current position - bool ok = messages.find(nextFree, next); - (void) ok; assert( ok ); - } else { - if (!messages.next( c->position, next )) - return false; // shouldn't happen - should find nextFree - } - } else { // no free groups available - if (consumers[c->getName()] == 0) { // and none currently owned - return false; // so nothing available to consume - } - if (!messages.next( c->position, next )) - return false; - } - - do { - // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage - std::string group( getGroupId( next ) ); - GroupMap::iterator gs = messageGroups.find( group ); - assert( gs != messageGroups.end() ); - GroupState& state( gs->second ); - if (!state.owned() || state.owner == c->getName()) { - return true; - } - } while (messages.next( next.position, next )); - return false; -} - - -bool MessageGroupManager::acquireMessage(const std::string& consumer, const QueuedMessage& qm, - const Mutex::ScopedLock&) -{ - // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage - std::string group( getGroupId(qm) ); - GroupMap::iterator gs = messageGroups.find( group ); - assert( gs != messageGroups.end() ); - GroupState& state( gs->second ); - - if (!state.owned()) { - own( state, consumer ); - QPID_LOG( trace, "group queue " << queue->getName() << - ": consumer name=" << consumer << " has acquired group id=" << gs->first); - return true; - } - return state.owner == consumer; -} - -namespace { - const std::string GroupQueryKey("qpid.message_group_queue"); - const std::string GroupHeaderKey("group_header_key"); - const std::string GroupStateKey("group_state"); - const std::string GroupIdKey("group_id"); - const std::string GroupMsgCount("msg_count"); - const std::string GroupTimestamp("timestamp"); - const std::string GroupConsumer("consumer"); -} - -void MessageGroupManager::query(qpid::types::Variant::Map& status, - const Mutex::ScopedLock&) const -{ - /** Add a description of the current state of the message groups for this queue. - FORMAT: - { "qpid.message_group_queue": - { "group_header_key" : "", - "group_state" : - [ { "group_id" : "", - "msg_count" : , - "timestamp" : , - "consumer" : }, - {...} // one for each known group - ] - } - } - **/ - - assert(status.find(GroupQueryKey) == status.end()); - qpid::types::Variant::Map state; - qpid::types::Variant::List groups; - - state[GroupHeaderKey] = groupIdHeader; - for (GroupMap::const_iterator g = messageGroups.begin(); - g != messageGroups.end(); ++g) { - qpid::types::Variant::Map info; - info[GroupIdKey] = g->first; - info[GroupMsgCount] = g->second.members.size(); - info[GroupTimestamp] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */ - info[GroupConsumer] = g->second.owner; - groups.push_back(info); - } - state[GroupStateKey] = groups; - status[GroupQueryKey] = state; -} - - -boost::shared_ptr MessageGroupManager::create( Queue *q, - const qpid::framing::FieldTable& settings ) -{ - boost::shared_ptr empty; - - if (settings.isSet(qpidMessageGroupKey)) { - - std::string headerKey = settings.getAsString(qpidMessageGroupKey); - if (headerKey.empty()) { - QPID_LOG( error, "A Message Group header key must be configured, queue=" << q->getName()); - return empty; - } - unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp); - - boost::shared_ptr manager( new MessageGroupManager( headerKey, q, timestamp ) ); - - q->addObserver( boost::static_pointer_cast(manager) ); - - QPID_LOG( debug, "Configured Queue '" << q->getName() << - "' for message grouping using header key '" << headerKey << "'" << - " (timestamp=" << timestamp << ")"); - return manager; - } - return empty; -} - - - - - - -- cgit v1.2.1