diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 438 |
1 files changed, 338 insertions, 100 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 68dd2ae125..551584f9d9 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -42,6 +42,7 @@ #include "qpid/sys/ClusterSafe.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" +#include "qpid/types/Variant.h" #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h" #include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h" @@ -111,8 +112,70 @@ class MessageAllocator */ virtual bool canAcquire( const std::string& consumer, const QueuedMessage& qm, const Mutex::ScopedLock&); + + /** hook to add any interesting management state to the status map (lock held) */ + virtual void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const {}; + + /** for move, purge, reroute - check if message matches against a filter, + * return true if message matches. + */ + virtual bool match(const qpid::types::Variant::Map* filter, + const QueuedMessage& message) const; }; + + +class MessageGroupManager : public QueueObserver, public MessageAllocator +{ + const std::string groupIdHeader; // msg header holding group identifier + const unsigned int timestamp; // mark messages with timestamp if set + + struct GroupState { + //const std::string group; // group identifier + //Consumer::shared_ptr owner; // consumer with outstanding acquired messages + std::string owner; // consumer with outstanding acquired messages + uint32_t acquired; // count of outstanding acquired messages + uint32_t total; // count of enqueued messages in this group + GroupState() : acquired(0), total(0) {} + }; + typedef std::map<std::string, struct GroupState> GroupMap; + typedef std::set<std::string> Consumers; + + GroupMap messageGroups; + Consumers consumers; + + static const std::string qpidMessageGroupKey; + static const std::string qpidMessageGroupTimestamp; + static const std::string qpidMessageGroupDefault; + static const std::string qpidMessageGroupFilter; // key for move/purge filter map + + const std::string getGroupId( const QueuedMessage& qm ) const; + + public: + + static boost::shared_ptr<MessageGroupManager> create( Queue *q, const qpid::framing::FieldTable& settings ); + + MessageGroupManager(const std::string& header, Queue *q, unsigned int _timestamp=0 ) + : QueueObserver(), MessageAllocator(q), groupIdHeader( header ), timestamp(_timestamp) {} + void enqueued( const QueuedMessage& qm ); + void acquired( const QueuedMessage& qm ); + void requeued( const QueuedMessage& qm ); + void dequeued( const QueuedMessage& qm ); + void consumerAdded( const Consumer& ); + void consumerRemoved( const Consumer& ); + bool nextMessage( Consumer::shared_ptr c, QueuedMessage& next, + const Mutex::ScopedLock&); + bool canAcquire(const std::string& consumer, const QueuedMessage& msg, + const Mutex::ScopedLock&); + void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const; + bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const; +}; + +const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key"); +const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp"); +const std::string MessageGroupManager::qpidMessageGroupDefault("qpid.no_group"); /** @todo KAG: make configurable in Broker options */ +const std::string MessageGroupManager::qpidMessageGroupFilter("qpid.group_id"); + }} Queue::Queue(const string& _name, bool _autodelete, @@ -140,7 +203,8 @@ Queue::Queue(const string& _name, bool _autodelete, deleted(false), barrier(*this), autoDeleteTimeout(0), - allocator(new MessageAllocator( this )) // KAG TODO: FIX!! + allocator(new MessageAllocator( this )), + type(FIFO) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); @@ -275,7 +339,7 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess } } -bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer) +bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer) { Mutex::ScopedLock locker(messageLock); assertClusterSafe(); @@ -535,6 +599,29 @@ void Queue::purgeExpired(qpid::sys::Duration lapse) } } + +namespace { + // for use with purge/move below - collect messages that match a given filter + struct Collector { + const uint32_t maxMatches; + const qpid::types::Variant::Map *filter; + std::deque<QueuedMessage> matches; + boost::shared_ptr<MessageAllocator> allocator; + Collector(boost::shared_ptr<MessageAllocator> a, uint32_t m, + const qpid::types::Variant::Map *f) + : maxMatches(m), filter(f), allocator(a) {} + void operator() (QueuedMessage& qm) + { + if (maxMatches == 0 || matches.size() < maxMatches) { + if (allocator->match( filter, qm )) { + matches.push_back(qm); + } + } + } + }; +} + + /** * purge - for purging all or some messages on a queue * depending on the purge_request @@ -546,52 +633,61 @@ void Queue::purgeExpired(qpid::sys::Duration lapse) * The dest exchange may be supplied to re-route messages through the exchange. * It is safe to re-route messages such that they arrive back on the same queue, * even if the queue is ordered by priority. + * + * An optional filter can be supplied that will be applied against each message. The + * message is purged only if the filter matches. See MessageAllocator for more detail. */ -uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest) +uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest, + const qpid::types::Variant::Map *filter) { - Mutex::ScopedLock locker(messageLock); - uint32_t purge_count = purge_request; // only comes into play if >0 - std::deque<DeliverableMessage> rerouteQueue; + Collector c(allocator, purge_request, filter); - uint32_t count = 0; - // Either purge them all or just the some (purge_count) while the queue isn't empty. - while((!purge_request || purge_count--) && !messages->empty()) { - if (dest.get()) { - // - // If there is a destination exchange, stage the messages onto a reroute queue - // so they don't wind up getting purged more than once. - // - DeliverableMessage msg(messages->front().payload); - rerouteQueue.push_back(msg); + Mutex::ScopedLock locker(messageLock); + messages->foreach( boost::bind<void>(boost::ref(c), _1) ); + + uint32_t count = c.matches.size(); + + // first remove all matches + for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); + qmsg != c.matches.end(); qmsg++) { + /** @todo KAG: need a direct remove method here */ + bool ok = acquire(qmsg->position, *qmsg); + (void) ok; assert(ok); + dequeue(0, *qmsg); + } + + // now reroute if necessary + if (dest.get()) { + while (!c.matches.empty()) { + QueuedMessage msg = c.matches.front(); + c.matches.pop_front(); + assert(msg.payload); + DeliverableMessage dmsg(msg.payload); + dest->routeWithAlternate(dmsg); } - popAndDequeue(); - count++; - } - - // - // Re-route purged messages into the destination exchange. Note that there's no need - // to test dest.get() here because if it is NULL, the rerouteQueue will be empty. - // - while (!rerouteQueue.empty()) { - DeliverableMessage msg(rerouteQueue.front()); - rerouteQueue.pop_front(); - dest->routeWithAlternate(msg); } - return count; } -uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { +uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty, + const qpid::types::Variant::Map *filter) +{ + Collector c(allocator, qty, filter); + Mutex::ScopedLock locker(messageLock); - uint32_t move_count = qty; // only comes into play if qty >0 - uint32_t count = 0; // count how many were moved for returning + messages->foreach( boost::bind<void>(boost::ref(c), _1) ); + + uint32_t count = c.matches.size(); - while((!qty || move_count--) && !messages->empty()) { - QueuedMessage qmsg = messages->front(); - boost::intrusive_ptr<Message> msg = qmsg.payload; - destq->deliver(msg); // deliver message to the destination queue - popAndDequeue(); - count++; + while (!c.matches.empty()) { + QueuedMessage qmsg = c.matches.front(); + c.matches.pop_front(); + /** @todo KAG: need a direct remove method here */ + bool ok = acquire(qmsg.position, qmsg); + (void) ok; assert(ok); + dequeue(0, qmsg); + assert(qmsg.payload); + destq->deliver(qmsg.payload); } return count; } @@ -614,6 +710,7 @@ bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage { if (messages->remove(position, msg)) { acquired( msg ); + ++dequeueSincePurge; return true; } return false; @@ -912,17 +1009,29 @@ void Queue::configureImpl(const FieldTable& _settings) if (lvqKey.size()) { QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey); messages = std::auto_ptr<Messages>(new MessageMap(lvqKey)); + type = LVQ; } else if (_settings.get(qpidLastValueQueueNoBrowse)) { QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on"); messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker); + type = LVQ; } else if (_settings.get(qpidLastValueQueue)) { QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue"); messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker); + type = LVQ; } else { std::auto_ptr<Messages> m = Fairshare::create(_settings); if (m.get()) { messages = m; QPID_LOG(debug, "Configured queue " << getName() << " as priority queue."); + type = PRIORITY; + } + } + + { // override default message allocator if message groups configured. + boost::shared_ptr<MessageAllocator> ma = boost::static_pointer_cast<MessageAllocator>(MessageGroupManager::create( this, _settings )); + if (ma) { + allocator = ma; + type = GROUP; } } @@ -1181,7 +1290,7 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str case _qmf::Queue::METHOD_PURGE : { _qmf::ArgsQueuePurge& purgeArgs = (_qmf::ArgsQueuePurge&) args; - purge(purgeArgs.i_request); + purge(purgeArgs.i_request, boost::shared_ptr<Exchange>(), &purgeArgs.i_filter); status = Manageable::STATUS_OK; } break; @@ -1202,7 +1311,7 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str } } - purge(rerouteArgs.i_request, dest); + purge(rerouteArgs.i_request, dest, &rerouteArgs.i_filter); status = Manageable::STATUS_OK; } break; @@ -1211,6 +1320,14 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str return status; } + +void Queue::query(qpid::types::Variant::Map& results) const +{ + Mutex::ScopedLock locker(messageLock); + /** @todo add any interesting queue state into results */ + if (allocator) allocator->query( results, messageLock ); +} + void Queue::setPosition(SequenceNumber n) { Mutex::ScopedLock locker(messageLock); sequence = n; @@ -1357,145 +1474,258 @@ void Queue::UsageBarrier::destroy() // KAG TBD: flesh out... -class MessageGroupManager : public QueueObserver, public MessageAllocator -{ - const std::string groupIdHeader; // msg header holding group identifier - struct GroupState { - const std::string group; // group identifier - //Consumer::shared_ptr owner; // consumer with outstanding acquired messages - std::string owner; // consumer with outstanding acquired messages - uint32_t acquired; // count of outstanding acquired messages - uint32_t total; // count of enqueued messages in this group - GroupState() : acquired(0), total(0) {} - }; - std::map<std::string, struct GroupState> messageGroups; - std::set<std::string> consumers; - - public: - - MessageGroupManager(const std::string& header, Queue *q ) - : QueueObserver(), MessageAllocator(q), groupIdHeader( header ) {} - void enqueued( const QueuedMessage& qm ); - void removed( const QueuedMessage& qm ); - void requeued( const QueuedMessage& qm ); - void dequeued( const QueuedMessage& qm ); - void consumerAdded( const Consumer& ); - void consumerRemoved( const Consumer& ); - bool nextMessage( Consumer::shared_ptr c, QueuedMessage& next, - const Mutex::ScopedLock&); - bool canAcquire(const std::string& consumer, const QueuedMessage& msg, - const Mutex::ScopedLock&); -}; -namespace { - const std::string NO_GROUP(""); - const std::string getGroupId( const QueuedMessage& qm, const std::string& key ) - { - const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders(); - if (!headers) return NO_GROUP; - return headers->getAsString(key); - } +const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const +{ + const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders(); + if (!headers) return qpidMessageGroupDefault; + FieldTable::ValuePtr id = headers->get( groupIdHeader ); + if (!id || !id->convertsTo<std::string>()) return qpidMessageGroupDefault; + return id->get<std::string>(); } void MessageGroupManager::enqueued( const QueuedMessage& qm ) { - std::string group( getGroupId(qm, groupIdHeader) ); - messageGroups[group].total++; + std::string group( getGroupId(qm) ); + uint32_t total = ++messageGroups[group].total; + QPID_LOG( trace, "group queue " << queue->getName() << + ": added message to group id=" << group << " total=" << total ); } -void MessageGroupManager::removed( const QueuedMessage& qm ) +void MessageGroupManager::acquired( const QueuedMessage& qm ) { - std::string group( getGroupId(qm, groupIdHeader) ); - std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group ); + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); assert( gs != messageGroups.end() ); - gs->second.acquired += 1; + GroupState& state( gs->second ); + state.acquired += 1; + QPID_LOG( trace, "group queue " << queue->getName() << + ": acquired message in group id=" << group << " acquired=" << state.acquired ); } void MessageGroupManager::requeued( const QueuedMessage& qm ) { - std::string group( getGroupId(qm, groupIdHeader) ); - std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group ); + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); assert( gs != messageGroups.end() ); GroupState& state( gs->second ); assert( state.acquired != 0 ); state.acquired -= 1; if (state.acquired == 0 && !state.owner.empty()) { + QPID_LOG( trace, "group queue " << queue->getName() << + ": consumer name=" << state.owner << " released group id=" << gs->first); state.owner.clear(); // KAG TODO: need to invalidate consumer's positions? } + QPID_LOG( trace, "group queue " << queue->getName() << + ": requeued message to group id=" << group << " acquired=" << state.acquired ); } void MessageGroupManager::dequeued( const QueuedMessage& qm ) { - std::string group( getGroupId(qm, groupIdHeader) ); - std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group ); + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); assert( gs != messageGroups.end() ); GroupState& state( gs->second ); assert( state.total != 0 ); - state.total -= 1; + uint32_t total = state.total -= 1; assert( state.acquired != 0 ); state.acquired -= 1; - if (state.total == 0) messageGroups.erase( gs ); - else if (state.acquired == 0 && !state.owner.empty()) { - state.owner.clear(); // KAG TODO: need to invalidate consumer's positions? + if (state.total == 0) { + QPID_LOG( trace, "group queue " << queue->getName() << ": deleting group id=" << gs->first); + messageGroups.erase( gs ); + } else { + if (state.acquired == 0 && !state.owner.empty()) { + QPID_LOG( trace, "group queue " << queue->getName() << + ": consumer name=" << state.owner << " released group id=" << gs->first); + state.owner.clear(); // KAG TODO: need to invalidate consumer's positions? + } } + QPID_LOG( trace, "group queue " << queue->getName() << + ": dequeued message from group id=" << group << " total=" << total ); } void MessageGroupManager::consumerAdded( const Consumer& c ) { - bool unique = consumers.insert( c.getName() ).second; + const std::string& name(c.getName()); + bool unique = consumers.insert( name ).second; (void) unique; assert( unique ); + QPID_LOG( trace, "group queue " << queue->getName() << ": added consumer name=" << name ); } void MessageGroupManager::consumerRemoved( const Consumer& c ) { - size_t count = consumers.erase( c.getName() ); + const std::string& name(c.getName()); + size_t count = consumers.erase( name ); (void) count; assert( count == 1 ); bool needReset = false; - for (std::map<std::string, struct GroupState>::iterator gs = messageGroups.begin(); + for (GroupMap::iterator gs = messageGroups.begin(); gs != messageGroups.end(); ++gs) { GroupState& state( gs->second ); - if (state.owner == c.getName()) { + if (state.owner == name) { state.owner.clear(); needReset = true; + QPID_LOG( trace, "group queue " << queue->getName() << + ": consumer name=" << name << " released group id=" << gs->first); } } if (needReset) { // KAG TODO: How do I invalidate all consumers that need invalidating???? } + QPID_LOG( trace, "group queue " << queue->getName() << ": removed consumer name=" << name ); } bool MessageGroupManager::nextMessage( Consumer::shared_ptr c, QueuedMessage& next, - const Mutex::ScopedLock& l) + const Mutex::ScopedLock& ) { - // KAG TODO: FIX!!! - return MessageAllocator::nextMessage( c, next, l ); + Messages& messages(queue->getMessages()); + + if (messages.empty()) + return false; + + if (c->preAcquires()) { // not browsing + next = messages.front(); + QueuedMessage current; + do { + current = next; + /** @todo KAG: horrifingly suboptimal - optimize */ + std::string group( getGroupId( current ) ); + GroupMap::iterator gs = messageGroups.find( group ); /** @todo need to cache this somehow */ + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + if (state.owner.empty()) { + state.owner = c->getName(); + QPID_LOG( trace, "group queue " << queue->getName() << + ": consumer name=" << c->getName() << " has acquired group id=" << group); + return true; + } + if (state.owner == c->getName()) { + return true; + } + } while (messages.next( current, next )); /** @todo: .next() is a linear search from front - optimize */ + return false; + } else if (messages.next(c->position, next)) + return true; + return false; } bool MessageGroupManager::canAcquire(const std::string& consumer, const QueuedMessage& qm, const Mutex::ScopedLock&) { - std::string group( getGroupId(qm, groupIdHeader) ); - std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group ); + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); assert( gs != messageGroups.end() ); GroupState& state( gs->second ); if (state.owner.empty()) { state.owner = consumer; + QPID_LOG( trace, "group queue " << queue->getName() << + ": consumer name=" << consumer << " has acquired group id=" << gs->first); return true; } return state.owner == consumer; } +namespace { + const std::string GroupQueryKey("qpid.message_group_queue"); + const std::string GroupHeaderKey("group_header_key"); + const std::string GroupStateKey("group_state"); + const std::string GroupIdKey("group_id"); + const std::string GroupMsgCount("msg_count"); + const std::string GroupTimestamp("timestamp"); + const std::string GroupConsumer("consumer"); +} + +void MessageGroupManager::query(qpid::types::Variant::Map& status, + const Mutex::ScopedLock&) const +{ + /** Add a description of the current state of the message groups for this queue. + FORMAT: + { "qpid.message_group_queue": + { "group_header_key" : "<KEY>", + "group_state" : + [ { "group_id" : "<name>", + "msg_count" : <int>, + "timestamp" : <absTime>, + "consumer" : <consumer name> }, + {...} // one for each known group + ] + } + } + **/ + + assert(status.find(GroupQueryKey) == status.end()); + qpid::types::Variant::Map state; + qpid::types::Variant::List groups; + + state[GroupHeaderKey] = groupIdHeader; + for (GroupMap::const_iterator g = messageGroups.begin(); + g != messageGroups.end(); ++g) { + qpid::types::Variant::Map info; + info[GroupIdKey] = g->first; + info[GroupMsgCount] = g->second.total; + info[GroupTimestamp] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */ + info[GroupConsumer] = g->second.owner; + groups.push_back(info); + } + state[GroupStateKey] = groups; + status[GroupQueryKey] = state; +} + +bool MessageGroupManager::match(const qpid::types::Variant::Map* filter, + const QueuedMessage& message) const +{ + if (!filter) return true; + qpid::types::Variant::Map::const_iterator i = filter->find( qpidMessageGroupFilter ); + if (i == filter->end()) return true; + if (i->second.asString() == getGroupId(message)) return true; + return false; +} + +boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q, + const qpid::framing::FieldTable& settings ) +{ + boost::shared_ptr<MessageGroupManager> empty; + + if (settings.isSet(qpidMessageGroupKey)) { + + Queue::Disposition qt = q->getDisposition(); + + if (qt == Queue::LVQ) { + QPID_LOG( error, "Message Groups cannot be enabled on LVQ Queues, queue=" << q->getName()); + return empty; + } + if (qt == Queue::PRIORITY) { + QPID_LOG( error, "Message Groups cannot be enabled for Priority Queues, queue=" << q->getName()); + return empty; + } + std::string headerKey = settings.getAsString(qpidMessageGroupKey); + if (headerKey.empty()) { + QPID_LOG( error, "A Message Group header key must be configured, queue=" << q->getName()); + return empty; + } + unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp); + + boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( headerKey, q, timestamp ) ); + + q->addObserver( boost::static_pointer_cast<QueueObserver>(manager) ); + + QPID_LOG( debug, "Configured Queue '" << q->getName() << + "' for message grouping using header key '" << headerKey << "'" << + " (timestamp=" << timestamp << ")"); + return manager; + } + return empty; +} @@ -1526,5 +1756,13 @@ bool MessageAllocator::canAcquire(const std::string&, const QueuedMessage&, } +// default match - ignore filter and always match. +bool MessageAllocator::match(const qpid::types::Variant::Map*, + const QueuedMessage&) const +{ + return true; +} + + |