summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-07-10 13:47:05 +0000
committerGordon Sim <gsim@apache.org>2013-07-10 13:47:05 +0000
commit04ad679eb369aac2a49dbccfeaeaa5964e688dd8 (patch)
treed7d482d9ca24a6922698034b1b66f56b7645e4d0 /cpp/src/qpid/broker/Queue.cpp
parented030b2309b484610bc3aca3c9b978cea1f6d00b (diff)
downloadqpid-python-04ad679eb369aac2a49dbccfeaeaa5964e688dd8.tar.gz
QPID-4976: support standard lifetime policies
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1501768 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp242
1 files changed, 165 insertions, 77 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index c402e3e016..1d0a8017ef 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -175,8 +175,6 @@ Queue::Queue(const string& _name, const QueueSettings& _settings,
name(_name),
store(_store),
owner(0),
- consumerCount(0),
- browserCount(0),
exclusive(0),
messages(new MessageDeque()),
persistenceId(0),
@@ -188,8 +186,8 @@ Queue::Queue(const string& _name, const QueueSettings& _settings,
allocator(new FifoDistributor( *messages )),
redirectSource(false)
{
- if (settings.maxDepth.hasCount()) current.setCount(0);
- if (settings.maxDepth.hasSize()) current.setSize(0);
+ current.setCount(0);//always track depth in messages
+ if (settings.maxDepth.hasSize()) current.setSize(0);//track depth in bytes only if policy requires it
if (settings.traceExcludes.size()) {
split(traceExclude, settings.traceExcludes, ", ");
}
@@ -340,6 +338,7 @@ void Queue::release(const QueueCursor& position, bool markRedelivered)
bool Queue::dequeueMessageAt(const SequenceNumber& position)
{
+ ScopedAutoDelete autodelete(*this);
boost::intrusive_ptr<PersistableMessage> pmsg;
{
Mutex::ScopedLock locker(messageLock);
@@ -348,7 +347,7 @@ bool Queue::dequeueMessageAt(const SequenceNumber& position)
Message* msg = messages->find(position, &cursor);
if (msg) {
if (msg->isPersistent()) pmsg = msg->getPersistentContext();
- observeDequeue(*msg, locker);
+ observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0);
messages->deleted(cursor);
} else {
QPID_LOG(debug, "Could not dequeue message at " << position << "; no such message");
@@ -385,6 +384,7 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c)
{
if (!checkNotDeleted(c)) return false;
QueueListeners::NotificationSet set;
+ ScopedAutoDelete autodelete(*this);
while (true) {
//TODO: reduce lock scope
Mutex::ScopedLock locker(messageLock);
@@ -393,7 +393,7 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c)
if (msg) {
if (msg->hasExpired()) {
QPID_LOG(debug, "Message expired from queue '" << name << "'");
- observeDequeue(*msg, locker);
+ observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0);
//ERROR: don't hold lock across call to store!!
if (msg->isPersistent()) dequeueFromStore(msg->getPersistentContext());
if (mgmtObject) {
@@ -490,14 +490,31 @@ bool Queue::find(SequenceNumber pos, Message& msg) const
return false;
}
+void Queue::markInUse(bool controlling)
+{
+ Mutex::ScopedLock locker(messageLock);
+ if (controlling) users.addLifecycleController();
+ else users.addOther();
+}
+
+void Queue::releaseFromUse(bool controlling)
+{
+ if (controlling) {
+ {
+ Mutex::ScopedLock locker(messageLock);
+ users.removeLifecycleController();
+ }
+ scheduleAutoDelete();
+ } else {
+ Mutex::ScopedLock locker(messageLock);
+ users.removeOther();
+ }
+}
+
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive)
{
{
Mutex::ScopedLock locker(messageLock);
- // NOTE: consumerCount is actually a count of all
- // subscriptions, both acquiring and non-acquiring (browsers).
- // Check for exclusivity of acquiring consumers.
- size_t acquiringConsumers = consumerCount - browserCount;
if (c->preAcquires()) {
if(settings.isBrowseOnly) {
throw NotAllowedException(
@@ -509,7 +526,7 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive)
QPID_MSG("Queue " << getName()
<< " has an exclusive consumer. No more consumers allowed."));
} else if(requestExclusive) {
- if(acquiringConsumers) {
+ if(users.hasConsumers()) {
throw ResourceLockedException(
QPID_MSG("Queue " << getName()
<< " already has consumers. Exclusive access denied."));
@@ -517,13 +534,11 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive)
exclusive = c->getSession();
}
}
- }
- else if(c->isCounted()) {
- browserCount++;
+ users.addConsumer();
+ } else if(c->isCounted()) {
+ users.addBrowser();
}
if(c->isCounted()) {
- consumerCount++;
-
//reset auto deletion timer if necessary
if (settings.autoDeleteDelay && autoDeleteTask) {
autoDeleteTask->cancel();
@@ -542,14 +557,24 @@ void Queue::cancel(Consumer::shared_ptr c)
removeListener(c);
if(c->isCounted())
{
- Mutex::ScopedLock locker(messageLock);
- consumerCount--;
- if (!c->preAcquires()) browserCount--;
- if(exclusive) exclusive = 0;
- observeConsumerRemove(*c, locker);
- }
- if (mgmtObject != 0 && c->isCounted()) {
- mgmtObject->dec_consumerCount();
+ bool unused;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ if (c->preAcquires()) {
+ users.removeConsumer();
+ if (exclusive) exclusive = 0;
+ } else {
+ users.removeBrowser();
+ }
+ observeConsumerRemove(*c, locker);
+ unused = !users.isUsed();
+ }
+ if (mgmtObject != 0) {
+ mgmtObject->dec_consumerCount();
+ }
+ if (unused && settings.autodelete) {
+ scheduleAutoDelete();
+ }
}
}
@@ -564,7 +589,7 @@ void Queue::purgeExpired(sys::Duration lapse) {
dequeueSincePurge -= count;
int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
if (seconds == 0 || count / seconds < 1) {
- uint32_t count = remove(0, boost::bind(&Message::hasExpired, _1), 0, CONSUMER);
+ uint32_t count = remove(0, boost::bind(&Message::hasExpired, _1), 0, CONSUMER, settings.autodelete);
QPID_LOG(debug, "Purged " << count << " expired messages from " << getName());
//
// Report the count of discarded-by-ttl messages
@@ -671,8 +696,9 @@ namespace {
}
} // end namespace
-uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunctor f, SubscriptionType type)
+uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunctor f, SubscriptionType type, bool triggerAutoDelete)
{
+ ScopedAutoDelete autodelete(*this);
std::deque<Message> removed;
{
QueueCursor c(type);
@@ -686,7 +712,7 @@ uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunct
//don't actually acquire, just act as if we did
observeAcquire(*m, locker);
}
- observeDequeue(*m, locker);
+ observeDequeue(*m, locker, triggerAutoDelete ? &autodelete : 0);
removed.push_back(*m);//takes a copy of the message
if (!messages->deleted(c)) {
QPID_LOG(warning, "Failed to correctly remove message from " << name << "; state is not consistent!");
@@ -726,7 +752,7 @@ uint32_t Queue::purge(const uint32_t qty, boost::shared_ptr<Exchange> dest,
const qpid::types::Variant::Map *filter)
{
std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
- uint32_t count = remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&reroute, dest, _1), CONSUMER/*?*/);
+ uint32_t count = remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&reroute, dest, _1), CONSUMER/*?*/, settings.autodelete);
if (mgmtObject && count) {
mgmtObject->inc_acquires(count);
@@ -752,7 +778,7 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
const qpid::types::Variant::Map *filter)
{
std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
- return remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&moveTo, destq, _1), CONSUMER/*?*/);
+ return remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&moveTo, destq, _1), CONSUMER/*?*/, settings.autodelete);
}
void Queue::push(Message& message, bool /*isRecovery*/)
@@ -779,15 +805,41 @@ uint32_t Queue::getMessageCount() const
uint32_t Queue::getConsumerCount() const
{
Mutex::ScopedLock locker(messageLock);
- return consumerCount;
+ return users.getSubscriberCount();
}
bool Queue::canAutoDelete() const
{
Mutex::ScopedLock locker(messageLock);
- return settings.autodelete && !consumerCount && !owner;
+ return !deleted && checkAutoDelete(locker);
+}
+
+bool Queue::checkAutoDelete(const Mutex::ScopedLock& lock) const
+{
+ if (settings.autodelete) {
+ switch (settings.lifetime) {
+ case QueueSettings::DELETE_IF_UNUSED:
+ return isUnused(lock);
+ case QueueSettings::DELETE_IF_EMPTY:
+ return !users.isInUseByController() && isEmpty(lock);
+ case QueueSettings::DELETE_IF_UNUSED_AND_EMPTY:
+ return isUnused(lock) && isEmpty(lock);
+ case QueueSettings::DELETE_ON_CLOSE:
+ return !users.isInUseByController();
+ }
+ }
+ return false;
+}
+
+bool Queue::isUnused(const Mutex::ScopedLock&) const
+{
+ return !owner && !users.isUsed();;
}
+bool Queue::isEmpty(const Mutex::ScopedLock&) const
+{
+ return current.getCount() == 0;
+}
/*
* return true if enqueue succeeded and message should be made
* available; returning false will result in the message being dropped
@@ -853,8 +905,9 @@ void Queue::dequeueCommited(const Message& msg)
//store and will not be available for delivery. The only action
//required is to ensure the observers are notified and the
//management stats are correctly decremented
+ ScopedAutoDelete autodelete(*this);
Mutex::ScopedLock locker(messageLock);
- observeDequeue(msg, locker);
+ observeDequeue(msg, locker, settings.autodelete ? &autodelete : 0);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
mgmtObject->inc_byteTxnDequeues(msg.getContentSize());
@@ -874,6 +927,7 @@ void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor)
{
ScopedUse u(barrier);
if (!u.acquired) return;
+ ScopedAutoDelete autodelete(*this);
boost::intrusive_ptr<PersistableMessage> pmsg;
{
Mutex::ScopedLock locker(messageLock);
@@ -881,7 +935,7 @@ void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor)
if (msg) {
if (msg->isPersistent()) pmsg = msg->getPersistentContext();
if (!ctxt) {
- observeDequeue(*msg, locker);
+ observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0);
messages->deleted(cursor);//message pointer not valid after this
}
} else {
@@ -895,11 +949,12 @@ void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor)
void Queue::dequeueCommitted(const QueueCursor& cursor)
{
+ ScopedAutoDelete autodelete(*this);
Mutex::ScopedLock locker(messageLock);
Message* msg = messages->find(cursor);
if (msg) {
const uint64_t contentSize = msg->getContentSize();
- observeDequeue(*msg, locker);
+ observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
mgmtObject->inc_byteTxnDequeues(contentSize);
@@ -920,7 +975,7 @@ void Queue::dequeueCommitted(const QueueCursor& cursor)
* Updates policy and management when a message has been dequeued,
* Requires messageLock be held by caller.
*/
-void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock&)
+void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock& lock, ScopedAutoDelete* autodelete)
{
current -= QueueDepth(1, msg.getContentSize());
mgntDeqStats(msg, mgmtObject, brokerMgmtObject);
@@ -931,6 +986,7 @@ void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock&)
QPID_LOG(warning, "Exception on notification of dequeue for queue " << getName() << ": " << e.what());
}
}
+ if (autodelete && isEmpty(lock)) autodelete->check(lock);
}
/** updates queue observers when a message has become unavailable for transfer.
@@ -1053,9 +1109,10 @@ void Queue::abandoned(const Message& message)
void Queue::destroyed()
{
unbind(broker->getExchanges());
- remove(0, 0, boost::bind(&Queue::abandoned, this, _1), REPLICATOR/*even acquired message are treated as abandoned*/);
+ remove(0, 0, boost::bind(&Queue::abandoned, this, _1), REPLICATOR/*even acquired message are treated as abandoned*/, false);
if (alternateExchange.get()) {
alternateExchange->decAlternateUsers();
+ alternateExchange.reset();
}
if (store) {
@@ -1170,33 +1227,12 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange()
return alternateExchange;
}
-void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId)
-{
- if (broker.getQueues().destroyIf(queue->getName(),
- boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
- if (broker.getAcl())
- broker.getAcl()->recordDestroyQueue(queue->getName());
-
- QPID_LOG_CAT(debug, model, "Auto-delete queue: " << queue->getName()
- << " user:" << userId
- << " rhost:" << connectionId );
- queue->destroyed();
- } else {
- QPID_LOG_CAT(debug, model, "Auto-delete queue could not be deleted: " << queue->getName()
- << " user:" << userId
- << " rhost:" << connectionId );
- }
-}
-
struct AutoDeleteTask : qpid::sys::TimerTask
{
- Broker& broker;
Queue::shared_ptr queue;
- std::string connectionId;
- std::string userId;
- AutoDeleteTask(Broker& b, Queue::shared_ptr q, const std::string& cId, const std::string& uId, AbsTime fireTime)
- : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q), connectionId(cId), userId(uId) {}
+ AutoDeleteTask(Queue::shared_ptr q, AbsTime fireTime)
+ : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), queue(q) {}
void fire()
{
@@ -1204,34 +1240,56 @@ struct AutoDeleteTask : qpid::sys::TimerTask
//created, but then became unused again before the task fired;
//in this case ignore this request as there will have already
//been a later task added
- tryAutoDeleteImpl(broker, queue, connectionId, userId);
+ queue->tryAutoDelete();
}
};
-void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId)
+void Queue::scheduleAutoDelete()
{
- if (queue->settings.autoDeleteDelay && queue->canAutoDelete()) {
- AbsTime time(now(), Duration(queue->settings.autoDeleteDelay * TIME_SEC));
- queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, connectionId, userId, time));
- broker.getTimer().add(queue->autoDeleteTask);
- QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
+ if (canAutoDelete()) {
+ if (settings.autoDeleteDelay) {
+ AbsTime time(now(), Duration(settings.autoDeleteDelay * TIME_SEC));
+ autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(shared_from_this(), time));
+ broker->getTimer().add(autoDeleteTask);
+ QPID_LOG(debug, "Timed auto-delete for " << getName() << " initiated");
+ } else {
+ tryAutoDelete();
+ }
+ }
+}
+
+void Queue::tryAutoDelete()
+{
+ if (broker->getQueues().destroyIf(name, boost::bind(boost::mem_fn(&Queue::canAutoDelete), shared_from_this()))) {
+ if (broker->getAcl())
+ broker->getAcl()->recordDestroyQueue(name);
+
+ QPID_LOG_CAT(debug, model, "Auto-delete queue deleted: " << name << " (" << deleted << ")");
+ destroyed();
} else {
- tryAutoDeleteImpl(broker, queue, connectionId, userId);
+ QPID_LOG_CAT(debug, model, "Auto-delete queue could not be deleted: " << name);
}
}
bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
{
- Mutex::ScopedLock locker(ownershipLock);
+ Mutex::ScopedLock locker(messageLock);
return o == owner;
}
void Queue::releaseExclusiveOwnership()
{
- Mutex::ScopedLock locker(ownershipLock);
- owner = 0;
- if (mgmtObject) {
- mgmtObject->set_exclusive(false);
+ bool unused;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ owner = 0;
+ if (mgmtObject) {
+ mgmtObject->set_exclusive(false);
+ }
+ unused = !users.isUsed();
+ }
+ if (unused && settings.autodelete) {
+ scheduleAutoDelete();
}
}
@@ -1241,7 +1299,7 @@ bool Queue::setExclusiveOwner(const OwnershipToken* const o)
if (settings.autoDeleteDelay && autoDeleteTask) {
autoDeleteTask->cancel();
}
- Mutex::ScopedLock locker(ownershipLock);
+ Mutex::ScopedLock locker(messageLock);
if (owner) {
return false;
} else {
@@ -1255,7 +1313,7 @@ bool Queue::setExclusiveOwner(const OwnershipToken* const o)
bool Queue::hasExclusiveOwner() const
{
- Mutex::ScopedLock locker(ownershipLock);
+ Mutex::ScopedLock locker(messageLock);
return owner != 0;
}
@@ -1388,7 +1446,7 @@ struct After {
void Queue::setPosition(SequenceNumber n) {
Mutex::ScopedLock locker(messageLock);
if (n < sequence) {
- remove(0, After(n), MessagePredicate(), BROWSER);
+ remove(0, After(n), MessagePredicate(), BROWSER, false);
}
sequence = n;
QPID_LOG(debug, "Set position to " << sequence << " on " << getName());
@@ -1450,6 +1508,12 @@ bool Queue::checkNotDeleted(const Consumer::shared_ptr& c)
return !deleted;
}
+bool Queue::isDeleted() const
+{
+ Mutex::ScopedLock lock(messageLock);
+ return deleted;
+}
+
void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
{
Mutex::ScopedLock lock(messageLock);
@@ -1495,6 +1559,7 @@ void Queue::setDequeueSincePurge(uint32_t value) {
void Queue::reject(const QueueCursor& cursor)
{
+ ScopedAutoDelete autodelete(*this);
Exchange::shared_ptr alternate = getAlternateExchange();
Message copy;
boost::intrusive_ptr<PersistableMessage> pmsg;
@@ -1505,7 +1570,7 @@ void Queue::reject(const QueueCursor& cursor)
if (alternate) copy = *message;
if (message->isPersistent()) pmsg = message->getPersistentContext();
countRejected();
- observeDequeue(*message, locker);
+ observeDequeue(*message, locker, settings.autodelete ? &autodelete : 0);
messages->deleted(cursor);
} else {
return;
@@ -1526,7 +1591,7 @@ void Queue::reject(const QueueCursor& cursor)
bool Queue::checkDepth(const QueueDepth& increment, const Message&)
{
- if (current && (settings.maxDepth - current < increment)) {
+ if (settings.maxDepth && (settings.maxDepth - current < increment)) {
if (mgmtObject) {
mgmtObject->inc_discardsOverflow();
if (brokerMgmtObject)
@@ -1619,6 +1684,29 @@ void Queue::setMgmtRedirectState( std::string peer, bool enabled, bool isSrc ) {
mgmtObject->set_redirectSource(isSrc);
}
}
+Queue::QueueUsers::QueueUsers() : consumers(0), browsers(0), others(0), controller(false) {}
+void Queue::QueueUsers::addConsumer() { ++consumers; }
+void Queue::QueueUsers::addBrowser() { ++browsers; }
+void Queue::QueueUsers::addLifecycleController() { assert(!controller); controller = true; }
+void Queue::QueueUsers::addOther(){ ++others; }
+void Queue::QueueUsers::removeConsumer() { assert(consumers > 0); --consumers; }
+void Queue::QueueUsers::removeBrowser() { assert(browsers > 0); --browsers; }
+void Queue::QueueUsers::removeLifecycleController() { assert(controller); controller = false; }
+void Queue::QueueUsers::removeOther() { assert(others > 0); --others; }
+bool Queue::QueueUsers::isInUseByController() const { return controller; }
+bool Queue::QueueUsers::isUsed() const { return controller || consumers || browsers || others; }
+uint32_t Queue::QueueUsers::getSubscriberCount() const { return consumers + browsers; }
+bool Queue::QueueUsers::hasConsumers() const { return consumers; }
+
+Queue::ScopedAutoDelete::ScopedAutoDelete(Queue& q) : queue(q), eligible(false) {}
+void Queue::ScopedAutoDelete::check(const sys::Mutex::ScopedLock& lock)
+{
+ eligible = queue.checkAutoDelete(lock);
+}
+Queue::ScopedAutoDelete::~ScopedAutoDelete()
+{
+ if (eligible) queue.scheduleAutoDelete();
+}
}}