diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-08-17 13:07:26 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-08-17 13:07:26 +0000 |
commit | 6833c389115a220fa0a05453bd19d54e5bd8e891 (patch) | |
tree | e2ead73531a141fea5d9c8d2f2b39797fcdfadef | |
parent | 7762a152b3dabd859089abb388268101b2c72ea0 (diff) | |
download | qpid-python-6833c389115a220fa0a05453bd19d54e5bd8e891.tar.gz |
QPID-3346: checkpoint - incorporate changes based on review feedback
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1158686 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/DeliveryRecord.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/LegacyLVQ.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/LegacyLVQ.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageDeque.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageDeque.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageMap.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageMap.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Messages.h | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/PriorityQueue.cpp | 19 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/PriorityQueue.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 215 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 8 |
12 files changed, 122 insertions, 153 deletions
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp index a82a54f077..17ecef60d9 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -142,7 +142,7 @@ void DeliveryRecord::reject() //just drop it QPID_LOG(info, "Dropping rejected message from " << queue->getName()); } - queue->dequeue(0, msg); + dequeue(); setEnded(); } } @@ -152,8 +152,7 @@ uint32_t DeliveryRecord::getCredit() const return credit; } -void DeliveryRecord::acquire(DeliveryIds& results) -{ +void DeliveryRecord::acquire(DeliveryIds& results) { if (queue->acquire(msg, tag)) { acquired = true; results.push_back(id); diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp index bc009f14d1..a811a86492 100644 --- a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp +++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp @@ -44,11 +44,6 @@ bool LegacyLVQ::remove(const framing::SequenceNumber& position, QueuedMessage& m } } -bool LegacyLVQ::next(const QueuedMessage& message, QueuedMessage& next) -{ - return this->next(message.position, next); -} - bool LegacyLVQ::next(const framing::SequenceNumber& position, QueuedMessage& message) { if (MessageMap::next(position, message)) { diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.h b/qpid/cpp/src/qpid/broker/LegacyLVQ.h index 33d4117845..dd0fd7aaec 100644 --- a/qpid/cpp/src/qpid/broker/LegacyLVQ.h +++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.h @@ -42,7 +42,6 @@ class LegacyLVQ : public MessageMap LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0); bool remove(const framing::SequenceNumber&, QueuedMessage&); bool next(const framing::SequenceNumber&, QueuedMessage&); - bool next(const QueuedMessage&, QueuedMessage&); bool push(const QueuedMessage& added, QueuedMessage& removed); void removeIf(Predicate); void setNoBrowse(bool); diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.cpp b/qpid/cpp/src/qpid/broker/MessageDeque.cpp index b4fdfb3d6a..24b8f6f895 100644 --- a/qpid/cpp/src/qpid/broker/MessageDeque.cpp +++ b/qpid/cpp/src/qpid/broker/MessageDeque.cpp @@ -74,11 +74,6 @@ bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& return find(position, message, false); } -bool MessageDeque::next(const QueuedMessage& message, QueuedMessage& next) -{ - return this->next(message.position, next); -} - bool MessageDeque::next(const framing::SequenceNumber& position, QueuedMessage& message) { if (messages.empty()) { diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.h b/qpid/cpp/src/qpid/broker/MessageDeque.h index 0726e1dee6..0e1aef2986 100644 --- a/qpid/cpp/src/qpid/broker/MessageDeque.h +++ b/qpid/cpp/src/qpid/broker/MessageDeque.h @@ -41,7 +41,6 @@ class MessageDeque : public Messages bool remove(const framing::SequenceNumber&, QueuedMessage&); bool find(const framing::SequenceNumber&, QueuedMessage&); bool next(const framing::SequenceNumber&, QueuedMessage&); - bool next(const QueuedMessage&, QueuedMessage&); QueuedMessage& front(); void pop(); diff --git a/qpid/cpp/src/qpid/broker/MessageMap.cpp b/qpid/cpp/src/qpid/broker/MessageMap.cpp index 504b2e033c..39e23df533 100644 --- a/qpid/cpp/src/qpid/broker/MessageMap.cpp +++ b/qpid/cpp/src/qpid/broker/MessageMap.cpp @@ -77,11 +77,6 @@ bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& me } } -bool MessageMap::next(const QueuedMessage& message, QueuedMessage& next) -{ - return this->next(message.position, next); -} - bool MessageMap::next(const framing::SequenceNumber& position, QueuedMessage& message) { if (!messages.empty() && position < front().position) { diff --git a/qpid/cpp/src/qpid/broker/MessageMap.h b/qpid/cpp/src/qpid/broker/MessageMap.h index 9cfd9ea9af..1128a1d54a 100644 --- a/qpid/cpp/src/qpid/broker/MessageMap.h +++ b/qpid/cpp/src/qpid/broker/MessageMap.h @@ -47,7 +47,6 @@ class MessageMap : public Messages virtual bool remove(const framing::SequenceNumber&, QueuedMessage&); bool find(const framing::SequenceNumber&, QueuedMessage&); virtual bool next(const framing::SequenceNumber&, QueuedMessage&); - virtual bool next(const QueuedMessage&, QueuedMessage&); QueuedMessage& front(); void pop(); diff --git a/qpid/cpp/src/qpid/broker/Messages.h b/qpid/cpp/src/qpid/broker/Messages.h index 4b3a6d764f..448f17432a 100644 --- a/qpid/cpp/src/qpid/broker/Messages.h +++ b/qpid/cpp/src/qpid/broker/Messages.h @@ -76,15 +76,6 @@ class Messages * @return true if there is another message, false otherwise. */ virtual bool next(const framing::SequenceNumber&, QueuedMessage&) = 0; - - /** - * Return the next message based on the supplied queued message. - * The next messages is passed back via the second parameter. - * @todo replace with queue iterator - * @return true if there is another message, false otherwise. - */ - virtual bool next(const QueuedMessage&, QueuedMessage&) = 0; - /** * Note: Caller is responsible for ensuring that there is a front * (e.g. empty() returns false) diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp index 3305bb4f79..e07e73d323 100644 --- a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp +++ b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp @@ -88,25 +88,6 @@ bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& return find(position, message, false); } -bool PriorityQueue::next(const QueuedMessage& message, QueuedMessage& next) -{ - uint p = getPriorityLevel(message); - QueuedMessage match; - match.position = message.position+1; - Deque::iterator m = lower_bound(messages[p].begin(), messages[p].end(), match); - if (m != messages[p].end()) { - next = *m; - return true; - } - while (p-- > 0) { - if (!messages[p].empty()) { - next = messages[p].front(); - return true; - } - } - return false; -} - bool PriorityQueue::next(const framing::SequenceNumber& position, QueuedMessage& message) { QueuedMessage match; diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.h b/qpid/cpp/src/qpid/broker/PriorityQueue.h index 7464040e17..4bf9d26a9d 100644 --- a/qpid/cpp/src/qpid/broker/PriorityQueue.h +++ b/qpid/cpp/src/qpid/broker/PriorityQueue.h @@ -46,7 +46,6 @@ class PriorityQueue : public Messages bool remove(const framing::SequenceNumber&, QueuedMessage&); bool find(const framing::SequenceNumber&, QueuedMessage&); bool next(const framing::SequenceNumber&, QueuedMessage&); - bool next(const QueuedMessage&, QueuedMessage&); QueuedMessage& front(); void pop(); diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 551584f9d9..853bf09a9c 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -115,12 +115,6 @@ class MessageAllocator /** hook to add any interesting management state to the status map (lock held) */ virtual void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const {}; - - /** for move, purge, reroute - check if message matches against a filter, - * return true if message matches. - */ - virtual bool match(const qpid::types::Variant::Map* filter, - const QueuedMessage& message) const; }; @@ -147,7 +141,6 @@ class MessageGroupManager : public QueueObserver, public MessageAllocator static const std::string qpidMessageGroupKey; static const std::string qpidMessageGroupTimestamp; static const std::string qpidMessageGroupDefault; - static const std::string qpidMessageGroupFilter; // key for move/purge filter map const std::string getGroupId( const QueuedMessage& qm ) const; @@ -174,9 +167,11 @@ class MessageGroupManager : public QueueObserver, public MessageAllocator const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key"); const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp"); const std::string MessageGroupManager::qpidMessageGroupDefault("qpid.no_group"); /** @todo KAG: make configurable in Broker options */ -const std::string MessageGroupManager::qpidMessageGroupFilter("qpid.group_id"); }} +// KAG TBD: END find me a home.... + + Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, @@ -203,8 +198,7 @@ Queue::Queue(const string& _name, bool _autodelete, deleted(false), barrier(*this), autoDeleteTimeout(0), - allocator(new MessageAllocator( this )), - type(FIFO) + allocator(new MessageAllocator( this )) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); @@ -602,24 +596,101 @@ void Queue::purgeExpired(qpid::sys::Duration lapse) namespace { // for use with purge/move below - collect messages that match a given filter + // + class MessageFilter + { + public: + static const std::string typeKey; + static const std::string paramsKey; + static MessageFilter *create( const ::qpid::types::Variant::Map *filter ); + virtual bool match( const QueuedMessage& ) const { return true; } + virtual ~MessageFilter() {} + protected: + MessageFilter() {}; + }; + const std::string MessageFilter::typeKey("filter_type"); + const std::string MessageFilter::paramsKey("filter_params"); + + // filter by message header string value exact match + class HeaderMatchFilter : public MessageFilter + { + public: + /* Config: + { 'filter_type' : 'header_match_str', + 'filter_params' : { 'header_key' : "<header name>", + 'header_value' : "<value to match>" + } + } + */ + static const std::string typeKey; + static const std::string headerKey; + static const std::string valueKey; + HeaderMatchFilter( const std::string& _header, const std::string& _value ) + : MessageFilter (), header(_header), value(_value) {} + bool match( const QueuedMessage& msg ) const + { + const qpid::framing::FieldTable* headers = msg.payload->getApplicationHeaders(); + if (!headers) return false; + FieldTable::ValuePtr h = headers->get(header); + if (!h || !h->convertsTo<std::string>()) return false; + return h->get<std::string>() == value; + } + private: + const std::string header; + const std::string value; + }; + const std::string HeaderMatchFilter::typeKey("header_match_str"); + const std::string HeaderMatchFilter::headerKey("header_key"); + const std::string HeaderMatchFilter::valueKey("header_value"); + + // factory to create correct filter based on map + MessageFilter* MessageFilter::create( const ::qpid::types::Variant::Map *filter ) + { + using namespace qpid::types; + if (filter) { + Variant::Map::const_iterator i = filter->find(MessageFilter::typeKey); + if (i != filter->end()) { + + if (i->second.asString() == HeaderMatchFilter::typeKey) { + Variant::Map::const_iterator p = filter->find(MessageFilter::paramsKey); + if (p != filter->end() && p->second.getType() == VAR_MAP) { + Variant::Map::const_iterator k = p->second.asMap().find(HeaderMatchFilter::headerKey); + Variant::Map::const_iterator v = p->second.asMap().find(HeaderMatchFilter::valueKey); + if (k != p->second.asMap().end() && v != p->second.asMap().end()) { + std::string headerKey(k->second.asString()); + std::string value(v->second.asString()); + QPID_LOG(debug, "Message filtering by header value configured. key: " << headerKey << " value: " << value ); + return new HeaderMatchFilter( headerKey, value ); + } + } + } + } + QPID_LOG(error, "Ignoring unrecognized message filter: '" << *filter << "'"); + } + return new MessageFilter(); + } + + // used by removeIf() to collect all messages matching a filter, maximum match count is + // optional. struct Collector { const uint32_t maxMatches; - const qpid::types::Variant::Map *filter; + MessageFilter& filter; std::deque<QueuedMessage> matches; - boost::shared_ptr<MessageAllocator> allocator; - Collector(boost::shared_ptr<MessageAllocator> a, uint32_t m, - const qpid::types::Variant::Map *f) - : maxMatches(m), filter(f), allocator(a) {} - void operator() (QueuedMessage& qm) + Collector(MessageFilter& filter, uint32_t max) + : maxMatches(max), filter(filter) {} + bool operator() (QueuedMessage& qm) { if (maxMatches == 0 || matches.size() < maxMatches) { - if (allocator->match( filter, qm )) { + if (filter.match( qm )) { matches.push_back(qm); + return true; } } + return false; } }; -} + +} // end namespace /** @@ -640,56 +711,45 @@ namespace { uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest, const qpid::types::Variant::Map *filter) { - Collector c(allocator, purge_request, filter); + std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter)); + Collector c(*mf.get(), purge_request); Mutex::ScopedLock locker(messageLock); - messages->foreach( boost::bind<void>(boost::ref(c), _1) ); - - uint32_t count = c.matches.size(); - - // first remove all matches + messages->removeIf( boost::bind<bool>(boost::ref(c), _1) ); for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); - qmsg != c.matches.end(); qmsg++) { - /** @todo KAG: need a direct remove method here */ - bool ok = acquire(qmsg->position, *qmsg); - (void) ok; assert(ok); + qmsg != c.matches.end(); ++qmsg) { + // Update observers and message state: + acquired(*qmsg); dequeue(0, *qmsg); - } - - // now reroute if necessary - if (dest.get()) { - while (!c.matches.empty()) { - QueuedMessage msg = c.matches.front(); - c.matches.pop_front(); - assert(msg.payload); - DeliverableMessage dmsg(msg.payload); + // now reroute if necessary + if (dest.get()) { + assert(qmsg->payload); + DeliverableMessage dmsg(qmsg->payload); dest->routeWithAlternate(dmsg); } } - return count; + return c.matches.size(); } uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty, const qpid::types::Variant::Map *filter) { - Collector c(allocator, qty, filter); + std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter)); + Collector c(*mf.get(), qty); Mutex::ScopedLock locker(messageLock); - messages->foreach( boost::bind<void>(boost::ref(c), _1) ); + messages->removeIf( boost::bind<bool>(boost::ref(c), _1) ); - uint32_t count = c.matches.size(); - - while (!c.matches.empty()) { - QueuedMessage qmsg = c.matches.front(); - c.matches.pop_front(); - /** @todo KAG: need a direct remove method here */ - bool ok = acquire(qmsg.position, qmsg); - (void) ok; assert(ok); - dequeue(0, qmsg); - assert(qmsg.payload); - destq->deliver(qmsg.payload); + for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); + qmsg != c.matches.end(); ++qmsg) { + // Update observers and message state: + acquired(*qmsg); + dequeue(0, *qmsg); + // and move to destination Queue. + assert(qmsg->payload); + destq->deliver(qmsg->payload); } - return count; + return c.matches.size(); } /** Acquire the front (oldest) message from the in-memory queue. @@ -939,7 +999,6 @@ void Queue::acquired(const QueuedMessage& msg) } } - void Queue::create(const FieldTable& _settings) { settings = _settings; @@ -949,7 +1008,6 @@ void Queue::create(const FieldTable& _settings) configureImpl(_settings); } - int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key) { qpid::framing::FieldTable::ValuePtr v = settings.get(key); @@ -1009,29 +1067,23 @@ void Queue::configureImpl(const FieldTable& _settings) if (lvqKey.size()) { QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey); messages = std::auto_ptr<Messages>(new MessageMap(lvqKey)); - type = LVQ; } else if (_settings.get(qpidLastValueQueueNoBrowse)) { QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on"); messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker); - type = LVQ; } else if (_settings.get(qpidLastValueQueue)) { QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue"); messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker); - type = LVQ; } else { std::auto_ptr<Messages> m = Fairshare::create(_settings); if (m.get()) { messages = m; QPID_LOG(debug, "Configured queue " << getName() << " as priority queue."); - type = PRIORITY; - } - } - - { // override default message allocator if message groups configured. - boost::shared_ptr<MessageAllocator> ma = boost::static_pointer_cast<MessageAllocator>(MessageGroupManager::create( this, _settings )); - if (ma) { - allocator = ma; - type = GROUP; + } else { // default (FIFO) queue type + // override default message allocator if message groups configured. + boost::shared_ptr<MessageAllocator> ma = boost::static_pointer_cast<MessageAllocator>(MessageGroupManager::create( this, _settings )); + if (ma) { + allocator = ma; + } } } @@ -1593,11 +1645,9 @@ bool MessageGroupManager::nextMessage( Consumer::shared_ptr c, QueuedMessage& ne if (c->preAcquires()) { // not browsing next = messages.front(); - QueuedMessage current; do { - current = next; /** @todo KAG: horrifingly suboptimal - optimize */ - std::string group( getGroupId( current ) ); + std::string group( getGroupId( next ) ); GroupMap::iterator gs = messageGroups.find( group ); /** @todo need to cache this somehow */ assert( gs != messageGroups.end() ); GroupState& state( gs->second ); @@ -1610,7 +1660,7 @@ bool MessageGroupManager::nextMessage( Consumer::shared_ptr c, QueuedMessage& ne if (state.owner == c->getName()) { return true; } - } while (messages.next( current, next )); /** @todo: .next() is a linear search from front - optimize */ + } while (messages.next( next.position, next )); /** @todo: .next() is a linear search from front - optimize */ return false; } else if (messages.next(c->position, next)) return true; @@ -1681,15 +1731,6 @@ void MessageGroupManager::query(qpid::types::Variant::Map& status, status[GroupQueryKey] = state; } -bool MessageGroupManager::match(const qpid::types::Variant::Map* filter, - const QueuedMessage& message) const -{ - if (!filter) return true; - qpid::types::Variant::Map::const_iterator i = filter->find( qpidMessageGroupFilter ); - if (i == filter->end()) return true; - if (i->second.asString() == getGroupId(message)) return true; - return false; -} boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q, const qpid::framing::FieldTable& settings ) @@ -1698,16 +1739,6 @@ boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q, if (settings.isSet(qpidMessageGroupKey)) { - Queue::Disposition qt = q->getDisposition(); - - if (qt == Queue::LVQ) { - QPID_LOG( error, "Message Groups cannot be enabled on LVQ Queues, queue=" << q->getName()); - return empty; - } - if (qt == Queue::PRIORITY) { - QPID_LOG( error, "Message Groups cannot be enabled for Priority Queues, queue=" << q->getName()); - return empty; - } std::string headerKey = settings.getAsString(qpidMessageGroupKey); if (headerKey.empty()) { QPID_LOG( error, "A Message Group header key must be configured, queue=" << q->getName()); @@ -1756,12 +1787,6 @@ bool MessageAllocator::canAcquire(const std::string&, const QueuedMessage&, } -// default match - ignore filter and always match. -bool MessageAllocator::match(const qpid::types::Variant::Map*, - const QueuedMessage&) const -{ - return true; -} diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 67da0a1403..475da15577 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -70,10 +70,6 @@ class MessageAllocator; class Queue : public boost::enable_shared_from_this<Queue>, public PersistableQueue, public management::Manageable { - public: - enum Disposition {FIFO, LVQ, PRIORITY, GROUP}; - - private: struct UsageBarrier { Queue& parent; @@ -134,8 +130,6 @@ class Queue : public boost::enable_shared_from_this<Queue>, int autoDeleteTimeout; boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask; boost::shared_ptr<MessageAllocator> allocator; - Disposition type; - void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); void setPolicy(std::auto_ptr<QueuePolicy> policy); @@ -412,8 +406,6 @@ class Queue : public boost::enable_shared_from_this<Queue>, uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); } void setDequeueSincePurge(uint32_t value); - - Disposition getDisposition() const { return type; } }; } } |