summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-11-30 15:04:21 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-11-30 15:04:21 +0000
commitb41fd08fc751cbc288ed96cbdce7ba3ca074f458 (patch)
treee3c233f818e51a30aa369ca1cf278fad1087f8ff
parentda8964d6aad7d8b9637523cf49ba0e86e609a6ca (diff)
downloadqpid-python-b41fd08fc751cbc288ed96cbdce7ba3ca074f458.tar.gz
QPID-3603: checkpoint prototyped changes
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-kgiusti@1208463 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Consumer.h21
-rw-r--r--qpid/cpp/src/qpid/broker/FifoDistributor.cpp12
-rw-r--r--qpid/cpp/src/qpid/broker/FifoDistributor.h3
-rw-r--r--qpid/cpp/src/qpid/broker/MessageDistributor.h11
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.cpp18
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.h3
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp122
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp34
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h8
9 files changed, 97 insertions, 135 deletions
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h
index 2af9b0c121..a3838fb6f0 100644
--- a/qpid/cpp/src/qpid/broker/Consumer.h
+++ b/qpid/cpp/src/qpid/broker/Consumer.h
@@ -33,6 +33,8 @@ class QueueListeners;
class Consumer {
const bool acquires;
+ const bool browseAcquired;
+ const bool rewindable;
// inListeners allows QueueListeners to efficiently track if this instance is registered
// for notifications without having to search its containers
bool inListeners;
@@ -44,18 +46,29 @@ class Consumer {
framing::SequenceNumber position;
- Consumer(const std::string& _name, bool preAcquires = true)
- : acquires(preAcquires), inListeners(false), name(_name), position(0) {}
+ Consumer(const std::string& _name, bool preAcquires = true, bool ba = false)
+ : acquires(preAcquires), browseAcquired(ba), rewindable(false), inListeners(false),
+ name(_name), position(0) {}
bool preAcquires() const { return acquires; }
const std::string& getName() const { return name; }
virtual bool deliver(QueuedMessage& msg) = 0;
virtual void notify() = 0;
- virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
- virtual bool accept(boost::intrusive_ptr<Message>) { return true; }
+ // virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
+ // virtual bool accept(boost::intrusive_ptr<Message>) { return true; }
virtual OwnershipToken* getSession() = 0;
virtual ~Consumer(){}
friend class QueueListeners;
+
+ /** true if Consumer is a browsing subscription */
+ bool isBrowsing() const { return !acquires; }
+ /** if true, pass acquired messages to consumer, as well as un-acquired */
+ virtual bool allowAcquired() const { return isBrowsing() && browseAcquired; }
+ /** if true, reset consumer's position to queue HEAD if messages released. */
+ virtual bool rewindOnRelease() const { return rewindable; }
+ /** called by Queue to allow consumer to filter the current message */
+ enum Action {ACCEPT, SKIP, RETRY};
+ virtual Action accept(const QueuedMessage& msg) = 0;
};
}}
diff --git a/qpid/cpp/src/qpid/broker/FifoDistributor.cpp b/qpid/cpp/src/qpid/broker/FifoDistributor.cpp
index d63feffd57..bce9625183 100644
--- a/qpid/cpp/src/qpid/broker/FifoDistributor.cpp
+++ b/qpid/cpp/src/qpid/broker/FifoDistributor.cpp
@@ -28,23 +28,17 @@ using namespace qpid::broker;
FifoDistributor::FifoDistributor(Messages& container)
: messages(container) {}
-bool FifoDistributor::nextConsumableMessage( Consumer::shared_ptr&, QueuedMessage& next )
+bool FifoDistributor::nextMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
- return messages.consume(next);
+ return messages.browse(c->position, next, !c->allowAcquired());
}
bool FifoDistributor::allocate(const std::string&, const QueuedMessage& )
{
- // by default, all messages present on the queue may be allocated as they have yet to
- // be acquired.
+ // The Fifo distributor does not enforce or record message allocation
return true;
}
-bool FifoDistributor::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
-{
- return messages.browse(c->position, next, false);
-}
-
void FifoDistributor::query(qpid::types::Variant::Map&) const
{
// nothing to see here....
diff --git a/qpid/cpp/src/qpid/broker/FifoDistributor.h b/qpid/cpp/src/qpid/broker/FifoDistributor.h
index 245537ed12..7fe9502de3 100644
--- a/qpid/cpp/src/qpid/broker/FifoDistributor.h
+++ b/qpid/cpp/src/qpid/broker/FifoDistributor.h
@@ -44,9 +44,8 @@ class FifoDistributor : public MessageDistributor
/** MessageDistributor interface */
- bool nextConsumableMessage( Consumer::shared_ptr& consumer, QueuedMessage& next );
+ bool nextMessage( Consumer::shared_ptr& consumer, QueuedMessage& next );
bool allocate(const std::string& consumer, const QueuedMessage& target);
- bool nextBrowsableMessage( Consumer::shared_ptr& consumer, QueuedMessage& next );
void query(qpid::types::Variant::Map&) const;
private:
diff --git a/qpid/cpp/src/qpid/broker/MessageDistributor.h b/qpid/cpp/src/qpid/broker/MessageDistributor.h
index 090393c160..33025422e3 100644
--- a/qpid/cpp/src/qpid/broker/MessageDistributor.h
+++ b/qpid/cpp/src/qpid/broker/MessageDistributor.h
@@ -48,8 +48,7 @@ class MessageDistributor
* @param next set to the next message that the consumer may consume.
* @return true if message is available and next is set
*/
- virtual bool nextConsumableMessage( Consumer::shared_ptr& consumer,
- QueuedMessage& next ) = 0;
+ virtual bool nextMessage( Consumer::shared_ptr& consumer, QueuedMessage& next ) = 0;
/** Allow the comsumer to take ownership of the given message.
* @param consumer the name of the consumer that is attempting to acquire the message
@@ -59,14 +58,6 @@ class MessageDistributor
virtual bool allocate( const std::string& consumer,
const QueuedMessage& target) = 0;
- /** Determine the next message available for browsing by the consumer
- * @param consumer the consumer that is browsing the queue
- * @param next set to the next message that the consumer may browse.
- * @return true if a message is available and next is returned
- */
- virtual bool nextBrowsableMessage( Consumer::shared_ptr& consumer,
- QueuedMessage& next ) = 0;
-
/** hook to add any interesting management state to the status map */
virtual void query(qpid::types::Variant::Map&) const = 0;
};
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
index 24ac394e26..0caa4ba465 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -202,11 +202,16 @@ MessageGroupManager::~MessageGroupManager()
{
QPID_LOG( debug, "group queue " << qName << " cache results: hits=" << hits << " misses=" << misses );
}
-bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
+bool MessageGroupManager::nextMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
if (!messages.size())
return false;
+ // Message groups are ignored for browsers
+ if (c->isBrowsing()) {
+ return messages.browse(c->position, next, !c->allowAcquired());
+ }
+
next.position = c->position;
if (!freeGroups.empty()) {
const framing::SequenceNumber& nextFree = freeGroups.begin()->first;
@@ -219,13 +224,12 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued
while (messages.browse( next.position, next, true )) {
GroupState& group = findGroup(next);
if (!group.owned()) {
- //TODO: make acquire more efficient when we already have the message in question
- if (group.members.front() == next.position && messages.acquire(next.position, next)) { // only take from head!
+ if (group.members.front() == next.position) { // only take from head!
return true;
}
QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group
<< "'s head message still pending. pos=" << group.members.front());
- } else if (group.owner == c->getName() && messages.acquire(next.position, next)) {
+ } else if (group.owner == c->getName()) {
return true;
}
}
@@ -247,12 +251,6 @@ bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMess
return state.owner == consumer;
}
-bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
-{
- // browse: allow access to any available msg, regardless of group ownership (?ok?)
- return messages.browse(c->position, next, false);
-}
-
void MessageGroupManager::query(qpid::types::Variant::Map& status) const
{
/** Add a description of the current state of the message groups for this queue.
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
index f4bffc4760..eb6de95d25 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
@@ -103,9 +103,8 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu
void setState(const qpid::framing::FieldTable&);
// MessageDistributor iface
- bool nextConsumableMessage(Consumer::shared_ptr& c, QueuedMessage& next);
+ bool nextMessage(Consumer::shared_ptr& c, QueuedMessage& next);
bool allocate(const std::string& c, const QueuedMessage& qm);
- bool nextBrowsableMessage(Consumer::shared_ptr& c, QueuedMessage& next);
void query(qpid::types::Variant::Map&) const;
bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const;
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 41c912d71c..c47ba554d2 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -276,118 +276,62 @@ void Queue::notifyListener()
set.notify();
}
-bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
+void Queue::removeListener(Consumer::shared_ptr c)
{
- checkNotDeleted();
- if (c->preAcquires()) {
- switch (consumeNextMessage(m, c)) {
- case CONSUMED:
- return true;
- case CANT_CONSUME:
- notifyListener();//let someone else try
- case NO_MESSAGES:
- default:
- return false;
+ QueueListeners::NotificationSet set;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ listeners.removeListener(c);
+ if (messages->size()) {
+ listeners.populate(set);
}
- } else {
- return browseNextMessage(m, c);
}
+ set.notify();
}
-Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
+// give the consumer a message from the queue
+bool Queue::dispatch(Consumer::shared_ptr c)
{
- QueuedMessage msg;
+ QueuedMessage msg(this);
while (true) {
Mutex::ScopedLock locker(messageLock);
- if (allocator->nextConsumableMessage(c, msg)) {
+ if (allocator->nextMessage(c, msg)) {
+
if (msg.payload->hasExpired()) {
QPID_LOG(debug, "Message expired from queue '" << name << "'");
- c->position = msg.position;
+ acquire(msg.position, msg, locker);
dequeue(0, msg);
continue;
}
- if (c->filter(msg.payload)) {
- if (c->accept(msg.payload)) {
- bool ok = allocator->allocate( c->getName(), msg ); // inform allocator
+ switch (c->accept(msg)) {
+ case Consumer::ACCEPT:
+ if (!c->isBrowsing()) {
+ // consume this message
+ bool ok = acquire(msg.position, msg, locker);
+ (void) ok; assert(ok);
+ ok = allocator->allocate( c->getName(), msg );
(void) ok; assert(ok);
- observeAcquire(msg, locker);
- m = msg;
- return CONSUMED;
- } else {
- //message(s) are available but consumer hasn't got enough credit
- QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
- messages->release(msg);
- return CANT_CONSUME;
}
- } else {
- //consumer will never want this message
+ c->position = msg.position;
+ c->deliver(msg);
+ return true;
+ case Consumer::RETRY:
+ // consumer wants this message, but cannot accept it at this time
+ QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+ notifyListener();//let someone else try
+ return false;
+ case Consumer::SKIP:
+ // consumer will never want this message, continue looking...
QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
- messages->release(msg);
- return CANT_CONSUME;
+ c->position = msg.position;
+ continue;
}
} else {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
- return NO_MESSAGES;
- }
- }
-}
-
-bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
-{
- while (true) {
- Mutex::ScopedLock locker(messageLock);
- QueuedMessage msg;
-
- if (!allocator->nextBrowsableMessage(c, msg)) { // no next available
- QPID_LOG(debug, "No browsable messages available for consumer " <<
- c->getName() << " on queue '" << name << "'");
- listeners.addListener(c);
return false;
}
-
- if (c->filter(msg.payload) && !msg.payload->hasExpired()) {
- if (c->accept(msg.payload)) {
- //consumer wants the message
- c->position = msg.position;
- m = msg;
- return true;
- } else {
- //browser hasn't got enough credit for the message
- QPID_LOG(debug, "Browser can't currently accept message from '" << name << "'");
- return false;
- }
- } else {
- //consumer will never want this message, continue seeking
- QPID_LOG(debug, "Browser skipping message from '" << name << "'");
- c->position = msg.position;
- }
- }
- return false;
-}
-
-void Queue::removeListener(Consumer::shared_ptr c)
-{
- QueueListeners::NotificationSet set;
- {
- Mutex::ScopedLock locker(messageLock);
- listeners.removeListener(c);
- if (messages->size()) {
- listeners.populate(set);
- }
- }
- set.notify();
-}
-
-bool Queue::dispatch(Consumer::shared_ptr c)
-{
- QueuedMessage msg(this);
- if (getNextMessage(msg, c)) {
- c->deliver(msg);
- return true;
- } else {
- return false;
}
}
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 609c4ecb87..7d10322163 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -300,8 +300,9 @@ class ReplicatingSubscription : public SemanticState::ConsumerImpl, public Queue
~DelegatingConsumer();
bool deliver(QueuedMessage& msg);
void notify();
- bool filter(boost::intrusive_ptr<Message>);
- bool accept(boost::intrusive_ptr<Message>);
+ //bool filter(boost::intrusive_ptr<Message>);
+ //bool accept(boost::intrusive_ptr<Message>);
+ Consumer::Action accept(const QueuedMessage&);
OwnershipToken* getSession();
private:
ReplicatingSubscription& delegate;
@@ -498,7 +499,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
) :
- Consumer(_name, _acquire),
+ Consumer(_name, _acquire, !_acquire), /** @todo KAG - allow configuration of 'browse acquired' */
parent(_parent),
queue(_queue),
ackExpected(ack),
@@ -572,6 +573,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
return true;
}
+#if 0
bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>)
{
return true;
@@ -588,6 +590,25 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
blocked = !(filter(msg) && checkCredit(msg));
return !blocked;
}
+#else
+Consumer::Action SemanticState::ConsumerImpl::accept(const QueuedMessage& msg)
+{
+ /** @todo KAG if present, run selector against msg.payload */
+ bool selected = true;
+
+ /** @todo KAG - traditional consumers/browsers (non-selectors) will never skip, always ACCEPT (or RETRY if !credit) */
+ if (selected) {
+ if (!checkCredit(msg.payload))
+ return Consumer::RETRY;
+ return Consumer::ACCEPT;
+ } else {
+ return Consumer::SKIP;
+ }
+}
+#endif
+
+
+
namespace {
struct ConsumerName {
@@ -618,7 +639,7 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
}
-bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
+bool SemanticState::ConsumerImpl::checkCredit(const intrusive_ptr<Message>& msg)
{
bool enoughCredit = msgCredit > 0 &&
(byteCredit == 0xFFFFFFFF || byteCredit >= msg->getRequiredCredit());
@@ -1085,8 +1106,9 @@ bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m)
return delegate.deliver(m);
}
void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
-bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
-bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
+//bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
+//bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
+Consumer::Action ReplicatingSubscription::DelegatingConsumer::accept(const QueuedMessage& msg) { return delegate.accept(msg); }
OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); }
ReplicationStateInitialiser::ReplicationStateInitialiser(qpid::framing::SequenceSet& r,
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index ac98bf29b2..62c829e8f8 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -96,7 +96,7 @@ class SemanticState : private boost::noncopyable {
int deliveryCount;
qmf::org::apache::qpid::broker::Subscription* mgmtObject;
- bool checkCredit(boost::intrusive_ptr<Message>& msg);
+ bool checkCredit(const boost::intrusive_ptr<Message>& msg);
void allocateCredit(boost::intrusive_ptr<Message>& msg);
bool haveCredit();
@@ -114,8 +114,10 @@ class SemanticState : private boost::noncopyable {
virtual ~ConsumerImpl();
OwnershipToken* getSession();
virtual bool deliver(QueuedMessage& msg);
- bool filter(boost::intrusive_ptr<Message> msg);
- bool accept(boost::intrusive_ptr<Message> msg);
+ //bool filter(boost::intrusive_ptr<Message> msg);
+ //bool accept(boost::intrusive_ptr<Message> msg);
+ virtual Action accept(const QueuedMessage& msg);
+
void disableNotify();
void enableNotify();