From b41fd08fc751cbc288ed96cbdce7ba3ca074f458 Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Wed, 30 Nov 2011 15:04:21 +0000 Subject: QPID-3603: checkpoint prototyped changes git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-kgiusti@1208463 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Consumer.h | 21 +++- qpid/cpp/src/qpid/broker/FifoDistributor.cpp | 12 +-- qpid/cpp/src/qpid/broker/FifoDistributor.h | 3 +- qpid/cpp/src/qpid/broker/MessageDistributor.h | 11 +- qpid/cpp/src/qpid/broker/MessageGroupManager.cpp | 18 ++-- qpid/cpp/src/qpid/broker/MessageGroupManager.h | 3 +- qpid/cpp/src/qpid/broker/Queue.cpp | 122 ++++++----------------- qpid/cpp/src/qpid/broker/SemanticState.cpp | 34 +++++-- qpid/cpp/src/qpid/broker/SemanticState.h | 8 +- 9 files changed, 97 insertions(+), 135 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h index 2af9b0c121..a3838fb6f0 100644 --- a/qpid/cpp/src/qpid/broker/Consumer.h +++ b/qpid/cpp/src/qpid/broker/Consumer.h @@ -33,6 +33,8 @@ class QueueListeners; class Consumer { const bool acquires; + const bool browseAcquired; + const bool rewindable; // inListeners allows QueueListeners to efficiently track if this instance is registered // for notifications without having to search its containers bool inListeners; @@ -44,18 +46,29 @@ class Consumer { framing::SequenceNumber position; - Consumer(const std::string& _name, bool preAcquires = true) - : acquires(preAcquires), inListeners(false), name(_name), position(0) {} + Consumer(const std::string& _name, bool preAcquires = true, bool ba = false) + : acquires(preAcquires), browseAcquired(ba), rewindable(false), inListeners(false), + name(_name), position(0) {} bool preAcquires() const { return acquires; } const std::string& getName() const { return name; } virtual bool deliver(QueuedMessage& msg) = 0; virtual void notify() = 0; - virtual bool filter(boost::intrusive_ptr) { return true; } - virtual bool accept(boost::intrusive_ptr) { return true; } + // virtual bool filter(boost::intrusive_ptr) { return true; } + // virtual bool accept(boost::intrusive_ptr) { return true; } virtual OwnershipToken* getSession() = 0; virtual ~Consumer(){} friend class QueueListeners; + + /** true if Consumer is a browsing subscription */ + bool isBrowsing() const { return !acquires; } + /** if true, pass acquired messages to consumer, as well as un-acquired */ + virtual bool allowAcquired() const { return isBrowsing() && browseAcquired; } + /** if true, reset consumer's position to queue HEAD if messages released. */ + virtual bool rewindOnRelease() const { return rewindable; } + /** called by Queue to allow consumer to filter the current message */ + enum Action {ACCEPT, SKIP, RETRY}; + virtual Action accept(const QueuedMessage& msg) = 0; }; }} diff --git a/qpid/cpp/src/qpid/broker/FifoDistributor.cpp b/qpid/cpp/src/qpid/broker/FifoDistributor.cpp index d63feffd57..bce9625183 100644 --- a/qpid/cpp/src/qpid/broker/FifoDistributor.cpp +++ b/qpid/cpp/src/qpid/broker/FifoDistributor.cpp @@ -28,23 +28,17 @@ using namespace qpid::broker; FifoDistributor::FifoDistributor(Messages& container) : messages(container) {} -bool FifoDistributor::nextConsumableMessage( Consumer::shared_ptr&, QueuedMessage& next ) +bool FifoDistributor::nextMessage( Consumer::shared_ptr& c, QueuedMessage& next ) { - return messages.consume(next); + return messages.browse(c->position, next, !c->allowAcquired()); } bool FifoDistributor::allocate(const std::string&, const QueuedMessage& ) { - // by default, all messages present on the queue may be allocated as they have yet to - // be acquired. + // The Fifo distributor does not enforce or record message allocation return true; } -bool FifoDistributor::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) -{ - return messages.browse(c->position, next, false); -} - void FifoDistributor::query(qpid::types::Variant::Map&) const { // nothing to see here.... diff --git a/qpid/cpp/src/qpid/broker/FifoDistributor.h b/qpid/cpp/src/qpid/broker/FifoDistributor.h index 245537ed12..7fe9502de3 100644 --- a/qpid/cpp/src/qpid/broker/FifoDistributor.h +++ b/qpid/cpp/src/qpid/broker/FifoDistributor.h @@ -44,9 +44,8 @@ class FifoDistributor : public MessageDistributor /** MessageDistributor interface */ - bool nextConsumableMessage( Consumer::shared_ptr& consumer, QueuedMessage& next ); + bool nextMessage( Consumer::shared_ptr& consumer, QueuedMessage& next ); bool allocate(const std::string& consumer, const QueuedMessage& target); - bool nextBrowsableMessage( Consumer::shared_ptr& consumer, QueuedMessage& next ); void query(qpid::types::Variant::Map&) const; private: diff --git a/qpid/cpp/src/qpid/broker/MessageDistributor.h b/qpid/cpp/src/qpid/broker/MessageDistributor.h index 090393c160..33025422e3 100644 --- a/qpid/cpp/src/qpid/broker/MessageDistributor.h +++ b/qpid/cpp/src/qpid/broker/MessageDistributor.h @@ -48,8 +48,7 @@ class MessageDistributor * @param next set to the next message that the consumer may consume. * @return true if message is available and next is set */ - virtual bool nextConsumableMessage( Consumer::shared_ptr& consumer, - QueuedMessage& next ) = 0; + virtual bool nextMessage( Consumer::shared_ptr& consumer, QueuedMessage& next ) = 0; /** Allow the comsumer to take ownership of the given message. * @param consumer the name of the consumer that is attempting to acquire the message @@ -59,14 +58,6 @@ class MessageDistributor virtual bool allocate( const std::string& consumer, const QueuedMessage& target) = 0; - /** Determine the next message available for browsing by the consumer - * @param consumer the consumer that is browsing the queue - * @param next set to the next message that the consumer may browse. - * @return true if a message is available and next is returned - */ - virtual bool nextBrowsableMessage( Consumer::shared_ptr& consumer, - QueuedMessage& next ) = 0; - /** hook to add any interesting management state to the status map */ virtual void query(qpid::types::Variant::Map&) const = 0; }; diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp index 24ac394e26..0caa4ba465 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -202,11 +202,16 @@ MessageGroupManager::~MessageGroupManager() { QPID_LOG( debug, "group queue " << qName << " cache results: hits=" << hits << " misses=" << misses ); } -bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) +bool MessageGroupManager::nextMessage( Consumer::shared_ptr& c, QueuedMessage& next ) { if (!messages.size()) return false; + // Message groups are ignored for browsers + if (c->isBrowsing()) { + return messages.browse(c->position, next, !c->allowAcquired()); + } + next.position = c->position; if (!freeGroups.empty()) { const framing::SequenceNumber& nextFree = freeGroups.begin()->first; @@ -219,13 +224,12 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued while (messages.browse( next.position, next, true )) { GroupState& group = findGroup(next); if (!group.owned()) { - //TODO: make acquire more efficient when we already have the message in question - if (group.members.front() == next.position && messages.acquire(next.position, next)) { // only take from head! + if (group.members.front() == next.position) { // only take from head! return true; } QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group << "'s head message still pending. pos=" << group.members.front()); - } else if (group.owner == c->getName() && messages.acquire(next.position, next)) { + } else if (group.owner == c->getName()) { return true; } } @@ -247,12 +251,6 @@ bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMess return state.owner == consumer; } -bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) -{ - // browse: allow access to any available msg, regardless of group ownership (?ok?) - return messages.browse(c->position, next, false); -} - void MessageGroupManager::query(qpid::types::Variant::Map& status) const { /** Add a description of the current state of the message groups for this queue. diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h index f4bffc4760..eb6de95d25 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h @@ -103,9 +103,8 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu void setState(const qpid::framing::FieldTable&); // MessageDistributor iface - bool nextConsumableMessage(Consumer::shared_ptr& c, QueuedMessage& next); + bool nextMessage(Consumer::shared_ptr& c, QueuedMessage& next); bool allocate(const std::string& c, const QueuedMessage& qm); - bool nextBrowsableMessage(Consumer::shared_ptr& c, QueuedMessage& next); void query(qpid::types::Variant::Map&) const; bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const; diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 41c912d71c..c47ba554d2 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -276,118 +276,62 @@ void Queue::notifyListener() set.notify(); } -bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) +void Queue::removeListener(Consumer::shared_ptr c) { - checkNotDeleted(); - if (c->preAcquires()) { - switch (consumeNextMessage(m, c)) { - case CONSUMED: - return true; - case CANT_CONSUME: - notifyListener();//let someone else try - case NO_MESSAGES: - default: - return false; + QueueListeners::NotificationSet set; + { + Mutex::ScopedLock locker(messageLock); + listeners.removeListener(c); + if (messages->size()) { + listeners.populate(set); } - } else { - return browseNextMessage(m, c); } + set.notify(); } -Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) +// give the consumer a message from the queue +bool Queue::dispatch(Consumer::shared_ptr c) { - QueuedMessage msg; + QueuedMessage msg(this); while (true) { Mutex::ScopedLock locker(messageLock); - if (allocator->nextConsumableMessage(c, msg)) { + if (allocator->nextMessage(c, msg)) { + if (msg.payload->hasExpired()) { QPID_LOG(debug, "Message expired from queue '" << name << "'"); - c->position = msg.position; + acquire(msg.position, msg, locker); dequeue(0, msg); continue; } - if (c->filter(msg.payload)) { - if (c->accept(msg.payload)) { - bool ok = allocator->allocate( c->getName(), msg ); // inform allocator + switch (c->accept(msg)) { + case Consumer::ACCEPT: + if (!c->isBrowsing()) { + // consume this message + bool ok = acquire(msg.position, msg, locker); + (void) ok; assert(ok); + ok = allocator->allocate( c->getName(), msg ); (void) ok; assert(ok); - observeAcquire(msg, locker); - m = msg; - return CONSUMED; - } else { - //message(s) are available but consumer hasn't got enough credit - QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); - messages->release(msg); - return CANT_CONSUME; } - } else { - //consumer will never want this message + c->position = msg.position; + c->deliver(msg); + return true; + case Consumer::RETRY: + // consumer wants this message, but cannot accept it at this time + QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); + notifyListener();//let someone else try + return false; + case Consumer::SKIP: + // consumer will never want this message, continue looking... QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); - messages->release(msg); - return CANT_CONSUME; + c->position = msg.position; + continue; } } else { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); listeners.addListener(c); - return NO_MESSAGES; - } - } -} - -bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) -{ - while (true) { - Mutex::ScopedLock locker(messageLock); - QueuedMessage msg; - - if (!allocator->nextBrowsableMessage(c, msg)) { // no next available - QPID_LOG(debug, "No browsable messages available for consumer " << - c->getName() << " on queue '" << name << "'"); - listeners.addListener(c); return false; } - - if (c->filter(msg.payload) && !msg.payload->hasExpired()) { - if (c->accept(msg.payload)) { - //consumer wants the message - c->position = msg.position; - m = msg; - return true; - } else { - //browser hasn't got enough credit for the message - QPID_LOG(debug, "Browser can't currently accept message from '" << name << "'"); - return false; - } - } else { - //consumer will never want this message, continue seeking - QPID_LOG(debug, "Browser skipping message from '" << name << "'"); - c->position = msg.position; - } - } - return false; -} - -void Queue::removeListener(Consumer::shared_ptr c) -{ - QueueListeners::NotificationSet set; - { - Mutex::ScopedLock locker(messageLock); - listeners.removeListener(c); - if (messages->size()) { - listeners.populate(set); - } - } - set.notify(); -} - -bool Queue::dispatch(Consumer::shared_ptr c) -{ - QueuedMessage msg(this); - if (getNextMessage(msg, c)) { - c->deliver(msg); - return true; - } else { - return false; } } diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 609c4ecb87..7d10322163 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -300,8 +300,9 @@ class ReplicatingSubscription : public SemanticState::ConsumerImpl, public Queue ~DelegatingConsumer(); bool deliver(QueuedMessage& msg); void notify(); - bool filter(boost::intrusive_ptr); - bool accept(boost::intrusive_ptr); + //bool filter(boost::intrusive_ptr); + //bool accept(boost::intrusive_ptr); + Consumer::Action accept(const QueuedMessage&); OwnershipToken* getSession(); private: ReplicatingSubscription& delegate; @@ -498,7 +499,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, ) : - Consumer(_name, _acquire), + Consumer(_name, _acquire, !_acquire), /** @todo KAG - allow configuration of 'browse acquired' */ parent(_parent), queue(_queue), ackExpected(ack), @@ -572,6 +573,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) return true; } +#if 0 bool SemanticState::ConsumerImpl::filter(intrusive_ptr) { return true; @@ -588,6 +590,25 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr msg) blocked = !(filter(msg) && checkCredit(msg)); return !blocked; } +#else +Consumer::Action SemanticState::ConsumerImpl::accept(const QueuedMessage& msg) +{ + /** @todo KAG if present, run selector against msg.payload */ + bool selected = true; + + /** @todo KAG - traditional consumers/browsers (non-selectors) will never skip, always ACCEPT (or RETRY if !credit) */ + if (selected) { + if (!checkCredit(msg.payload)) + return Consumer::RETRY; + return Consumer::ACCEPT; + } else { + return Consumer::SKIP; + } +} +#endif + + + namespace { struct ConsumerName { @@ -618,7 +639,7 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr& msg) } -bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr& msg) +bool SemanticState::ConsumerImpl::checkCredit(const intrusive_ptr& msg) { bool enoughCredit = msgCredit > 0 && (byteCredit == 0xFFFFFFFF || byteCredit >= msg->getRequiredCredit()); @@ -1085,8 +1106,9 @@ bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) return delegate.deliver(m); } void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); } -bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr msg) { return delegate.filter(msg); } -bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr msg) { return delegate.accept(msg); } +//bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr msg) { return delegate.filter(msg); } +//bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr msg) { return delegate.accept(msg); } +Consumer::Action ReplicatingSubscription::DelegatingConsumer::accept(const QueuedMessage& msg) { return delegate.accept(msg); } OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); } ReplicationStateInitialiser::ReplicationStateInitialiser(qpid::framing::SequenceSet& r, diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index ac98bf29b2..62c829e8f8 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -96,7 +96,7 @@ class SemanticState : private boost::noncopyable { int deliveryCount; qmf::org::apache::qpid::broker::Subscription* mgmtObject; - bool checkCredit(boost::intrusive_ptr& msg); + bool checkCredit(const boost::intrusive_ptr& msg); void allocateCredit(boost::intrusive_ptr& msg); bool haveCredit(); @@ -114,8 +114,10 @@ class SemanticState : private boost::noncopyable { virtual ~ConsumerImpl(); OwnershipToken* getSession(); virtual bool deliver(QueuedMessage& msg); - bool filter(boost::intrusive_ptr msg); - bool accept(boost::intrusive_ptr msg); + //bool filter(boost::intrusive_ptr msg); + //bool accept(boost::intrusive_ptr msg); + virtual Action accept(const QueuedMessage& msg); + void disableNotify(); void enableNotify(); -- cgit v1.2.1