diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-08-05 20:47:10 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-08-05 20:47:10 +0000 |
commit | 45012dc9764465561ee1edbc4d3de4fac03c5b54 (patch) | |
tree | 0efa83203b11cff3dd8145919a7b5c2efa2cdf79 | |
parent | 3489d568087c433e75e7b6fd7c0f5eae96983d40 (diff) | |
download | qpid-python-45012dc9764465561ee1edbc4d3de4fac03c5b54.tar.gz |
QPID-3346: refactor queue interface to support consumer-based message selection.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1154376 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Consumer.h | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DeliveryRecord.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DeliveryRecord.h | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/LegacyLVQ.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 441 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 35 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueEvents.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueObserver.h | 38 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueuePolicy.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 55 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ThresholdAlerts.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueueTest.cpp | 21 |
16 files changed, 521 insertions, 150 deletions
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h index 317338a8ad..75deec5b27 100644 --- a/qpid/cpp/src/qpid/broker/Consumer.h +++ b/qpid/cpp/src/qpid/broker/Consumer.h @@ -36,13 +36,17 @@ class Consumer { // inListeners allows QueueListeners to efficiently track if this instance is registered // for notifications without having to search its containers bool inListeners; - public: - typedef boost::shared_ptr<Consumer> shared_ptr; - + const std::string name; + public: + typedef boost::shared_ptr<Consumer> shared_ptr; + framing::SequenceNumber position; - - Consumer(bool preAcquires = true) : acquires(preAcquires), inListeners(false) {} + + Consumer(const std::string& _name, bool preAcquires = true) + : acquires(preAcquires), inListeners(false), name(_name), position(0) {} bool preAcquires() const { return acquires; } + const std::string& getName() const { return name; } + virtual bool deliver(QueuedMessage& msg) = 0; virtual void notify() = 0; virtual bool filter(boost::intrusive_ptr<Message>) { return true; } diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp index 58dcc6d7c7..1b42c67edd 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -142,7 +142,7 @@ void DeliveryRecord::reject() //just drop it QPID_LOG(info, "Dropping rejected message from " << queue->getName()); } - dequeue(); + queue->dequeue(0, msg); setEnded(); } } @@ -152,8 +152,14 @@ uint32_t DeliveryRecord::getCredit() const return credit; } -void DeliveryRecord::acquire(DeliveryIds& results) { - if (queue->acquire(msg)) { +void DeliveryRecord::acquire(SemanticState* const session, DeliveryIds& results) { + SemanticState::ConsumerImpl::shared_ptr consumer; + + if (!session->find( tag, consumer )) { + QPID_LOG(error, "Can't acquire message " << id.getValue() << ": original subscription no longer exists."); + } + + if (queue->acquire(msg, consumer)) { acquired = true; results.push_back(id); if (!acceptExpected) { diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h index d388ba94be..ba3e1d5cfb 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h @@ -46,7 +46,7 @@ class DeliveryRecord { QueuedMessage msg; mutable boost::shared_ptr<Queue> queue; - std::string tag; + std::string tag; // name of consumer DeliveryId id; bool acquired : 1; bool acceptExpected : 1; @@ -82,7 +82,7 @@ class DeliveryRecord void reject(); void cancel(const std::string& tag); void redeliver(SemanticState* const); - void acquire(DeliveryIds& results); + void acquire(SemanticState* const, DeliveryIds& results); void complete(); bool accept(TransactionContext* ctxt); // Returns isRedundant() bool setEnded(); // Returns isRedundant() @@ -90,7 +90,7 @@ class DeliveryRecord bool isAcquired() const { return acquired; } bool isComplete() const { return completed; } - bool isRedundant() const { return ended && (!windowing || completed); } + bool isRedundant() const { return ended && (!windowing || completed); } // msg no longer needed - can discard bool isCancelled() const { return cancelled; } bool isAccepted() const { return !acceptExpected; } bool isEnded() const { return ended; } @@ -117,13 +117,14 @@ inline bool operator<(const DeliveryRecord& a, const framing::SequenceNumber& b) struct AcquireFunctor { + SemanticState* session; DeliveryIds& results; - AcquireFunctor(DeliveryIds& _results) : results(_results) {} + AcquireFunctor(SemanticState* _session, DeliveryIds& _results) : session(_session), results(_results) {} void operator()(DeliveryRecord& record) { - record.acquire(results); + record.acquire(session, results); } }; diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp index a811a86492..7d9cb4c1a0 100644 --- a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp +++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp @@ -35,7 +35,9 @@ void LegacyLVQ::setNoBrowse(bool b) bool LegacyLVQ::remove(const framing::SequenceNumber& position, QueuedMessage& message) { Ordering::iterator i = messages.find(position); - if (i != messages.end() && i->second.payload == message.payload) { + if (i != messages.end() && + // @todo KAG: gsim? is a bug? message is a *return* value - we really shouldn't check ".payload" below: + i->second.payload == message.payload) { message = i->second; erase(i); return true; diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 42923567a2..c9cea9212a 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -87,6 +87,28 @@ const int ENQUEUE_ONLY=1; const int ENQUEUE_AND_DEQUEUE=2; } + +// KAG TBD: find me a home.... +namespace qpid { +namespace broker { + +class MessageSelector +{ + protected: + Queue *queue; + public: + MessageSelector( Queue *q ) : queue(q) {} + virtual ~MessageSelector() {}; + + // assumes caller holds messageLock + virtual bool nextMessage( Consumer::shared_ptr c, QueuedMessage& next, + const Mutex::ScopedLock&); + virtual bool canAcquire(Consumer::shared_ptr consumer, const QueuedMessage& qm, + const Mutex::ScopedLock&); +}; + +}} + Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, const OwnershipToken* const _owner, @@ -111,7 +133,8 @@ Queue::Queue(const string& _name, bool _autodelete, broker(b), deleted(false), barrier(*this), - autoDeleteTimeout(0) + autoDeleteTimeout(0), + selector(new MessageSelector( this )) // KAG TODO: FIX!! { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); @@ -220,6 +243,14 @@ void Queue::requeue(const QueuedMessage& msg){ enqueue(0, payload); } } + + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->requeued(msg); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of message requeue for queue " << getName() << ": " << e.what()); + } + } } copy.notify(); } @@ -229,7 +260,7 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, "Attempting to acquire message at " << position); - if (messages->remove(position, message)) { + if (acquire(position, message )) { QPID_LOG(debug, "Acquired message at " << position << " from " << name); return true; } else { @@ -238,9 +269,24 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess } } -bool Queue::acquire(const QueuedMessage& msg) { - QueuedMessage copy = msg; - return acquireMessageAt(msg.position, copy); +bool Queue::acquire(const QueuedMessage& msg, Consumer::shared_ptr c) +{ + Mutex::ScopedLock locker(messageLock); + assertClusterSafe(); + QPID_LOG(debug, c->getName() << " attempting to acquire message at " << msg.position); + + if (!selector->canAcquire( c, msg, locker )) { + QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name); + return false; + } + + QueuedMessage copy(msg); + if (acquire( msg.position, copy )) { + QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name); + return true; + } + QPID_LOG(debug, "Could not acquire message at " << msg.position << " from " << name << "; no message at that position"); + return false; } void Queue::notifyListener() @@ -276,44 +322,60 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { + while (true) { Mutex::ScopedLock locker(messageLock); - if (messages->empty()) { - QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); + QueuedMessage msg; + + if (!selector->nextMessage(c, msg, locker)) { // no next available + QPID_LOG(debug, "No messages available to dispatch to consumer " << + c->getName() << " on queue '" << name << "'"); listeners.addListener(c); return NO_MESSAGES; - } else { - QueuedMessage msg = messages->front(); - if (msg.payload->hasExpired()) { - QPID_LOG(debug, "Message expired from queue '" << name << "'"); - popAndDequeue(); - continue; - } + } - if (c->filter(msg.payload)) { - if (c->accept(msg.payload)) { - m = msg; - pop(); - return CONSUMED; - } else { - //message(s) are available but consumer hasn't got enough credit - QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); - return CANT_CONSUME; - } + if (msg.payload->hasExpired()) { + QPID_LOG(debug, "Message expired from queue '" << name << "'"); + c->position = msg.position; + acquire( msg.position, msg ); + dequeue( 0, msg ); + continue; + } + + // a message is available for this consumer - can the consumer use it? + + if (c->filter(msg.payload)) { + if (c->accept(msg.payload)) { + acquire( msg.position, m ); + c->position = msg.position; + return CONSUMED; } else { - //consumer will never want this message - QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); + //message(s) are available but consumer hasn't got enough credit + QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); return CANT_CONSUME; } + } else { + //consumer will never want this message + QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); + c->position = msg.position; + return CANT_CONSUME; } } } - bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { - QueuedMessage msg(this); - while (seek(msg, c)) { + while (true) { + Mutex::ScopedLock locker(messageLock); + QueuedMessage msg; + + if (!selector->nextMessage(c, msg, locker)) { // no next available + QPID_LOG(debug, "No browsable messages available for consumer " << + c->getName() << " on queue '" << name << "'"); + listeners.addListener(c); + return false; + } + if (c->filter(msg.payload) && !msg.payload->hasExpired()) { if (c->accept(msg.payload)) { //consumer wants the message @@ -327,8 +389,8 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) } } else { //consumer will never want this message, continue seeking - c->position = msg.position; QPID_LOG(debug, "Browser skipping message from '" << name << "'"); + c->position = msg.position; } } return false; @@ -358,61 +420,71 @@ bool Queue::dispatch(Consumer::shared_ptr c) } } -// Find the next message -bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { - Mutex::ScopedLock locker(messageLock); - if (messages->next(c->position, msg)) { - return true; - } else { - listeners.addListener(c); - return false; - } -} - -QueuedMessage Queue::find(SequenceNumber pos) const { +bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const { Mutex::ScopedLock locker(messageLock); - QueuedMessage msg; - messages->find(pos, msg); - return msg; + if (messages->find(pos, msg)) + return true; + return false; } void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ assertClusterSafe(); - Mutex::ScopedLock locker(consumerLock); - if(exclusive) { - throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); - } else if(requestExclusive) { - if(consumerCount) { + { + Mutex::ScopedLock locker(consumerLock); + if(exclusive) { throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); - } else { - exclusive = c->getSession(); + QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); + } else if(requestExclusive) { + if(consumerCount) { + throw ResourceLockedException( + QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); + } else { + exclusive = c->getSession(); + } + } + consumerCount++; + if (mgmtObject != 0) + mgmtObject->inc_consumerCount (); + //reset auto deletion timer if necessary + if (autoDeleteTimeout && autoDeleteTask) { + autoDeleteTask->cancel(); } } - consumerCount++; - if (mgmtObject != 0) - mgmtObject->inc_consumerCount (); - //reset auto deletion timer if necessary - if (autoDeleteTimeout && autoDeleteTask) { - autoDeleteTask->cancel(); + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + Mutex::ScopedLock locker(messageLock); + (*i)->consumerAdded(*c); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what()); + } } } void Queue::cancel(Consumer::shared_ptr c){ removeListener(c); - Mutex::ScopedLock locker(consumerLock); - consumerCount--; - if(exclusive) exclusive = 0; - if (mgmtObject != 0) - mgmtObject->dec_consumerCount (); + { + Mutex::ScopedLock locker(consumerLock); + consumerCount--; + if(exclusive) exclusive = 0; + if (mgmtObject != 0) + mgmtObject->dec_consumerCount (); + } + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + Mutex::ScopedLock locker(messageLock); + (*i)->consumerRemoved(*c); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what()); + } + } } QueuedMessage Queue::get(){ Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); - messages->pop(msg); + if (messages->pop(msg)) + consumed( msg ); return msg; } @@ -443,8 +515,15 @@ void Queue::purgeExpired(qpid::sys::Duration lapse) Mutex::ScopedLock locker(messageLock); messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1)); } - for_each(expired.begin(), expired.end(), - boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); + + for (std::deque<QueuedMessage>::const_iterator i = expired.begin(); + i != expired.end(); ++i) { + { + Mutex::ScopedLock locker(messageLock); + consumed( *i ); // expects messageLock held + } + dequeue( 0, *i ); + } } } @@ -503,18 +582,33 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { QueuedMessage qmsg = messages->front(); boost::intrusive_ptr<Message> msg = qmsg.payload; destq->deliver(msg); // deliver message to the destination queue - pop(); - dequeue(0, qmsg); + popAndDequeue(); count++; } return count; } +/** Acquire the front (oldest) message from the in-memory queue. + * assumes messageLock held by caller + */ void Queue::pop() { assertClusterSafe(); - messages->pop(); - ++dequeueSincePurge; + QueuedMessage msg; + if (messages->pop(msg)) { + consumed( msg ); // mark it removed + ++dequeueSincePurge; + } +} + +/** Acquire the message at the given position, return true and msg if acquire succeeds */ +bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg ) +{ + if (messages->remove(position, msg)) { + consumed( msg ); + return true; + } + return false; } void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ @@ -533,6 +627,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ } copy.notify(); if (dequeueRequired) { + consumed( removed ); // tell observers if (isRecovery) { //can't issue new requests for the store until //recovery is complete @@ -696,14 +791,16 @@ void Queue::dequeueCommitted(const QueuedMessage& msg) } /** - * Removes a message from the in-memory delivery queue as well - * dequeing it from the logical (and persistent if applicable) queue + * Removes the first (oldest) message from the in-memory delivery queue as well dequeing + * it from the logical (and persistent if applicable) queue */ void Queue::popAndDequeue() { - QueuedMessage msg = messages->front(); - pop(); - dequeue(0, msg); + if (!messages->empty()) { + QueuedMessage msg = messages->front(); + pop(); + dequeue(0, msg); + } } /** @@ -723,6 +820,20 @@ void Queue::dequeued(const QueuedMessage& msg) } } +/** updates queue observers when a message has become unavailable for transfer, + * expects messageLock to be held + */ +void Queue::consumed(const QueuedMessage& msg) +{ + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->consumed(msg); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of message removal for queue " << getName() << ": " << e.what()); + } + } +} + void Queue::create(const FieldTable& _settings) { @@ -1233,3 +1344,179 @@ void Queue::UsageBarrier::destroy() parent.deleted = true; while (count) parent.messageLock.wait(); } + + +// KAG TBD: flesh out... + + +class MessageGroupManager : public QueueObserver, public MessageSelector +{ + const std::string groupIdHeader; // msg header holding group identifier + struct GroupState { + const std::string group; // group identifier + //Consumer::shared_ptr owner; // consumer with outstanding acquired messages + std::string owner; // consumer with outstanding acquired messages + uint32_t acquired; // count of outstanding acquired messages + uint32_t total; // count of enqueued messages in this group + GroupState() : acquired(0), total(0) {} + }; + std::map<std::string, struct GroupState> messageGroups; + std::set<std::string> consumers; + + public: + + MessageGroupManager(const std::string& header, Queue *q ) + : QueueObserver(), MessageSelector(q), groupIdHeader( header ) {} + void enqueued( const QueuedMessage& qm ); + void removed( const QueuedMessage& qm ); + void requeued( const QueuedMessage& qm ); + void dequeued( const QueuedMessage& qm ); + void consumerAdded( const Consumer& ); + void consumerRemoved( const Consumer& ); + bool nextMessage( Consumer::shared_ptr c, QueuedMessage& next, + const Mutex::ScopedLock&); + bool canAcquire(Consumer::shared_ptr consumer, const QueuedMessage& msg, + const Mutex::ScopedLock&); +}; + + +namespace { + const std::string NO_GROUP(""); + const std::string getGroupId( const QueuedMessage& qm, const std::string& key ) + { + const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders(); + if (!headers) return NO_GROUP; + return headers->getAsString(key); + } +} + + +void MessageGroupManager::enqueued( const QueuedMessage& qm ) +{ + std::string group( getGroupId(qm, groupIdHeader) ); + messageGroups[group].total++; +} + + +void MessageGroupManager::removed( const QueuedMessage& qm ) +{ + std::string group( getGroupId(qm, groupIdHeader) ); + std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + gs->second.acquired += 1; +} + + +void MessageGroupManager::requeued( const QueuedMessage& qm ) +{ + std::string group( getGroupId(qm, groupIdHeader) ); + std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + assert( state.acquired != 0 ); + state.acquired -= 1; + if (state.acquired == 0 && !state.owner.empty()) { + state.owner.clear(); // KAG TODO: need to invalidate consumer's positions? + } +} + + +void MessageGroupManager::dequeued( const QueuedMessage& qm ) +{ + std::string group( getGroupId(qm, groupIdHeader) ); + std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + assert( state.total != 0 ); + state.total -= 1; + assert( state.acquired != 0 ); + state.acquired -= 1; + if (state.total == 0) messageGroups.erase( gs ); + else if (state.acquired == 0 && !state.owner.empty()) { + state.owner.clear(); // KAG TODO: need to invalidate consumer's positions? + } +} + +void MessageGroupManager::consumerAdded( const Consumer& c ) +{ + bool unique = consumers.insert( c.getName() ).second; + (void) unique; assert( unique ); +} + +void MessageGroupManager::consumerRemoved( const Consumer& c ) +{ + size_t count = consumers.erase( c.getName() ); + (void) count; assert( count == 1 ); + + bool needReset = false; + for (std::map<std::string, struct GroupState>::iterator gs = messageGroups.begin(); + gs != messageGroups.end(); ++gs) { + + GroupState& state( gs->second ); + if (state.owner == c.getName()) { + state.owner.clear(); + needReset = true; + } + } + + if (needReset) { + // KAG TODO: How do I invalidate all consumers that need invalidating???? + } +} + + +bool MessageGroupManager::nextMessage( Consumer::shared_ptr c, QueuedMessage& next, + const Mutex::ScopedLock& l) +{ + // KAG TODO: FIX!!! + return MessageSelector::nextMessage( c, next, l ); +} + + +bool MessageGroupManager::canAcquire(Consumer::shared_ptr consumer, const QueuedMessage& qm, + const Mutex::ScopedLock&) +{ + std::string group( getGroupId(qm, groupIdHeader) ); + std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + + if (state.owner.empty()) { + state.owner = consumer->getName(); + return true; + } + return state.owner == consumer->getName(); +} + + + + + +// default selector - requires messageLock to be held by caller! +bool MessageSelector::nextMessage( Consumer::shared_ptr c, QueuedMessage& next, + const Mutex::ScopedLock& /*just to enforce locking*/) +{ + Messages& messages(queue->getMessages()); + + if (messages.empty()) + return false; + + if (c->preAcquires()) { // not browsing + next = messages.front(); + return true; + } else if (messages.next(c->position, next)) + return true; + return false; +} + + +// default selector - requires messageLock to be held by caller! +bool MessageSelector::canAcquire(Consumer::shared_ptr, const QueuedMessage&, + const Mutex::ScopedLock& /*just to enforce locking*/) +{ + return true; // always give permission to acquire +} + + + + diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 8435e75cab..a6bb0d6915 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -59,8 +59,8 @@ class MessageStore; class QueueEvents; class QueueRegistry; class TransactionContext; -class Exchange; - +class MessageSelector; + /** * The brokers representation of an amqp queue. Messages are * delivered to a queue from where they can be dispatched to @@ -129,10 +129,10 @@ class Queue : public boost::enable_shared_from_this<Queue>, UsageBarrier barrier; int autoDeleteTimeout; boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask; + std::auto_ptr<MessageSelector> selector; void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); void setPolicy(std::auto_ptr<QueuePolicy> policy); - bool seek(QueuedMessage& msg, Consumer::shared_ptr position); bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); @@ -142,10 +142,16 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool isExcluded(boost::intrusive_ptr<Message>& msg); + /** update queue observers with new message state */ void enqueued(const QueuedMessage& msg); + void consumed(const QueuedMessage& msg); void dequeued(const QueuedMessage& msg); - void pop(); - void popAndDequeue(); + + /** modify the Queue's message container - assumes messageLock held */ + void pop(); // acquire front msg + void popAndDequeue(); // acquire and dequeue front msg + // acquire message @ position, return true and set msg if acquire succeeds + bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg ); void forcePersistent(QueuedMessage& msg); int getEventMode(); @@ -191,8 +197,15 @@ class Queue : public boost::enable_shared_from_this<Queue>, Broker* broker = 0); QPID_BROKER_EXTERN ~Queue(); + /** allow the Consumer to consume or browse the next available message */ QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr); + /** allow the Consumer to acquire a message that it has browsed. + * @param msg - message to be acquired. + * @return false if message is no longer available for acquire. + */ + QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg, const Consumer::shared_ptr c); + /** * Used to configure a new queue and create a persistent record * for it in store if required. @@ -216,7 +229,11 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool bind(boost::shared_ptr<Exchange> exchange, const std::string& key, const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable()); - QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg); + /** Acquire the message at the given position if it is available for acquire. Not to + * be used by clients, but used by the broker for queue management. + * @param message - set to the acquired message if true returned. + * @return true if the message has been acquired. + */ QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message); /** @@ -302,12 +319,12 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool isEnqueued(const QueuedMessage& msg); /** - * Gets the next available message + * Acquires the next available (oldest) message */ QPID_BROKER_EXTERN QueuedMessage get(); - /** Get the message at position pos */ - QPID_BROKER_EXTERN QueuedMessage find(framing::SequenceNumber pos) const; + /** Get the message at position pos, returns true if found and sets msg */ + QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, QueuedMessage& msg ) const; const QueuePolicy* getPolicy(); diff --git a/qpid/cpp/src/qpid/broker/QueueEvents.cpp b/qpid/cpp/src/qpid/broker/QueueEvents.cpp index 2c540ff1ad..764faf5fd7 100644 --- a/qpid/cpp/src/qpid/broker/QueueEvents.cpp +++ b/qpid/cpp/src/qpid/broker/QueueEvents.cpp @@ -129,6 +129,10 @@ class EventGenerator : public QueueObserver { if (!enqueueOnly) manager.dequeued(m); } + + void consumed(const QueuedMessage&) {}; + void requeued(const QueuedMessage&) {}; + private: QueueEvents& manager; const bool enqueueOnly; diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index fcf8d089f9..db18325c78 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -377,11 +377,12 @@ void QueueFlowLimit::setState(const qpid::framing::FieldTable& state) ++i; fcmsg.add(first, last); for (SequenceNumber seq = first; seq <= last; ++seq) { - QueuedMessage msg(queue->find(seq)); // fyi: msg.payload may be null if msg is delivered & unacked + QueuedMessage msg; + bool found = queue->find(seq, msg); // fyi: msg.payload may be null if msg is delivered & unacked + (void) found; assert(found); // avoid unused variable warning when NDEBUG set bool unique; unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second; - // Like this to avoid tripping up unused variable warning when NDEBUG set - if (!unique) assert(unique); + (void) unique; assert(unique); // ditto NDEBUG warning } } } diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h index c02e479976..3d4b31bb69 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h @@ -84,6 +84,9 @@ class Broker; QPID_BROKER_EXTERN void enqueued(const QueuedMessage&); /** the queue has removed QueuedMessage. Returns true if flow state changes */ QPID_BROKER_EXTERN void dequeued(const QueuedMessage&); + /** ignored */ + QPID_BROKER_EXTERN void consumed(const QueuedMessage&) {}; + QPID_BROKER_EXTERN void requeued(const QueuedMessage&) {}; /** for clustering: */ QPID_BROKER_EXTERN void getState(qpid::framing::FieldTable&) const; diff --git a/qpid/cpp/src/qpid/broker/QueueObserver.h b/qpid/cpp/src/qpid/broker/QueueObserver.h index 3ca01c051e..9c3c186f23 100644 --- a/qpid/cpp/src/qpid/broker/QueueObserver.h +++ b/qpid/cpp/src/qpid/broker/QueueObserver.h @@ -25,17 +25,49 @@ namespace qpid { namespace broker { struct QueuedMessage; +class Consumer; + /** - * Interface for notifying classes who want to act as 'observers' of a - * queue of particular events. + * Interface for notifying classes who want to act as 'observers' of a queue of particular + * events. + * + * The events that are monitored reflect the relationship between a particular message and + * the queue it has been delivered to. A message can be considered in one of three states + * with respect to the queue: + * + * 1) "Available" - available for transfer to consumers, + * 2) "Locked" - to a particular consumer, no longer available for transfer, but not + * considered fully dequeued. + * 3) "Dequeued" - removed from the queue and no longer available to any consumer. + * + * The queue events that are observable are: + * + * "Enqueued" - the message is "Available" - on the queue for transfer to any consumer + * (e.g. browse or acquire) + * + * "Consumed" - the message is "Locked" - a consumer has claimed exclusive access to it. + * It is no longer available for other consumers to browse or acquire, but it is not yet + * considered dequeued as it may be requeued by the consumer. + * + * "Requeued" - a previously-consumed message is 'unlocked': it is put back on the queue + * at its original position and returns to the "Available" state. + * + * "Dequeued" - a Locked message is no longer queued. At this point, the queue no longer + * tracks the message, and the broker considers the consumer's transaction complete. */ class QueueObserver { public: virtual ~QueueObserver() {} + + // note: the Queue will hold the messageLock while calling these methods! virtual void enqueued(const QueuedMessage&) = 0; + virtual void consumed(const QueuedMessage&) = 0; + virtual void requeued(const QueuedMessage&) = 0; virtual void dequeued(const QueuedMessage&) = 0; - private: + virtual void consumerAdded( const Consumer& ) {}; + virtual void consumerRemoved( const Consumer& ) {}; + private: }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp index 6ae0d53b1a..0c245700af 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp @@ -269,8 +269,7 @@ bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) do { QueuedMessage oldest = queue.front(); - - if (oldest.queue->acquire(oldest) || !strict) { + if (oldest.queue->acquireMessageAt(oldest.position, oldest) || !strict) { queue.pop_front(); pendingDequeues.push_back(oldest); QPID_LOG(debug, "Ring policy triggered in " << name diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 73a0a5cf7b..c8f77ba64e 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -269,9 +269,8 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, ) : - Consumer(_acquire), + Consumer(_name, _acquire), parent(_parent), - name(_name), queue(_queue), ackExpected(ack), acquire(_acquire), @@ -295,7 +294,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, if (agent != 0) { - mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name, + mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getName(), !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments)); agent->addObject (mgmtObject); mgmtObject->set_creditMode("WINDOW"); @@ -327,16 +326,15 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { assertClusterSafe(); allocateCredit(msg.payload); - DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing); + DeliveryRecord record(msg, queue, getName(), acquire, !ackExpected, windowing); bool sync = syncFrequency && ++deliveryCount >= syncFrequency; if (sync) deliveryCount = 0;//reset parent->deliver(record, sync); - if (!ackExpected && acquire) record.setEnded();//allows message to be released now its been delivered if (windowing || ackExpected || !acquire) { parent->record(record); } - if (acquire && !ackExpected) { - queue->dequeue(0, msg); + if (acquire && !ackExpected) { // auto acquire && auto accept + record.accept( 0 /*no ctxt*/ ); } if (mgmtObject) { mgmtObject->inc_delivered(); } return true; @@ -556,50 +554,61 @@ void SemanticState::deliver(DeliveryRecord& msg, bool sync) return deliveryAdapter.deliver(msg, sync); } -SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination) +const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const { - ConsumerImplMap::iterator i = consumers.find(destination); - if (i == consumers.end()) { - throw NotFoundException(QPID_MSG("Unknown destination " << destination)); + ConsumerImpl::shared_ptr consumer; + if (!find(destination, consumer)) { + throw NotFoundException(QPID_MSG("Unknown destination " << destination << " session=" << session.getSessionId())); } else { - return *(i->second); + return consumer; + } +} + +bool SemanticState::find(const std::string& destination, ConsumerImpl::shared_ptr& consumer) const +{ + // @todo KAG gsim: shouldn't the consumers map be locked???? + ConsumerImplMap::const_iterator i = consumers.find(destination); + if (i == consumers.end()) { + return false; } + consumer = i->second; + return true; } void SemanticState::setWindowMode(const std::string& destination) { - find(destination).setWindowMode(); + find(destination)->setWindowMode(); } void SemanticState::setCreditMode(const std::string& destination) { - find(destination).setCreditMode(); + find(destination)->setCreditMode(); } void SemanticState::addByteCredit(const std::string& destination, uint32_t value) { - ConsumerImpl& c = find(destination); - c.addByteCredit(value); - c.requestDispatch(); + ConsumerImpl::shared_ptr c = find(destination); + c->addByteCredit(value); + c->requestDispatch(); } void SemanticState::addMessageCredit(const std::string& destination, uint32_t value) { - ConsumerImpl& c = find(destination); - c.addMessageCredit(value); - c.requestDispatch(); + ConsumerImpl::shared_ptr c = find(destination); + c->addMessageCredit(value); + c->requestDispatch(); } void SemanticState::flush(const std::string& destination) { - find(destination).flush(); + find(destination)->flush(); } void SemanticState::stop(const std::string& destination) { - find(destination).stop(); + find(destination)->stop(); } void SemanticState::ConsumerImpl::setWindowMode() @@ -682,7 +691,7 @@ AckRange SemanticState::findRange(DeliveryId first, DeliveryId last) void SemanticState::acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired) { AckRange range = findRange(first, last); - for_each(range.start, range.end, AcquireFunctor(acquired)); + for_each(range.start, range.end, AcquireFunctor(this, acquired)); } void SemanticState::release(DeliveryId first, DeliveryId last, bool setRedelivered) diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 8c69d6b89b..8947e1e35f 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -75,7 +75,6 @@ class SemanticState : private boost::noncopyable { { mutable qpid::sys::Mutex lock; SemanticState* const parent; - const std::string name; const boost::shared_ptr<Queue> queue; const bool ackExpected; const bool acquire; @@ -129,8 +128,6 @@ class SemanticState : private boost::noncopyable { bool doOutput(); - std::string getName() const { return name; } - bool isAckExpected() const { return ackExpected; } bool isAcquire() const { return acquire; } bool isWindowing() const { return windowing; } @@ -187,7 +184,8 @@ class SemanticState : private boost::noncopyable { SessionContext& getSession() { return session; } const SessionContext& getSession() const { return session; } - ConsumerImpl& find(const std::string& destination); + const ConsumerImpl::shared_ptr find(const std::string& destination) const; + bool find(const std::string& destination, ConsumerImpl::shared_ptr&) const; /** * Get named queue, never returns 0. diff --git a/qpid/cpp/src/qpid/broker/ThresholdAlerts.h b/qpid/cpp/src/qpid/broker/ThresholdAlerts.h index c77722e700..c27c97d6f5 100644 --- a/qpid/cpp/src/qpid/broker/ThresholdAlerts.h +++ b/qpid/cpp/src/qpid/broker/ThresholdAlerts.h @@ -50,6 +50,9 @@ class ThresholdAlerts : public QueueObserver const long repeatInterval); void enqueued(const QueuedMessage&); void dequeued(const QueuedMessage&); + void consumed(const QueuedMessage&) {}; + void requeued(const QueuedMessage&) {}; + static void observe(Queue& queue, qpid::management::ManagementAgent& agent, const uint64_t countThreshold, const uint64_t sizeThreshold, diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 030d6e34c1..792f9f65f4 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -409,11 +409,11 @@ void Connection::shadowSetUser(const std::string& userId) { void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position) { - broker::SemanticState::ConsumerImpl& c = semanticState().find(name); - c.position = position; - c.setBlocked(blocked); - if (notifyEnabled) c.enableNotify(); else c.disableNotify(); - updateIn.consumerNumbering.add(c.shared_from_this()); + broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name); + c->position = position; + c->setBlocked(blocked); + if (notifyEnabled) c->enableNotify(); else c->disableNotify(); + updateIn.consumerNumbering.add(c); } @@ -444,7 +444,7 @@ void Connection::outputTask(uint16_t channel, const std::string& name) { if (!session) throw Exception(QPID_MSG(cluster << " channel not attached " << *this << "[" << channel << "] ")); - OutputTask* task = &session->getSemanticState().find(name); + OutputTask* task = session->getSemanticState().find(name).get(); connection->getOutputTasks().addOutputTask(task); } @@ -534,7 +534,7 @@ void Connection::deliveryRecord(const string& qname, m.position = position; if (enqueued) queue->updateEnqueued(m); //inform queue of the message } else { // Message at original position in original queue - m = queue->find(position); + queue->find(position, m); } if (!m.payload) throw Exception(QPID_MSG("deliveryRecord no update message")); diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index 6fdc4c69ad..1f1eb16af5 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -58,7 +58,7 @@ public: intrusive_ptr<Message> last; bool received; - TestConsumer(bool acquire = true):Consumer(acquire), received(false) {}; + TestConsumer(bool acquire = true):Consumer("test", acquire), received(false) {}; virtual bool deliver(QueuedMessage& msg){ last = msg.payload; @@ -324,14 +324,18 @@ QPID_AUTO_TEST_CASE(testSearch){ queue->deliver(msg3); SequenceNumber seq(2); - QueuedMessage qm = queue->find(seq); + QueuedMessage qm; + TestConsumer::shared_ptr c1(new TestConsumer()); + + BOOST_CHECK(queue->find(seq, qm)); BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue()); - queue->acquire(qm); + queue->acquire(qm, c1); BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); SequenceNumber seq1(3); - QueuedMessage qm1 = queue->find(seq1); + QueuedMessage qm1; + BOOST_CHECK(queue->find(seq1, qm1)); BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue()); } @@ -551,12 +555,13 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ QueuedMessage qmsg2(queue.get(), msg2, ++sequence); framing::SequenceNumber sequence1(10); QueuedMessage qmsg3(queue.get(), 0, sequence1); + TestConsumer::shared_ptr dummy(new TestConsumer()); - BOOST_CHECK(!queue->acquire(qmsg)); - BOOST_CHECK(queue->acquire(qmsg2)); + BOOST_CHECK(!queue->acquire(qmsg, dummy)); + BOOST_CHECK(queue->acquire(qmsg2, dummy)); // Acquire the massage again to test failure case. - BOOST_CHECK(!queue->acquire(qmsg2)); - BOOST_CHECK(!queue->acquire(qmsg3)); + BOOST_CHECK(!queue->acquire(qmsg2, dummy)); + BOOST_CHECK(!queue->acquire(qmsg3, dummy)); BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); |