diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 215 |
1 files changed, 120 insertions, 95 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 551584f9d9..853bf09a9c 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -115,12 +115,6 @@ class MessageAllocator /** hook to add any interesting management state to the status map (lock held) */ virtual void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const {}; - - /** for move, purge, reroute - check if message matches against a filter, - * return true if message matches. - */ - virtual bool match(const qpid::types::Variant::Map* filter, - const QueuedMessage& message) const; }; @@ -147,7 +141,6 @@ class MessageGroupManager : public QueueObserver, public MessageAllocator static const std::string qpidMessageGroupKey; static const std::string qpidMessageGroupTimestamp; static const std::string qpidMessageGroupDefault; - static const std::string qpidMessageGroupFilter; // key for move/purge filter map const std::string getGroupId( const QueuedMessage& qm ) const; @@ -174,9 +167,11 @@ class MessageGroupManager : public QueueObserver, public MessageAllocator const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key"); const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp"); const std::string MessageGroupManager::qpidMessageGroupDefault("qpid.no_group"); /** @todo KAG: make configurable in Broker options */ -const std::string MessageGroupManager::qpidMessageGroupFilter("qpid.group_id"); }} +// KAG TBD: END find me a home.... + + Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, @@ -203,8 +198,7 @@ Queue::Queue(const string& _name, bool _autodelete, deleted(false), barrier(*this), autoDeleteTimeout(0), - allocator(new MessageAllocator( this )), - type(FIFO) + allocator(new MessageAllocator( this )) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); @@ -602,24 +596,101 @@ void Queue::purgeExpired(qpid::sys::Duration lapse) namespace { // for use with purge/move below - collect messages that match a given filter + // + class MessageFilter + { + public: + static const std::string typeKey; + static const std::string paramsKey; + static MessageFilter *create( const ::qpid::types::Variant::Map *filter ); + virtual bool match( const QueuedMessage& ) const { return true; } + virtual ~MessageFilter() {} + protected: + MessageFilter() {}; + }; + const std::string MessageFilter::typeKey("filter_type"); + const std::string MessageFilter::paramsKey("filter_params"); + + // filter by message header string value exact match + class HeaderMatchFilter : public MessageFilter + { + public: + /* Config: + { 'filter_type' : 'header_match_str', + 'filter_params' : { 'header_key' : "<header name>", + 'header_value' : "<value to match>" + } + } + */ + static const std::string typeKey; + static const std::string headerKey; + static const std::string valueKey; + HeaderMatchFilter( const std::string& _header, const std::string& _value ) + : MessageFilter (), header(_header), value(_value) {} + bool match( const QueuedMessage& msg ) const + { + const qpid::framing::FieldTable* headers = msg.payload->getApplicationHeaders(); + if (!headers) return false; + FieldTable::ValuePtr h = headers->get(header); + if (!h || !h->convertsTo<std::string>()) return false; + return h->get<std::string>() == value; + } + private: + const std::string header; + const std::string value; + }; + const std::string HeaderMatchFilter::typeKey("header_match_str"); + const std::string HeaderMatchFilter::headerKey("header_key"); + const std::string HeaderMatchFilter::valueKey("header_value"); + + // factory to create correct filter based on map + MessageFilter* MessageFilter::create( const ::qpid::types::Variant::Map *filter ) + { + using namespace qpid::types; + if (filter) { + Variant::Map::const_iterator i = filter->find(MessageFilter::typeKey); + if (i != filter->end()) { + + if (i->second.asString() == HeaderMatchFilter::typeKey) { + Variant::Map::const_iterator p = filter->find(MessageFilter::paramsKey); + if (p != filter->end() && p->second.getType() == VAR_MAP) { + Variant::Map::const_iterator k = p->second.asMap().find(HeaderMatchFilter::headerKey); + Variant::Map::const_iterator v = p->second.asMap().find(HeaderMatchFilter::valueKey); + if (k != p->second.asMap().end() && v != p->second.asMap().end()) { + std::string headerKey(k->second.asString()); + std::string value(v->second.asString()); + QPID_LOG(debug, "Message filtering by header value configured. key: " << headerKey << " value: " << value ); + return new HeaderMatchFilter( headerKey, value ); + } + } + } + } + QPID_LOG(error, "Ignoring unrecognized message filter: '" << *filter << "'"); + } + return new MessageFilter(); + } + + // used by removeIf() to collect all messages matching a filter, maximum match count is + // optional. struct Collector { const uint32_t maxMatches; - const qpid::types::Variant::Map *filter; + MessageFilter& filter; std::deque<QueuedMessage> matches; - boost::shared_ptr<MessageAllocator> allocator; - Collector(boost::shared_ptr<MessageAllocator> a, uint32_t m, - const qpid::types::Variant::Map *f) - : maxMatches(m), filter(f), allocator(a) {} - void operator() (QueuedMessage& qm) + Collector(MessageFilter& filter, uint32_t max) + : maxMatches(max), filter(filter) {} + bool operator() (QueuedMessage& qm) { if (maxMatches == 0 || matches.size() < maxMatches) { - if (allocator->match( filter, qm )) { + if (filter.match( qm )) { matches.push_back(qm); + return true; } } + return false; } }; -} + +} // end namespace /** @@ -640,56 +711,45 @@ namespace { uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest, const qpid::types::Variant::Map *filter) { - Collector c(allocator, purge_request, filter); + std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter)); + Collector c(*mf.get(), purge_request); Mutex::ScopedLock locker(messageLock); - messages->foreach( boost::bind<void>(boost::ref(c), _1) ); - - uint32_t count = c.matches.size(); - - // first remove all matches + messages->removeIf( boost::bind<bool>(boost::ref(c), _1) ); for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); - qmsg != c.matches.end(); qmsg++) { - /** @todo KAG: need a direct remove method here */ - bool ok = acquire(qmsg->position, *qmsg); - (void) ok; assert(ok); + qmsg != c.matches.end(); ++qmsg) { + // Update observers and message state: + acquired(*qmsg); dequeue(0, *qmsg); - } - - // now reroute if necessary - if (dest.get()) { - while (!c.matches.empty()) { - QueuedMessage msg = c.matches.front(); - c.matches.pop_front(); - assert(msg.payload); - DeliverableMessage dmsg(msg.payload); + // now reroute if necessary + if (dest.get()) { + assert(qmsg->payload); + DeliverableMessage dmsg(qmsg->payload); dest->routeWithAlternate(dmsg); } } - return count; + return c.matches.size(); } uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty, const qpid::types::Variant::Map *filter) { - Collector c(allocator, qty, filter); + std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter)); + Collector c(*mf.get(), qty); Mutex::ScopedLock locker(messageLock); - messages->foreach( boost::bind<void>(boost::ref(c), _1) ); + messages->removeIf( boost::bind<bool>(boost::ref(c), _1) ); - uint32_t count = c.matches.size(); - - while (!c.matches.empty()) { - QueuedMessage qmsg = c.matches.front(); - c.matches.pop_front(); - /** @todo KAG: need a direct remove method here */ - bool ok = acquire(qmsg.position, qmsg); - (void) ok; assert(ok); - dequeue(0, qmsg); - assert(qmsg.payload); - destq->deliver(qmsg.payload); + for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); + qmsg != c.matches.end(); ++qmsg) { + // Update observers and message state: + acquired(*qmsg); + dequeue(0, *qmsg); + // and move to destination Queue. + assert(qmsg->payload); + destq->deliver(qmsg->payload); } - return count; + return c.matches.size(); } /** Acquire the front (oldest) message from the in-memory queue. @@ -939,7 +999,6 @@ void Queue::acquired(const QueuedMessage& msg) } } - void Queue::create(const FieldTable& _settings) { settings = _settings; @@ -949,7 +1008,6 @@ void Queue::create(const FieldTable& _settings) configureImpl(_settings); } - int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key) { qpid::framing::FieldTable::ValuePtr v = settings.get(key); @@ -1009,29 +1067,23 @@ void Queue::configureImpl(const FieldTable& _settings) if (lvqKey.size()) { QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey); messages = std::auto_ptr<Messages>(new MessageMap(lvqKey)); - type = LVQ; } else if (_settings.get(qpidLastValueQueueNoBrowse)) { QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on"); messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker); - type = LVQ; } else if (_settings.get(qpidLastValueQueue)) { QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue"); messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker); - type = LVQ; } else { std::auto_ptr<Messages> m = Fairshare::create(_settings); if (m.get()) { messages = m; QPID_LOG(debug, "Configured queue " << getName() << " as priority queue."); - type = PRIORITY; - } - } - - { // override default message allocator if message groups configured. - boost::shared_ptr<MessageAllocator> ma = boost::static_pointer_cast<MessageAllocator>(MessageGroupManager::create( this, _settings )); - if (ma) { - allocator = ma; - type = GROUP; + } else { // default (FIFO) queue type + // override default message allocator if message groups configured. + boost::shared_ptr<MessageAllocator> ma = boost::static_pointer_cast<MessageAllocator>(MessageGroupManager::create( this, _settings )); + if (ma) { + allocator = ma; + } } } @@ -1593,11 +1645,9 @@ bool MessageGroupManager::nextMessage( Consumer::shared_ptr c, QueuedMessage& ne if (c->preAcquires()) { // not browsing next = messages.front(); - QueuedMessage current; do { - current = next; /** @todo KAG: horrifingly suboptimal - optimize */ - std::string group( getGroupId( current ) ); + std::string group( getGroupId( next ) ); GroupMap::iterator gs = messageGroups.find( group ); /** @todo need to cache this somehow */ assert( gs != messageGroups.end() ); GroupState& state( gs->second ); @@ -1610,7 +1660,7 @@ bool MessageGroupManager::nextMessage( Consumer::shared_ptr c, QueuedMessage& ne if (state.owner == c->getName()) { return true; } - } while (messages.next( current, next )); /** @todo: .next() is a linear search from front - optimize */ + } while (messages.next( next.position, next )); /** @todo: .next() is a linear search from front - optimize */ return false; } else if (messages.next(c->position, next)) return true; @@ -1681,15 +1731,6 @@ void MessageGroupManager::query(qpid::types::Variant::Map& status, status[GroupQueryKey] = state; } -bool MessageGroupManager::match(const qpid::types::Variant::Map* filter, - const QueuedMessage& message) const -{ - if (!filter) return true; - qpid::types::Variant::Map::const_iterator i = filter->find( qpidMessageGroupFilter ); - if (i == filter->end()) return true; - if (i->second.asString() == getGroupId(message)) return true; - return false; -} boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q, const qpid::framing::FieldTable& settings ) @@ -1698,16 +1739,6 @@ boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q, if (settings.isSet(qpidMessageGroupKey)) { - Queue::Disposition qt = q->getDisposition(); - - if (qt == Queue::LVQ) { - QPID_LOG( error, "Message Groups cannot be enabled on LVQ Queues, queue=" << q->getName()); - return empty; - } - if (qt == Queue::PRIORITY) { - QPID_LOG( error, "Message Groups cannot be enabled for Priority Queues, queue=" << q->getName()); - return empty; - } std::string headerKey = settings.getAsString(qpidMessageGroupKey); if (headerKey.empty()) { QPID_LOG( error, "A Message Group header key must be configured, queue=" << q->getName()); @@ -1756,12 +1787,6 @@ bool MessageAllocator::canAcquire(const std::string&, const QueuedMessage&, } -// default match - ignore filter and always match. -bool MessageAllocator::match(const qpid::types::Variant::Map*, - const QueuedMessage&) const -{ - return true; -} |