summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-08-05 20:47:10 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-08-05 20:47:10 +0000
commit45012dc9764465561ee1edbc4d3de4fac03c5b54 (patch)
tree0efa83203b11cff3dd8145919a7b5c2efa2cdf79
parent3489d568087c433e75e7b6fd7c0f5eae96983d40 (diff)
downloadqpid-python-45012dc9764465561ee1edbc4d3de4fac03c5b54.tar.gz
QPID-3346: refactor queue interface to support consumer-based message selection.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1154376 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Consumer.h14
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.cpp12
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.h11
-rw-r--r--qpid/cpp/src/qpid/broker/LegacyLVQ.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp441
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h35
-rw-r--r--qpid/cpp/src/qpid/broker/QueueEvents.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.h3
-rw-r--r--qpid/cpp/src/qpid/broker/QueueObserver.h38
-rw-r--r--qpid/cpp/src/qpid/broker/QueuePolicy.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp55
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h6
-rw-r--r--qpid/cpp/src/qpid/broker/ThresholdAlerts.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp14
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp21
16 files changed, 521 insertions, 150 deletions
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h
index 317338a8ad..75deec5b27 100644
--- a/qpid/cpp/src/qpid/broker/Consumer.h
+++ b/qpid/cpp/src/qpid/broker/Consumer.h
@@ -36,13 +36,17 @@ class Consumer {
// inListeners allows QueueListeners to efficiently track if this instance is registered
// for notifications without having to search its containers
bool inListeners;
- public:
- typedef boost::shared_ptr<Consumer> shared_ptr;
-
+ const std::string name;
+ public:
+ typedef boost::shared_ptr<Consumer> shared_ptr;
+
framing::SequenceNumber position;
-
- Consumer(bool preAcquires = true) : acquires(preAcquires), inListeners(false) {}
+
+ Consumer(const std::string& _name, bool preAcquires = true)
+ : acquires(preAcquires), inListeners(false), name(_name), position(0) {}
bool preAcquires() const { return acquires; }
+ const std::string& getName() const { return name; }
+
virtual bool deliver(QueuedMessage& msg) = 0;
virtual void notify() = 0;
virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
index 58dcc6d7c7..1b42c67edd 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -142,7 +142,7 @@ void DeliveryRecord::reject()
//just drop it
QPID_LOG(info, "Dropping rejected message from " << queue->getName());
}
- dequeue();
+ queue->dequeue(0, msg);
setEnded();
}
}
@@ -152,8 +152,14 @@ uint32_t DeliveryRecord::getCredit() const
return credit;
}
-void DeliveryRecord::acquire(DeliveryIds& results) {
- if (queue->acquire(msg)) {
+void DeliveryRecord::acquire(SemanticState* const session, DeliveryIds& results) {
+ SemanticState::ConsumerImpl::shared_ptr consumer;
+
+ if (!session->find( tag, consumer )) {
+ QPID_LOG(error, "Can't acquire message " << id.getValue() << ": original subscription no longer exists.");
+ }
+
+ if (queue->acquire(msg, consumer)) {
acquired = true;
results.push_back(id);
if (!acceptExpected) {
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
index d388ba94be..ba3e1d5cfb 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
@@ -46,7 +46,7 @@ class DeliveryRecord
{
QueuedMessage msg;
mutable boost::shared_ptr<Queue> queue;
- std::string tag;
+ std::string tag; // name of consumer
DeliveryId id;
bool acquired : 1;
bool acceptExpected : 1;
@@ -82,7 +82,7 @@ class DeliveryRecord
void reject();
void cancel(const std::string& tag);
void redeliver(SemanticState* const);
- void acquire(DeliveryIds& results);
+ void acquire(SemanticState* const, DeliveryIds& results);
void complete();
bool accept(TransactionContext* ctxt); // Returns isRedundant()
bool setEnded(); // Returns isRedundant()
@@ -90,7 +90,7 @@ class DeliveryRecord
bool isAcquired() const { return acquired; }
bool isComplete() const { return completed; }
- bool isRedundant() const { return ended && (!windowing || completed); }
+ bool isRedundant() const { return ended && (!windowing || completed); } // msg no longer needed - can discard
bool isCancelled() const { return cancelled; }
bool isAccepted() const { return !acceptExpected; }
bool isEnded() const { return ended; }
@@ -117,13 +117,14 @@ inline bool operator<(const DeliveryRecord& a, const framing::SequenceNumber& b)
struct AcquireFunctor
{
+ SemanticState* session;
DeliveryIds& results;
- AcquireFunctor(DeliveryIds& _results) : results(_results) {}
+ AcquireFunctor(SemanticState* _session, DeliveryIds& _results) : session(_session), results(_results) {}
void operator()(DeliveryRecord& record)
{
- record.acquire(results);
+ record.acquire(session, results);
}
};
diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
index a811a86492..7d9cb4c1a0 100644
--- a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
+++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
@@ -35,7 +35,9 @@ void LegacyLVQ::setNoBrowse(bool b)
bool LegacyLVQ::remove(const framing::SequenceNumber& position, QueuedMessage& message)
{
Ordering::iterator i = messages.find(position);
- if (i != messages.end() && i->second.payload == message.payload) {
+ if (i != messages.end() &&
+ // @todo KAG: gsim? is a bug? message is a *return* value - we really shouldn't check ".payload" below:
+ i->second.payload == message.payload) {
message = i->second;
erase(i);
return true;
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 42923567a2..c9cea9212a 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -87,6 +87,28 @@ const int ENQUEUE_ONLY=1;
const int ENQUEUE_AND_DEQUEUE=2;
}
+
+// KAG TBD: find me a home....
+namespace qpid {
+namespace broker {
+
+class MessageSelector
+{
+ protected:
+ Queue *queue;
+ public:
+ MessageSelector( Queue *q ) : queue(q) {}
+ virtual ~MessageSelector() {};
+
+ // assumes caller holds messageLock
+ virtual bool nextMessage( Consumer::shared_ptr c, QueuedMessage& next,
+ const Mutex::ScopedLock&);
+ virtual bool canAcquire(Consumer::shared_ptr consumer, const QueuedMessage& qm,
+ const Mutex::ScopedLock&);
+};
+
+}}
+
Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
const OwnershipToken* const _owner,
@@ -111,7 +133,8 @@ Queue::Queue(const string& _name, bool _autodelete,
broker(b),
deleted(false),
barrier(*this),
- autoDeleteTimeout(0)
+ autoDeleteTimeout(0),
+ selector(new MessageSelector( this )) // KAG TODO: FIX!!
{
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
@@ -220,6 +243,14 @@ void Queue::requeue(const QueuedMessage& msg){
enqueue(0, payload);
}
}
+
+ for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+ try{
+ (*i)->requeued(msg);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of message requeue for queue " << getName() << ": " << e.what());
+ }
+ }
}
copy.notify();
}
@@ -229,7 +260,7 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
- if (messages->remove(position, message)) {
+ if (acquire(position, message )) {
QPID_LOG(debug, "Acquired message at " << position << " from " << name);
return true;
} else {
@@ -238,9 +269,24 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess
}
}
-bool Queue::acquire(const QueuedMessage& msg) {
- QueuedMessage copy = msg;
- return acquireMessageAt(msg.position, copy);
+bool Queue::acquire(const QueuedMessage& msg, Consumer::shared_ptr c)
+{
+ Mutex::ScopedLock locker(messageLock);
+ assertClusterSafe();
+ QPID_LOG(debug, c->getName() << " attempting to acquire message at " << msg.position);
+
+ if (!selector->canAcquire( c, msg, locker )) {
+ QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name);
+ return false;
+ }
+
+ QueuedMessage copy(msg);
+ if (acquire( msg.position, copy )) {
+ QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name);
+ return true;
+ }
+ QPID_LOG(debug, "Could not acquire message at " << msg.position << " from " << name << "; no message at that position");
+ return false;
}
void Queue::notifyListener()
@@ -276,44 +322,60 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
+
while (true) {
Mutex::ScopedLock locker(messageLock);
- if (messages->empty()) {
- QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+ QueuedMessage msg;
+
+ if (!selector->nextMessage(c, msg, locker)) { // no next available
+ QPID_LOG(debug, "No messages available to dispatch to consumer " <<
+ c->getName() << " on queue '" << name << "'");
listeners.addListener(c);
return NO_MESSAGES;
- } else {
- QueuedMessage msg = messages->front();
- if (msg.payload->hasExpired()) {
- QPID_LOG(debug, "Message expired from queue '" << name << "'");
- popAndDequeue();
- continue;
- }
+ }
- if (c->filter(msg.payload)) {
- if (c->accept(msg.payload)) {
- m = msg;
- pop();
- return CONSUMED;
- } else {
- //message(s) are available but consumer hasn't got enough credit
- QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
- return CANT_CONSUME;
- }
+ if (msg.payload->hasExpired()) {
+ QPID_LOG(debug, "Message expired from queue '" << name << "'");
+ c->position = msg.position;
+ acquire( msg.position, msg );
+ dequeue( 0, msg );
+ continue;
+ }
+
+ // a message is available for this consumer - can the consumer use it?
+
+ if (c->filter(msg.payload)) {
+ if (c->accept(msg.payload)) {
+ acquire( msg.position, m );
+ c->position = msg.position;
+ return CONSUMED;
} else {
- //consumer will never want this message
- QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+ //message(s) are available but consumer hasn't got enough credit
+ QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
return CANT_CONSUME;
}
+ } else {
+ //consumer will never want this message
+ QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+ c->position = msg.position;
+ return CANT_CONSUME;
}
}
}
-
bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
- QueuedMessage msg(this);
- while (seek(msg, c)) {
+ while (true) {
+ Mutex::ScopedLock locker(messageLock);
+ QueuedMessage msg;
+
+ if (!selector->nextMessage(c, msg, locker)) { // no next available
+ QPID_LOG(debug, "No browsable messages available for consumer " <<
+ c->getName() << " on queue '" << name << "'");
+ listeners.addListener(c);
+ return false;
+ }
+
if (c->filter(msg.payload) && !msg.payload->hasExpired()) {
if (c->accept(msg.payload)) {
//consumer wants the message
@@ -327,8 +389,8 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
}
} else {
//consumer will never want this message, continue seeking
- c->position = msg.position;
QPID_LOG(debug, "Browser skipping message from '" << name << "'");
+ c->position = msg.position;
}
}
return false;
@@ -358,61 +420,71 @@ bool Queue::dispatch(Consumer::shared_ptr c)
}
}
-// Find the next message
-bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
- Mutex::ScopedLock locker(messageLock);
- if (messages->next(c->position, msg)) {
- return true;
- } else {
- listeners.addListener(c);
- return false;
- }
-}
-
-QueuedMessage Queue::find(SequenceNumber pos) const {
+bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const {
Mutex::ScopedLock locker(messageLock);
- QueuedMessage msg;
- messages->find(pos, msg);
- return msg;
+ if (messages->find(pos, msg))
+ return true;
+ return false;
}
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
assertClusterSafe();
- Mutex::ScopedLock locker(consumerLock);
- if(exclusive) {
- throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
- } else if(requestExclusive) {
- if(consumerCount) {
+ {
+ Mutex::ScopedLock locker(consumerLock);
+ if(exclusive) {
throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
- } else {
- exclusive = c->getSession();
+ QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
+ } else if(requestExclusive) {
+ if(consumerCount) {
+ throw ResourceLockedException(
+ QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
+ } else {
+ exclusive = c->getSession();
+ }
+ }
+ consumerCount++;
+ if (mgmtObject != 0)
+ mgmtObject->inc_consumerCount ();
+ //reset auto deletion timer if necessary
+ if (autoDeleteTimeout && autoDeleteTask) {
+ autoDeleteTask->cancel();
}
}
- consumerCount++;
- if (mgmtObject != 0)
- mgmtObject->inc_consumerCount ();
- //reset auto deletion timer if necessary
- if (autoDeleteTimeout && autoDeleteTask) {
- autoDeleteTask->cancel();
+ for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+ try{
+ Mutex::ScopedLock locker(messageLock);
+ (*i)->consumerAdded(*c);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what());
+ }
}
}
void Queue::cancel(Consumer::shared_ptr c){
removeListener(c);
- Mutex::ScopedLock locker(consumerLock);
- consumerCount--;
- if(exclusive) exclusive = 0;
- if (mgmtObject != 0)
- mgmtObject->dec_consumerCount ();
+ {
+ Mutex::ScopedLock locker(consumerLock);
+ consumerCount--;
+ if(exclusive) exclusive = 0;
+ if (mgmtObject != 0)
+ mgmtObject->dec_consumerCount ();
+ }
+ for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+ try{
+ Mutex::ScopedLock locker(messageLock);
+ (*i)->consumerRemoved(*c);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what());
+ }
+ }
}
QueuedMessage Queue::get(){
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
- messages->pop(msg);
+ if (messages->pop(msg))
+ consumed( msg );
return msg;
}
@@ -443,8 +515,15 @@ void Queue::purgeExpired(qpid::sys::Duration lapse)
Mutex::ScopedLock locker(messageLock);
messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
}
- for_each(expired.begin(), expired.end(),
- boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+
+ for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
+ i != expired.end(); ++i) {
+ {
+ Mutex::ScopedLock locker(messageLock);
+ consumed( *i ); // expects messageLock held
+ }
+ dequeue( 0, *i );
+ }
}
}
@@ -503,18 +582,33 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
QueuedMessage qmsg = messages->front();
boost::intrusive_ptr<Message> msg = qmsg.payload;
destq->deliver(msg); // deliver message to the destination queue
- pop();
- dequeue(0, qmsg);
+ popAndDequeue();
count++;
}
return count;
}
+/** Acquire the front (oldest) message from the in-memory queue.
+ * assumes messageLock held by caller
+ */
void Queue::pop()
{
assertClusterSafe();
- messages->pop();
- ++dequeueSincePurge;
+ QueuedMessage msg;
+ if (messages->pop(msg)) {
+ consumed( msg ); // mark it removed
+ ++dequeueSincePurge;
+ }
+}
+
+/** Acquire the message at the given position, return true and msg if acquire succeeds */
+bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg )
+{
+ if (messages->remove(position, msg)) {
+ consumed( msg );
+ return true;
+ }
+ return false;
}
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
@@ -533,6 +627,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
}
copy.notify();
if (dequeueRequired) {
+ consumed( removed ); // tell observers
if (isRecovery) {
//can't issue new requests for the store until
//recovery is complete
@@ -696,14 +791,16 @@ void Queue::dequeueCommitted(const QueuedMessage& msg)
}
/**
- * Removes a message from the in-memory delivery queue as well
- * dequeing it from the logical (and persistent if applicable) queue
+ * Removes the first (oldest) message from the in-memory delivery queue as well dequeing
+ * it from the logical (and persistent if applicable) queue
*/
void Queue::popAndDequeue()
{
- QueuedMessage msg = messages->front();
- pop();
- dequeue(0, msg);
+ if (!messages->empty()) {
+ QueuedMessage msg = messages->front();
+ pop();
+ dequeue(0, msg);
+ }
}
/**
@@ -723,6 +820,20 @@ void Queue::dequeued(const QueuedMessage& msg)
}
}
+/** updates queue observers when a message has become unavailable for transfer,
+ * expects messageLock to be held
+ */
+void Queue::consumed(const QueuedMessage& msg)
+{
+ for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+ try{
+ (*i)->consumed(msg);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of message removal for queue " << getName() << ": " << e.what());
+ }
+ }
+}
+
void Queue::create(const FieldTable& _settings)
{
@@ -1233,3 +1344,179 @@ void Queue::UsageBarrier::destroy()
parent.deleted = true;
while (count) parent.messageLock.wait();
}
+
+
+// KAG TBD: flesh out...
+
+
+class MessageGroupManager : public QueueObserver, public MessageSelector
+{
+ const std::string groupIdHeader; // msg header holding group identifier
+ struct GroupState {
+ const std::string group; // group identifier
+ //Consumer::shared_ptr owner; // consumer with outstanding acquired messages
+ std::string owner; // consumer with outstanding acquired messages
+ uint32_t acquired; // count of outstanding acquired messages
+ uint32_t total; // count of enqueued messages in this group
+ GroupState() : acquired(0), total(0) {}
+ };
+ std::map<std::string, struct GroupState> messageGroups;
+ std::set<std::string> consumers;
+
+ public:
+
+ MessageGroupManager(const std::string& header, Queue *q )
+ : QueueObserver(), MessageSelector(q), groupIdHeader( header ) {}
+ void enqueued( const QueuedMessage& qm );
+ void removed( const QueuedMessage& qm );
+ void requeued( const QueuedMessage& qm );
+ void dequeued( const QueuedMessage& qm );
+ void consumerAdded( const Consumer& );
+ void consumerRemoved( const Consumer& );
+ bool nextMessage( Consumer::shared_ptr c, QueuedMessage& next,
+ const Mutex::ScopedLock&);
+ bool canAcquire(Consumer::shared_ptr consumer, const QueuedMessage& msg,
+ const Mutex::ScopedLock&);
+};
+
+
+namespace {
+ const std::string NO_GROUP("");
+ const std::string getGroupId( const QueuedMessage& qm, const std::string& key )
+ {
+ const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders();
+ if (!headers) return NO_GROUP;
+ return headers->getAsString(key);
+ }
+}
+
+
+void MessageGroupManager::enqueued( const QueuedMessage& qm )
+{
+ std::string group( getGroupId(qm, groupIdHeader) );
+ messageGroups[group].total++;
+}
+
+
+void MessageGroupManager::removed( const QueuedMessage& qm )
+{
+ std::string group( getGroupId(qm, groupIdHeader) );
+ std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group );
+ assert( gs != messageGroups.end() );
+ gs->second.acquired += 1;
+}
+
+
+void MessageGroupManager::requeued( const QueuedMessage& qm )
+{
+ std::string group( getGroupId(qm, groupIdHeader) );
+ std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group );
+ assert( gs != messageGroups.end() );
+ GroupState& state( gs->second );
+ assert( state.acquired != 0 );
+ state.acquired -= 1;
+ if (state.acquired == 0 && !state.owner.empty()) {
+ state.owner.clear(); // KAG TODO: need to invalidate consumer's positions?
+ }
+}
+
+
+void MessageGroupManager::dequeued( const QueuedMessage& qm )
+{
+ std::string group( getGroupId(qm, groupIdHeader) );
+ std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group );
+ assert( gs != messageGroups.end() );
+ GroupState& state( gs->second );
+ assert( state.total != 0 );
+ state.total -= 1;
+ assert( state.acquired != 0 );
+ state.acquired -= 1;
+ if (state.total == 0) messageGroups.erase( gs );
+ else if (state.acquired == 0 && !state.owner.empty()) {
+ state.owner.clear(); // KAG TODO: need to invalidate consumer's positions?
+ }
+}
+
+void MessageGroupManager::consumerAdded( const Consumer& c )
+{
+ bool unique = consumers.insert( c.getName() ).second;
+ (void) unique; assert( unique );
+}
+
+void MessageGroupManager::consumerRemoved( const Consumer& c )
+{
+ size_t count = consumers.erase( c.getName() );
+ (void) count; assert( count == 1 );
+
+ bool needReset = false;
+ for (std::map<std::string, struct GroupState>::iterator gs = messageGroups.begin();
+ gs != messageGroups.end(); ++gs) {
+
+ GroupState& state( gs->second );
+ if (state.owner == c.getName()) {
+ state.owner.clear();
+ needReset = true;
+ }
+ }
+
+ if (needReset) {
+ // KAG TODO: How do I invalidate all consumers that need invalidating????
+ }
+}
+
+
+bool MessageGroupManager::nextMessage( Consumer::shared_ptr c, QueuedMessage& next,
+ const Mutex::ScopedLock& l)
+{
+ // KAG TODO: FIX!!!
+ return MessageSelector::nextMessage( c, next, l );
+}
+
+
+bool MessageGroupManager::canAcquire(Consumer::shared_ptr consumer, const QueuedMessage& qm,
+ const Mutex::ScopedLock&)
+{
+ std::string group( getGroupId(qm, groupIdHeader) );
+ std::map<std::string, struct GroupState>::iterator gs = messageGroups.find( group );
+ assert( gs != messageGroups.end() );
+ GroupState& state( gs->second );
+
+ if (state.owner.empty()) {
+ state.owner = consumer->getName();
+ return true;
+ }
+ return state.owner == consumer->getName();
+}
+
+
+
+
+
+// default selector - requires messageLock to be held by caller!
+bool MessageSelector::nextMessage( Consumer::shared_ptr c, QueuedMessage& next,
+ const Mutex::ScopedLock& /*just to enforce locking*/)
+{
+ Messages& messages(queue->getMessages());
+
+ if (messages.empty())
+ return false;
+
+ if (c->preAcquires()) { // not browsing
+ next = messages.front();
+ return true;
+ } else if (messages.next(c->position, next))
+ return true;
+ return false;
+}
+
+
+// default selector - requires messageLock to be held by caller!
+bool MessageSelector::canAcquire(Consumer::shared_ptr, const QueuedMessage&,
+ const Mutex::ScopedLock& /*just to enforce locking*/)
+{
+ return true; // always give permission to acquire
+}
+
+
+
+
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 8435e75cab..a6bb0d6915 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -59,8 +59,8 @@ class MessageStore;
class QueueEvents;
class QueueRegistry;
class TransactionContext;
-class Exchange;
-
+class MessageSelector;
+
/**
* The brokers representation of an amqp queue. Messages are
* delivered to a queue from where they can be dispatched to
@@ -129,10 +129,10 @@ class Queue : public boost::enable_shared_from_this<Queue>,
UsageBarrier barrier;
int autoDeleteTimeout;
boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
+ std::auto_ptr<MessageSelector> selector;
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
- bool seek(QueuedMessage& msg, Consumer::shared_ptr position);
bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
@@ -142,10 +142,16 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool isExcluded(boost::intrusive_ptr<Message>& msg);
+ /** update queue observers with new message state */
void enqueued(const QueuedMessage& msg);
+ void consumed(const QueuedMessage& msg);
void dequeued(const QueuedMessage& msg);
- void pop();
- void popAndDequeue();
+
+ /** modify the Queue's message container - assumes messageLock held */
+ void pop(); // acquire front msg
+ void popAndDequeue(); // acquire and dequeue front msg
+ // acquire message @ position, return true and set msg if acquire succeeds
+ bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg );
void forcePersistent(QueuedMessage& msg);
int getEventMode();
@@ -191,8 +197,15 @@ class Queue : public boost::enable_shared_from_this<Queue>,
Broker* broker = 0);
QPID_BROKER_EXTERN ~Queue();
+ /** allow the Consumer to consume or browse the next available message */
QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr);
+ /** allow the Consumer to acquire a message that it has browsed.
+ * @param msg - message to be acquired.
+ * @return false if message is no longer available for acquire.
+ */
+ QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg, const Consumer::shared_ptr c);
+
/**
* Used to configure a new queue and create a persistent record
* for it in store if required.
@@ -216,7 +229,11 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable());
- QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg);
+ /** Acquire the message at the given position if it is available for acquire. Not to
+ * be used by clients, but used by the broker for queue management.
+ * @param message - set to the acquired message if true returned.
+ * @return true if the message has been acquired.
+ */
QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message);
/**
@@ -302,12 +319,12 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool isEnqueued(const QueuedMessage& msg);
/**
- * Gets the next available message
+ * Acquires the next available (oldest) message
*/
QPID_BROKER_EXTERN QueuedMessage get();
- /** Get the message at position pos */
- QPID_BROKER_EXTERN QueuedMessage find(framing::SequenceNumber pos) const;
+ /** Get the message at position pos, returns true if found and sets msg */
+ QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, QueuedMessage& msg ) const;
const QueuePolicy* getPolicy();
diff --git a/qpid/cpp/src/qpid/broker/QueueEvents.cpp b/qpid/cpp/src/qpid/broker/QueueEvents.cpp
index 2c540ff1ad..764faf5fd7 100644
--- a/qpid/cpp/src/qpid/broker/QueueEvents.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueEvents.cpp
@@ -129,6 +129,10 @@ class EventGenerator : public QueueObserver
{
if (!enqueueOnly) manager.dequeued(m);
}
+
+ void consumed(const QueuedMessage&) {};
+ void requeued(const QueuedMessage&) {};
+
private:
QueueEvents& manager;
const bool enqueueOnly;
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
index fcf8d089f9..db18325c78 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -377,11 +377,12 @@ void QueueFlowLimit::setState(const qpid::framing::FieldTable& state)
++i;
fcmsg.add(first, last);
for (SequenceNumber seq = first; seq <= last; ++seq) {
- QueuedMessage msg(queue->find(seq)); // fyi: msg.payload may be null if msg is delivered & unacked
+ QueuedMessage msg;
+ bool found = queue->find(seq, msg); // fyi: msg.payload may be null if msg is delivered & unacked
+ (void) found; assert(found); // avoid unused variable warning when NDEBUG set
bool unique;
unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second;
- // Like this to avoid tripping up unused variable warning when NDEBUG set
- if (!unique) assert(unique);
+ (void) unique; assert(unique); // ditto NDEBUG warning
}
}
}
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
index c02e479976..3d4b31bb69 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
@@ -84,6 +84,9 @@ class Broker;
QPID_BROKER_EXTERN void enqueued(const QueuedMessage&);
/** the queue has removed QueuedMessage. Returns true if flow state changes */
QPID_BROKER_EXTERN void dequeued(const QueuedMessage&);
+ /** ignored */
+ QPID_BROKER_EXTERN void consumed(const QueuedMessage&) {};
+ QPID_BROKER_EXTERN void requeued(const QueuedMessage&) {};
/** for clustering: */
QPID_BROKER_EXTERN void getState(qpid::framing::FieldTable&) const;
diff --git a/qpid/cpp/src/qpid/broker/QueueObserver.h b/qpid/cpp/src/qpid/broker/QueueObserver.h
index 3ca01c051e..9c3c186f23 100644
--- a/qpid/cpp/src/qpid/broker/QueueObserver.h
+++ b/qpid/cpp/src/qpid/broker/QueueObserver.h
@@ -25,17 +25,49 @@ namespace qpid {
namespace broker {
struct QueuedMessage;
+class Consumer;
+
/**
- * Interface for notifying classes who want to act as 'observers' of a
- * queue of particular events.
+ * Interface for notifying classes who want to act as 'observers' of a queue of particular
+ * events.
+ *
+ * The events that are monitored reflect the relationship between a particular message and
+ * the queue it has been delivered to. A message can be considered in one of three states
+ * with respect to the queue:
+ *
+ * 1) "Available" - available for transfer to consumers,
+ * 2) "Locked" - to a particular consumer, no longer available for transfer, but not
+ * considered fully dequeued.
+ * 3) "Dequeued" - removed from the queue and no longer available to any consumer.
+ *
+ * The queue events that are observable are:
+ *
+ * "Enqueued" - the message is "Available" - on the queue for transfer to any consumer
+ * (e.g. browse or acquire)
+ *
+ * "Consumed" - the message is "Locked" - a consumer has claimed exclusive access to it.
+ * It is no longer available for other consumers to browse or acquire, but it is not yet
+ * considered dequeued as it may be requeued by the consumer.
+ *
+ * "Requeued" - a previously-consumed message is 'unlocked': it is put back on the queue
+ * at its original position and returns to the "Available" state.
+ *
+ * "Dequeued" - a Locked message is no longer queued. At this point, the queue no longer
+ * tracks the message, and the broker considers the consumer's transaction complete.
*/
class QueueObserver
{
public:
virtual ~QueueObserver() {}
+
+ // note: the Queue will hold the messageLock while calling these methods!
virtual void enqueued(const QueuedMessage&) = 0;
+ virtual void consumed(const QueuedMessage&) = 0;
+ virtual void requeued(const QueuedMessage&) = 0;
virtual void dequeued(const QueuedMessage&) = 0;
- private:
+ virtual void consumerAdded( const Consumer& ) {};
+ virtual void consumerRemoved( const Consumer& ) {};
+ private:
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
index 6ae0d53b1a..0c245700af 100644
--- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -269,8 +269,7 @@ bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
do {
QueuedMessage oldest = queue.front();
-
- if (oldest.queue->acquire(oldest) || !strict) {
+ if (oldest.queue->acquireMessageAt(oldest.position, oldest) || !strict) {
queue.pop_front();
pendingDequeues.push_back(oldest);
QPID_LOG(debug, "Ring policy triggered in " << name
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 73a0a5cf7b..c8f77ba64e 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -269,9 +269,8 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
) :
- Consumer(_acquire),
+ Consumer(_name, _acquire),
parent(_parent),
- name(_name),
queue(_queue),
ackExpected(ack),
acquire(_acquire),
@@ -295,7 +294,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
if (agent != 0)
{
- mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name,
+ mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getName(),
!acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments));
agent->addObject (mgmtObject);
mgmtObject->set_creditMode("WINDOW");
@@ -327,16 +326,15 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
assertClusterSafe();
allocateCredit(msg.payload);
- DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing);
+ DeliveryRecord record(msg, queue, getName(), acquire, !ackExpected, windowing);
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
parent->deliver(record, sync);
- if (!ackExpected && acquire) record.setEnded();//allows message to be released now its been delivered
if (windowing || ackExpected || !acquire) {
parent->record(record);
}
- if (acquire && !ackExpected) {
- queue->dequeue(0, msg);
+ if (acquire && !ackExpected) { // auto acquire && auto accept
+ record.accept( 0 /*no ctxt*/ );
}
if (mgmtObject) { mgmtObject->inc_delivered(); }
return true;
@@ -556,50 +554,61 @@ void SemanticState::deliver(DeliveryRecord& msg, bool sync)
return deliveryAdapter.deliver(msg, sync);
}
-SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)
+const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const
{
- ConsumerImplMap::iterator i = consumers.find(destination);
- if (i == consumers.end()) {
- throw NotFoundException(QPID_MSG("Unknown destination " << destination));
+ ConsumerImpl::shared_ptr consumer;
+ if (!find(destination, consumer)) {
+ throw NotFoundException(QPID_MSG("Unknown destination " << destination << " session=" << session.getSessionId()));
} else {
- return *(i->second);
+ return consumer;
+ }
+}
+
+bool SemanticState::find(const std::string& destination, ConsumerImpl::shared_ptr& consumer) const
+{
+ // @todo KAG gsim: shouldn't the consumers map be locked????
+ ConsumerImplMap::const_iterator i = consumers.find(destination);
+ if (i == consumers.end()) {
+ return false;
}
+ consumer = i->second;
+ return true;
}
void SemanticState::setWindowMode(const std::string& destination)
{
- find(destination).setWindowMode();
+ find(destination)->setWindowMode();
}
void SemanticState::setCreditMode(const std::string& destination)
{
- find(destination).setCreditMode();
+ find(destination)->setCreditMode();
}
void SemanticState::addByteCredit(const std::string& destination, uint32_t value)
{
- ConsumerImpl& c = find(destination);
- c.addByteCredit(value);
- c.requestDispatch();
+ ConsumerImpl::shared_ptr c = find(destination);
+ c->addByteCredit(value);
+ c->requestDispatch();
}
void SemanticState::addMessageCredit(const std::string& destination, uint32_t value)
{
- ConsumerImpl& c = find(destination);
- c.addMessageCredit(value);
- c.requestDispatch();
+ ConsumerImpl::shared_ptr c = find(destination);
+ c->addMessageCredit(value);
+ c->requestDispatch();
}
void SemanticState::flush(const std::string& destination)
{
- find(destination).flush();
+ find(destination)->flush();
}
void SemanticState::stop(const std::string& destination)
{
- find(destination).stop();
+ find(destination)->stop();
}
void SemanticState::ConsumerImpl::setWindowMode()
@@ -682,7 +691,7 @@ AckRange SemanticState::findRange(DeliveryId first, DeliveryId last)
void SemanticState::acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired)
{
AckRange range = findRange(first, last);
- for_each(range.start, range.end, AcquireFunctor(acquired));
+ for_each(range.start, range.end, AcquireFunctor(this, acquired));
}
void SemanticState::release(DeliveryId first, DeliveryId last, bool setRedelivered)
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index 8c69d6b89b..8947e1e35f 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -75,7 +75,6 @@ class SemanticState : private boost::noncopyable {
{
mutable qpid::sys::Mutex lock;
SemanticState* const parent;
- const std::string name;
const boost::shared_ptr<Queue> queue;
const bool ackExpected;
const bool acquire;
@@ -129,8 +128,6 @@ class SemanticState : private boost::noncopyable {
bool doOutput();
- std::string getName() const { return name; }
-
bool isAckExpected() const { return ackExpected; }
bool isAcquire() const { return acquire; }
bool isWindowing() const { return windowing; }
@@ -187,7 +184,8 @@ class SemanticState : private boost::noncopyable {
SessionContext& getSession() { return session; }
const SessionContext& getSession() const { return session; }
- ConsumerImpl& find(const std::string& destination);
+ const ConsumerImpl::shared_ptr find(const std::string& destination) const;
+ bool find(const std::string& destination, ConsumerImpl::shared_ptr&) const;
/**
* Get named queue, never returns 0.
diff --git a/qpid/cpp/src/qpid/broker/ThresholdAlerts.h b/qpid/cpp/src/qpid/broker/ThresholdAlerts.h
index c77722e700..c27c97d6f5 100644
--- a/qpid/cpp/src/qpid/broker/ThresholdAlerts.h
+++ b/qpid/cpp/src/qpid/broker/ThresholdAlerts.h
@@ -50,6 +50,9 @@ class ThresholdAlerts : public QueueObserver
const long repeatInterval);
void enqueued(const QueuedMessage&);
void dequeued(const QueuedMessage&);
+ void consumed(const QueuedMessage&) {};
+ void requeued(const QueuedMessage&) {};
+
static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
const uint64_t countThreshold,
const uint64_t sizeThreshold,
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 030d6e34c1..792f9f65f4 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -409,11 +409,11 @@ void Connection::shadowSetUser(const std::string& userId) {
void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position)
{
- broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
- c.position = position;
- c.setBlocked(blocked);
- if (notifyEnabled) c.enableNotify(); else c.disableNotify();
- updateIn.consumerNumbering.add(c.shared_from_this());
+ broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name);
+ c->position = position;
+ c->setBlocked(blocked);
+ if (notifyEnabled) c->enableNotify(); else c->disableNotify();
+ updateIn.consumerNumbering.add(c);
}
@@ -444,7 +444,7 @@ void Connection::outputTask(uint16_t channel, const std::string& name) {
if (!session)
throw Exception(QPID_MSG(cluster << " channel not attached " << *this
<< "[" << channel << "] "));
- OutputTask* task = &session->getSemanticState().find(name);
+ OutputTask* task = session->getSemanticState().find(name).get();
connection->getOutputTasks().addOutputTask(task);
}
@@ -534,7 +534,7 @@ void Connection::deliveryRecord(const string& qname,
m.position = position;
if (enqueued) queue->updateEnqueued(m); //inform queue of the message
} else { // Message at original position in original queue
- m = queue->find(position);
+ queue->find(position, m);
}
if (!m.payload)
throw Exception(QPID_MSG("deliveryRecord no update message"));
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index 6fdc4c69ad..1f1eb16af5 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -58,7 +58,7 @@ public:
intrusive_ptr<Message> last;
bool received;
- TestConsumer(bool acquire = true):Consumer(acquire), received(false) {};
+ TestConsumer(bool acquire = true):Consumer("test", acquire), received(false) {};
virtual bool deliver(QueuedMessage& msg){
last = msg.payload;
@@ -324,14 +324,18 @@ QPID_AUTO_TEST_CASE(testSearch){
queue->deliver(msg3);
SequenceNumber seq(2);
- QueuedMessage qm = queue->find(seq);
+ QueuedMessage qm;
+ TestConsumer::shared_ptr c1(new TestConsumer());
+
+ BOOST_CHECK(queue->find(seq, qm));
BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue());
- queue->acquire(qm);
+ queue->acquire(qm, c1);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
SequenceNumber seq1(3);
- QueuedMessage qm1 = queue->find(seq1);
+ QueuedMessage qm1;
+ BOOST_CHECK(queue->find(seq1, qm1));
BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue());
}
@@ -551,12 +555,13 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
QueuedMessage qmsg2(queue.get(), msg2, ++sequence);
framing::SequenceNumber sequence1(10);
QueuedMessage qmsg3(queue.get(), 0, sequence1);
+ TestConsumer::shared_ptr dummy(new TestConsumer());
- BOOST_CHECK(!queue->acquire(qmsg));
- BOOST_CHECK(queue->acquire(qmsg2));
+ BOOST_CHECK(!queue->acquire(qmsg, dummy));
+ BOOST_CHECK(queue->acquire(qmsg2, dummy));
// Acquire the massage again to test failure case.
- BOOST_CHECK(!queue->acquire(qmsg2));
- BOOST_CHECK(!queue->acquire(qmsg3));
+ BOOST_CHECK(!queue->acquire(qmsg2, dummy));
+ BOOST_CHECK(!queue->acquire(qmsg3, dummy));
BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);