summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp34
1 files changed, 28 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 609c4ecb87..7d10322163 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -300,8 +300,9 @@ class ReplicatingSubscription : public SemanticState::ConsumerImpl, public Queue
~DelegatingConsumer();
bool deliver(QueuedMessage& msg);
void notify();
- bool filter(boost::intrusive_ptr<Message>);
- bool accept(boost::intrusive_ptr<Message>);
+ //bool filter(boost::intrusive_ptr<Message>);
+ //bool accept(boost::intrusive_ptr<Message>);
+ Consumer::Action accept(const QueuedMessage&);
OwnershipToken* getSession();
private:
ReplicatingSubscription& delegate;
@@ -498,7 +499,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
) :
- Consumer(_name, _acquire),
+ Consumer(_name, _acquire, !_acquire), /** @todo KAG - allow configuration of 'browse acquired' */
parent(_parent),
queue(_queue),
ackExpected(ack),
@@ -572,6 +573,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
return true;
}
+#if 0
bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>)
{
return true;
@@ -588,6 +590,25 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
blocked = !(filter(msg) && checkCredit(msg));
return !blocked;
}
+#else
+Consumer::Action SemanticState::ConsumerImpl::accept(const QueuedMessage& msg)
+{
+ /** @todo KAG if present, run selector against msg.payload */
+ bool selected = true;
+
+ /** @todo KAG - traditional consumers/browsers (non-selectors) will never skip, always ACCEPT (or RETRY if !credit) */
+ if (selected) {
+ if (!checkCredit(msg.payload))
+ return Consumer::RETRY;
+ return Consumer::ACCEPT;
+ } else {
+ return Consumer::SKIP;
+ }
+}
+#endif
+
+
+
namespace {
struct ConsumerName {
@@ -618,7 +639,7 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
}
-bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
+bool SemanticState::ConsumerImpl::checkCredit(const intrusive_ptr<Message>& msg)
{
bool enoughCredit = msgCredit > 0 &&
(byteCredit == 0xFFFFFFFF || byteCredit >= msg->getRequiredCredit());
@@ -1085,8 +1106,9 @@ bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m)
return delegate.deliver(m);
}
void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
-bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
-bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
+//bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
+//bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
+Consumer::Action ReplicatingSubscription::DelegatingConsumer::accept(const QueuedMessage& msg) { return delegate.accept(msg); }
OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); }
ReplicationStateInitialiser::ReplicationStateInitialiser(qpid::framing::SequenceSet& r,