diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 34 |
1 files changed, 28 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 609c4ecb87..7d10322163 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -300,8 +300,9 @@ class ReplicatingSubscription : public SemanticState::ConsumerImpl, public Queue ~DelegatingConsumer(); bool deliver(QueuedMessage& msg); void notify(); - bool filter(boost::intrusive_ptr<Message>); - bool accept(boost::intrusive_ptr<Message>); + //bool filter(boost::intrusive_ptr<Message>); + //bool accept(boost::intrusive_ptr<Message>); + Consumer::Action accept(const QueuedMessage&); OwnershipToken* getSession(); private: ReplicatingSubscription& delegate; @@ -498,7 +499,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, ) : - Consumer(_name, _acquire), + Consumer(_name, _acquire, !_acquire), /** @todo KAG - allow configuration of 'browse acquired' */ parent(_parent), queue(_queue), ackExpected(ack), @@ -572,6 +573,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) return true; } +#if 0 bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>) { return true; @@ -588,6 +590,25 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) blocked = !(filter(msg) && checkCredit(msg)); return !blocked; } +#else +Consumer::Action SemanticState::ConsumerImpl::accept(const QueuedMessage& msg) +{ + /** @todo KAG if present, run selector against msg.payload */ + bool selected = true; + + /** @todo KAG - traditional consumers/browsers (non-selectors) will never skip, always ACCEPT (or RETRY if !credit) */ + if (selected) { + if (!checkCredit(msg.payload)) + return Consumer::RETRY; + return Consumer::ACCEPT; + } else { + return Consumer::SKIP; + } +} +#endif + + + namespace { struct ConsumerName { @@ -618,7 +639,7 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) } -bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) +bool SemanticState::ConsumerImpl::checkCredit(const intrusive_ptr<Message>& msg) { bool enoughCredit = msgCredit > 0 && (byteCredit == 0xFFFFFFFF || byteCredit >= msg->getRequiredCredit()); @@ -1085,8 +1106,9 @@ bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) return delegate.deliver(m); } void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); } -bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); } -bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); } +//bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); } +//bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); } +Consumer::Action ReplicatingSubscription::DelegatingConsumer::accept(const QueuedMessage& msg) { return delegate.accept(msg); } OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); } ReplicationStateInitialiser::ReplicationStateInitialiser(qpid::framing::SequenceSet& r, |