summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-07-10 13:47:05 +0000
committerGordon Sim <gsim@apache.org>2013-07-10 13:47:05 +0000
commit28e1f71d646b9b0c86aba3bb0238acb0a89d45bd (patch)
treece49b2ebe8e373cbc165de12b2f899fae19d02d4
parenteaa8c11396b13c46c59c2030a23cc7763ecee9d7 (diff)
downloadqpid-python-28e1f71d646b9b0c86aba3bb0238acb0a89d45bd.tar.gz
QPID-4976: support standard lifetime policies
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1501768 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/amqp/descriptors.h14
-rw-r--r--qpid/cpp/src/qpid/broker/LossyQueue.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/Lvq.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp242
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h73
-rw-r--r--qpid/cpp/src/qpid/broker/QueueSettings.cpp13
-rw-r--r--qpid/cpp/src/qpid/broker/QueueSettings.h12
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.cpp92
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.h1
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/DataReader.cpp14
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp164
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/NodeProperties.h7
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp25
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.h3
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp75
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.h2
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp1
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp38
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp1
22 files changed, 588 insertions, 206 deletions
diff --git a/qpid/cpp/src/qpid/amqp/descriptors.h b/qpid/cpp/src/qpid/amqp/descriptors.h
index 2a5691beaf..248d6df2df 100644
--- a/qpid/cpp/src/qpid/amqp/descriptors.h
+++ b/qpid/cpp/src/qpid/amqp/descriptors.h
@@ -89,6 +89,18 @@ const uint64_t SELECTOR_FILTER_CODE(0x0000468C00000004ULL);
const uint64_t XQUERY_FILTER_CODE(0x0000468C00000005ULL);
}
+namespace lifetime_policy {
+const std::string DELETE_ON_CLOSE_SYMBOL("amqp:delete-on-close:list");
+const std::string DELETE_ON_NO_LINKS_SYMBOL("amqp:delete-on-no-links:list");
+const std::string DELETE_ON_NO_MESSAGES_SYMBOL("amqp:delete-on-no-messages:list");
+const std::string DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL("amqp:delete-on-no-links-or-messages:list");
+
+const uint64_t DELETE_ON_CLOSE_CODE(0x2B);
+const uint64_t DELETE_ON_NO_LINKS_CODE(0x2C);
+const uint64_t DELETE_ON_NO_MESSAGES_CODE(0x2D);
+const uint64_t DELETE_ON_NO_LINKS_OR_MESSAGES_CODE(0x2E);
+}
+
namespace error_conditions {
//note these are not actually descriptors
const std::string INTERNAL_ERROR("amqp:internal-error");
@@ -97,6 +109,8 @@ const std::string UNAUTHORIZED_ACCESS("amqp:unauthorized-access");
const std::string DECODE_ERROR("amqp:decode-error");
const std::string NOT_ALLOWED("amqp:not-allowed");
const std::string RESOURCE_LIMIT_EXCEEDED("amqp:resource-limit-exceeded");
+const std::string RESOURCE_DELETED("amqp:resource-deleted");
+const std::string PRECONDITION_FAILED("amqp:precondition-failed");
}
}} // namespace qpid::amqp
diff --git a/qpid/cpp/src/qpid/broker/LossyQueue.cpp b/qpid/cpp/src/qpid/broker/LossyQueue.cpp
index be19185c3a..4104503dac 100644
--- a/qpid/cpp/src/qpid/broker/LossyQueue.cpp
+++ b/qpid/cpp/src/qpid/broker/LossyQueue.cpp
@@ -48,11 +48,11 @@ bool LossyQueue::checkDepth(const QueueDepth& increment, const Message& message)
<< name << ": size=" << increment.getSize() << ", max-size=" << settings.maxDepth.getSize()));
}
- while (settings.maxDepth && (current + increment > settings.maxDepth)) {
+ while (settings.maxDepth && (settings.maxDepth - current < increment)) {
QPID_LOG(debug, "purging " << name << ": current depth is [" << current << "], max depth is [" << settings.maxDepth << "], new message has size " << increment.getSize());
qpid::sys::Mutex::ScopedUnlock u(messageLock);
//TODO: arguably we should try and purge expired messages first but that is potentially expensive
- if (remove(1, settings.priorities ? boost::bind(&isLowerPriorityThan, message.getPriority(), _1) : MessagePredicate(), MessageFunctor(), PURGE)) {
+ if (remove(1, settings.priorities ? boost::bind(&isLowerPriorityThan, message.getPriority(), _1) : MessagePredicate(), MessageFunctor(), PURGE, false)) {
if (mgmtObject) {
mgmtObject->inc_discardsRing(1);
if (brokerMgmtObject)
@@ -65,7 +65,7 @@ bool LossyQueue::checkDepth(const QueueDepth& increment, const Message& message)
break;
}
}
- if (settings.maxDepth && (current + increment > settings.maxDepth)) {
+ if (settings.maxDepth && (settings.maxDepth - current < increment)) {
//will only be the case where we were unable to purge another
//message, which should only be the case if we are purging
//based on priority and there was no message with a lower (or
diff --git a/qpid/cpp/src/qpid/broker/Lvq.cpp b/qpid/cpp/src/qpid/broker/Lvq.cpp
index 89a47bb14e..ff13b97dbd 100644
--- a/qpid/cpp/src/qpid/broker/Lvq.cpp
+++ b/qpid/cpp/src/qpid/broker/Lvq.cpp
@@ -51,7 +51,7 @@ void Lvq::push(Message& message, bool isRecovery)
brokerMgmtObject->inc_discardsLvq();
}
}
- observeDequeue(old, locker);
+ observeDequeue(old, locker, 0/*can't be empty, so no need to check autodelete*/);
}
}
copy.notify();
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index c402e3e016..1d0a8017ef 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -175,8 +175,6 @@ Queue::Queue(const string& _name, const QueueSettings& _settings,
name(_name),
store(_store),
owner(0),
- consumerCount(0),
- browserCount(0),
exclusive(0),
messages(new MessageDeque()),
persistenceId(0),
@@ -188,8 +186,8 @@ Queue::Queue(const string& _name, const QueueSettings& _settings,
allocator(new FifoDistributor( *messages )),
redirectSource(false)
{
- if (settings.maxDepth.hasCount()) current.setCount(0);
- if (settings.maxDepth.hasSize()) current.setSize(0);
+ current.setCount(0);//always track depth in messages
+ if (settings.maxDepth.hasSize()) current.setSize(0);//track depth in bytes only if policy requires it
if (settings.traceExcludes.size()) {
split(traceExclude, settings.traceExcludes, ", ");
}
@@ -340,6 +338,7 @@ void Queue::release(const QueueCursor& position, bool markRedelivered)
bool Queue::dequeueMessageAt(const SequenceNumber& position)
{
+ ScopedAutoDelete autodelete(*this);
boost::intrusive_ptr<PersistableMessage> pmsg;
{
Mutex::ScopedLock locker(messageLock);
@@ -348,7 +347,7 @@ bool Queue::dequeueMessageAt(const SequenceNumber& position)
Message* msg = messages->find(position, &cursor);
if (msg) {
if (msg->isPersistent()) pmsg = msg->getPersistentContext();
- observeDequeue(*msg, locker);
+ observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0);
messages->deleted(cursor);
} else {
QPID_LOG(debug, "Could not dequeue message at " << position << "; no such message");
@@ -385,6 +384,7 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c)
{
if (!checkNotDeleted(c)) return false;
QueueListeners::NotificationSet set;
+ ScopedAutoDelete autodelete(*this);
while (true) {
//TODO: reduce lock scope
Mutex::ScopedLock locker(messageLock);
@@ -393,7 +393,7 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c)
if (msg) {
if (msg->hasExpired()) {
QPID_LOG(debug, "Message expired from queue '" << name << "'");
- observeDequeue(*msg, locker);
+ observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0);
//ERROR: don't hold lock across call to store!!
if (msg->isPersistent()) dequeueFromStore(msg->getPersistentContext());
if (mgmtObject) {
@@ -490,14 +490,31 @@ bool Queue::find(SequenceNumber pos, Message& msg) const
return false;
}
+void Queue::markInUse(bool controlling)
+{
+ Mutex::ScopedLock locker(messageLock);
+ if (controlling) users.addLifecycleController();
+ else users.addOther();
+}
+
+void Queue::releaseFromUse(bool controlling)
+{
+ if (controlling) {
+ {
+ Mutex::ScopedLock locker(messageLock);
+ users.removeLifecycleController();
+ }
+ scheduleAutoDelete();
+ } else {
+ Mutex::ScopedLock locker(messageLock);
+ users.removeOther();
+ }
+}
+
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive)
{
{
Mutex::ScopedLock locker(messageLock);
- // NOTE: consumerCount is actually a count of all
- // subscriptions, both acquiring and non-acquiring (browsers).
- // Check for exclusivity of acquiring consumers.
- size_t acquiringConsumers = consumerCount - browserCount;
if (c->preAcquires()) {
if(settings.isBrowseOnly) {
throw NotAllowedException(
@@ -509,7 +526,7 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive)
QPID_MSG("Queue " << getName()
<< " has an exclusive consumer. No more consumers allowed."));
} else if(requestExclusive) {
- if(acquiringConsumers) {
+ if(users.hasConsumers()) {
throw ResourceLockedException(
QPID_MSG("Queue " << getName()
<< " already has consumers. Exclusive access denied."));
@@ -517,13 +534,11 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive)
exclusive = c->getSession();
}
}
- }
- else if(c->isCounted()) {
- browserCount++;
+ users.addConsumer();
+ } else if(c->isCounted()) {
+ users.addBrowser();
}
if(c->isCounted()) {
- consumerCount++;
-
//reset auto deletion timer if necessary
if (settings.autoDeleteDelay && autoDeleteTask) {
autoDeleteTask->cancel();
@@ -542,14 +557,24 @@ void Queue::cancel(Consumer::shared_ptr c)
removeListener(c);
if(c->isCounted())
{
- Mutex::ScopedLock locker(messageLock);
- consumerCount--;
- if (!c->preAcquires()) browserCount--;
- if(exclusive) exclusive = 0;
- observeConsumerRemove(*c, locker);
- }
- if (mgmtObject != 0 && c->isCounted()) {
- mgmtObject->dec_consumerCount();
+ bool unused;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ if (c->preAcquires()) {
+ users.removeConsumer();
+ if (exclusive) exclusive = 0;
+ } else {
+ users.removeBrowser();
+ }
+ observeConsumerRemove(*c, locker);
+ unused = !users.isUsed();
+ }
+ if (mgmtObject != 0) {
+ mgmtObject->dec_consumerCount();
+ }
+ if (unused && settings.autodelete) {
+ scheduleAutoDelete();
+ }
}
}
@@ -564,7 +589,7 @@ void Queue::purgeExpired(sys::Duration lapse) {
dequeueSincePurge -= count;
int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
if (seconds == 0 || count / seconds < 1) {
- uint32_t count = remove(0, boost::bind(&Message::hasExpired, _1), 0, CONSUMER);
+ uint32_t count = remove(0, boost::bind(&Message::hasExpired, _1), 0, CONSUMER, settings.autodelete);
QPID_LOG(debug, "Purged " << count << " expired messages from " << getName());
//
// Report the count of discarded-by-ttl messages
@@ -671,8 +696,9 @@ namespace {
}
} // end namespace
-uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunctor f, SubscriptionType type)
+uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunctor f, SubscriptionType type, bool triggerAutoDelete)
{
+ ScopedAutoDelete autodelete(*this);
std::deque<Message> removed;
{
QueueCursor c(type);
@@ -686,7 +712,7 @@ uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunct
//don't actually acquire, just act as if we did
observeAcquire(*m, locker);
}
- observeDequeue(*m, locker);
+ observeDequeue(*m, locker, triggerAutoDelete ? &autodelete : 0);
removed.push_back(*m);//takes a copy of the message
if (!messages->deleted(c)) {
QPID_LOG(warning, "Failed to correctly remove message from " << name << "; state is not consistent!");
@@ -726,7 +752,7 @@ uint32_t Queue::purge(const uint32_t qty, boost::shared_ptr<Exchange> dest,
const qpid::types::Variant::Map *filter)
{
std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
- uint32_t count = remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&reroute, dest, _1), CONSUMER/*?*/);
+ uint32_t count = remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&reroute, dest, _1), CONSUMER/*?*/, settings.autodelete);
if (mgmtObject && count) {
mgmtObject->inc_acquires(count);
@@ -752,7 +778,7 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
const qpid::types::Variant::Map *filter)
{
std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
- return remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&moveTo, destq, _1), CONSUMER/*?*/);
+ return remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&moveTo, destq, _1), CONSUMER/*?*/, settings.autodelete);
}
void Queue::push(Message& message, bool /*isRecovery*/)
@@ -779,15 +805,41 @@ uint32_t Queue::getMessageCount() const
uint32_t Queue::getConsumerCount() const
{
Mutex::ScopedLock locker(messageLock);
- return consumerCount;
+ return users.getSubscriberCount();
}
bool Queue::canAutoDelete() const
{
Mutex::ScopedLock locker(messageLock);
- return settings.autodelete && !consumerCount && !owner;
+ return !deleted && checkAutoDelete(locker);
+}
+
+bool Queue::checkAutoDelete(const Mutex::ScopedLock& lock) const
+{
+ if (settings.autodelete) {
+ switch (settings.lifetime) {
+ case QueueSettings::DELETE_IF_UNUSED:
+ return isUnused(lock);
+ case QueueSettings::DELETE_IF_EMPTY:
+ return !users.isInUseByController() && isEmpty(lock);
+ case QueueSettings::DELETE_IF_UNUSED_AND_EMPTY:
+ return isUnused(lock) && isEmpty(lock);
+ case QueueSettings::DELETE_ON_CLOSE:
+ return !users.isInUseByController();
+ }
+ }
+ return false;
+}
+
+bool Queue::isUnused(const Mutex::ScopedLock&) const
+{
+ return !owner && !users.isUsed();;
}
+bool Queue::isEmpty(const Mutex::ScopedLock&) const
+{
+ return current.getCount() == 0;
+}
/*
* return true if enqueue succeeded and message should be made
* available; returning false will result in the message being dropped
@@ -853,8 +905,9 @@ void Queue::dequeueCommited(const Message& msg)
//store and will not be available for delivery. The only action
//required is to ensure the observers are notified and the
//management stats are correctly decremented
+ ScopedAutoDelete autodelete(*this);
Mutex::ScopedLock locker(messageLock);
- observeDequeue(msg, locker);
+ observeDequeue(msg, locker, settings.autodelete ? &autodelete : 0);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
mgmtObject->inc_byteTxnDequeues(msg.getContentSize());
@@ -874,6 +927,7 @@ void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor)
{
ScopedUse u(barrier);
if (!u.acquired) return;
+ ScopedAutoDelete autodelete(*this);
boost::intrusive_ptr<PersistableMessage> pmsg;
{
Mutex::ScopedLock locker(messageLock);
@@ -881,7 +935,7 @@ void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor)
if (msg) {
if (msg->isPersistent()) pmsg = msg->getPersistentContext();
if (!ctxt) {
- observeDequeue(*msg, locker);
+ observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0);
messages->deleted(cursor);//message pointer not valid after this
}
} else {
@@ -895,11 +949,12 @@ void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor)
void Queue::dequeueCommitted(const QueueCursor& cursor)
{
+ ScopedAutoDelete autodelete(*this);
Mutex::ScopedLock locker(messageLock);
Message* msg = messages->find(cursor);
if (msg) {
const uint64_t contentSize = msg->getContentSize();
- observeDequeue(*msg, locker);
+ observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
mgmtObject->inc_byteTxnDequeues(contentSize);
@@ -920,7 +975,7 @@ void Queue::dequeueCommitted(const QueueCursor& cursor)
* Updates policy and management when a message has been dequeued,
* Requires messageLock be held by caller.
*/
-void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock&)
+void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock& lock, ScopedAutoDelete* autodelete)
{
current -= QueueDepth(1, msg.getContentSize());
mgntDeqStats(msg, mgmtObject, brokerMgmtObject);
@@ -931,6 +986,7 @@ void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock&)
QPID_LOG(warning, "Exception on notification of dequeue for queue " << getName() << ": " << e.what());
}
}
+ if (autodelete && isEmpty(lock)) autodelete->check(lock);
}
/** updates queue observers when a message has become unavailable for transfer.
@@ -1053,9 +1109,10 @@ void Queue::abandoned(const Message& message)
void Queue::destroyed()
{
unbind(broker->getExchanges());
- remove(0, 0, boost::bind(&Queue::abandoned, this, _1), REPLICATOR/*even acquired message are treated as abandoned*/);
+ remove(0, 0, boost::bind(&Queue::abandoned, this, _1), REPLICATOR/*even acquired message are treated as abandoned*/, false);
if (alternateExchange.get()) {
alternateExchange->decAlternateUsers();
+ alternateExchange.reset();
}
if (store) {
@@ -1170,33 +1227,12 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange()
return alternateExchange;
}
-void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId)
-{
- if (broker.getQueues().destroyIf(queue->getName(),
- boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
- if (broker.getAcl())
- broker.getAcl()->recordDestroyQueue(queue->getName());
-
- QPID_LOG_CAT(debug, model, "Auto-delete queue: " << queue->getName()
- << " user:" << userId
- << " rhost:" << connectionId );
- queue->destroyed();
- } else {
- QPID_LOG_CAT(debug, model, "Auto-delete queue could not be deleted: " << queue->getName()
- << " user:" << userId
- << " rhost:" << connectionId );
- }
-}
-
struct AutoDeleteTask : qpid::sys::TimerTask
{
- Broker& broker;
Queue::shared_ptr queue;
- std::string connectionId;
- std::string userId;
- AutoDeleteTask(Broker& b, Queue::shared_ptr q, const std::string& cId, const std::string& uId, AbsTime fireTime)
- : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q), connectionId(cId), userId(uId) {}
+ AutoDeleteTask(Queue::shared_ptr q, AbsTime fireTime)
+ : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), queue(q) {}
void fire()
{
@@ -1204,34 +1240,56 @@ struct AutoDeleteTask : qpid::sys::TimerTask
//created, but then became unused again before the task fired;
//in this case ignore this request as there will have already
//been a later task added
- tryAutoDeleteImpl(broker, queue, connectionId, userId);
+ queue->tryAutoDelete();
}
};
-void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId)
+void Queue::scheduleAutoDelete()
{
- if (queue->settings.autoDeleteDelay && queue->canAutoDelete()) {
- AbsTime time(now(), Duration(queue->settings.autoDeleteDelay * TIME_SEC));
- queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, connectionId, userId, time));
- broker.getTimer().add(queue->autoDeleteTask);
- QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
+ if (canAutoDelete()) {
+ if (settings.autoDeleteDelay) {
+ AbsTime time(now(), Duration(settings.autoDeleteDelay * TIME_SEC));
+ autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(shared_from_this(), time));
+ broker->getTimer().add(autoDeleteTask);
+ QPID_LOG(debug, "Timed auto-delete for " << getName() << " initiated");
+ } else {
+ tryAutoDelete();
+ }
+ }
+}
+
+void Queue::tryAutoDelete()
+{
+ if (broker->getQueues().destroyIf(name, boost::bind(boost::mem_fn(&Queue::canAutoDelete), shared_from_this()))) {
+ if (broker->getAcl())
+ broker->getAcl()->recordDestroyQueue(name);
+
+ QPID_LOG_CAT(debug, model, "Auto-delete queue deleted: " << name << " (" << deleted << ")");
+ destroyed();
} else {
- tryAutoDeleteImpl(broker, queue, connectionId, userId);
+ QPID_LOG_CAT(debug, model, "Auto-delete queue could not be deleted: " << name);
}
}
bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
{
- Mutex::ScopedLock locker(ownershipLock);
+ Mutex::ScopedLock locker(messageLock);
return o == owner;
}
void Queue::releaseExclusiveOwnership()
{
- Mutex::ScopedLock locker(ownershipLock);
- owner = 0;
- if (mgmtObject) {
- mgmtObject->set_exclusive(false);
+ bool unused;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ owner = 0;
+ if (mgmtObject) {
+ mgmtObject->set_exclusive(false);
+ }
+ unused = !users.isUsed();
+ }
+ if (unused && settings.autodelete) {
+ scheduleAutoDelete();
}
}
@@ -1241,7 +1299,7 @@ bool Queue::setExclusiveOwner(const OwnershipToken* const o)
if (settings.autoDeleteDelay && autoDeleteTask) {
autoDeleteTask->cancel();
}
- Mutex::ScopedLock locker(ownershipLock);
+ Mutex::ScopedLock locker(messageLock);
if (owner) {
return false;
} else {
@@ -1255,7 +1313,7 @@ bool Queue::setExclusiveOwner(const OwnershipToken* const o)
bool Queue::hasExclusiveOwner() const
{
- Mutex::ScopedLock locker(ownershipLock);
+ Mutex::ScopedLock locker(messageLock);
return owner != 0;
}
@@ -1388,7 +1446,7 @@ struct After {
void Queue::setPosition(SequenceNumber n) {
Mutex::ScopedLock locker(messageLock);
if (n < sequence) {
- remove(0, After(n), MessagePredicate(), BROWSER);
+ remove(0, After(n), MessagePredicate(), BROWSER, false);
}
sequence = n;
QPID_LOG(debug, "Set position to " << sequence << " on " << getName());
@@ -1450,6 +1508,12 @@ bool Queue::checkNotDeleted(const Consumer::shared_ptr& c)
return !deleted;
}
+bool Queue::isDeleted() const
+{
+ Mutex::ScopedLock lock(messageLock);
+ return deleted;
+}
+
void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
{
Mutex::ScopedLock lock(messageLock);
@@ -1495,6 +1559,7 @@ void Queue::setDequeueSincePurge(uint32_t value) {
void Queue::reject(const QueueCursor& cursor)
{
+ ScopedAutoDelete autodelete(*this);
Exchange::shared_ptr alternate = getAlternateExchange();
Message copy;
boost::intrusive_ptr<PersistableMessage> pmsg;
@@ -1505,7 +1570,7 @@ void Queue::reject(const QueueCursor& cursor)
if (alternate) copy = *message;
if (message->isPersistent()) pmsg = message->getPersistentContext();
countRejected();
- observeDequeue(*message, locker);
+ observeDequeue(*message, locker, settings.autodelete ? &autodelete : 0);
messages->deleted(cursor);
} else {
return;
@@ -1526,7 +1591,7 @@ void Queue::reject(const QueueCursor& cursor)
bool Queue::checkDepth(const QueueDepth& increment, const Message&)
{
- if (current && (settings.maxDepth - current < increment)) {
+ if (settings.maxDepth && (settings.maxDepth - current < increment)) {
if (mgmtObject) {
mgmtObject->inc_discardsOverflow();
if (brokerMgmtObject)
@@ -1619,6 +1684,29 @@ void Queue::setMgmtRedirectState( std::string peer, bool enabled, bool isSrc ) {
mgmtObject->set_redirectSource(isSrc);
}
}
+Queue::QueueUsers::QueueUsers() : consumers(0), browsers(0), others(0), controller(false) {}
+void Queue::QueueUsers::addConsumer() { ++consumers; }
+void Queue::QueueUsers::addBrowser() { ++browsers; }
+void Queue::QueueUsers::addLifecycleController() { assert(!controller); controller = true; }
+void Queue::QueueUsers::addOther(){ ++others; }
+void Queue::QueueUsers::removeConsumer() { assert(consumers > 0); --consumers; }
+void Queue::QueueUsers::removeBrowser() { assert(browsers > 0); --browsers; }
+void Queue::QueueUsers::removeLifecycleController() { assert(controller); controller = false; }
+void Queue::QueueUsers::removeOther() { assert(others > 0); --others; }
+bool Queue::QueueUsers::isInUseByController() const { return controller; }
+bool Queue::QueueUsers::isUsed() const { return controller || consumers || browsers || others; }
+uint32_t Queue::QueueUsers::getSubscriberCount() const { return consumers + browsers; }
+bool Queue::QueueUsers::hasConsumers() const { return consumers; }
+
+Queue::ScopedAutoDelete::ScopedAutoDelete(Queue& q) : queue(q), eligible(false) {}
+void Queue::ScopedAutoDelete::check(const sys::Mutex::ScopedLock& lock)
+{
+ eligible = queue.checkAutoDelete(lock);
+}
+Queue::ScopedAutoDelete::~ScopedAutoDelete()
+{
+ if (eligible) queue.scheduleAutoDelete();
+}
}}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 29b711075a..5598ee5d13 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -87,6 +87,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
typedef boost::shared_ptr<Queue> shared_ptr;
protected:
+ friend struct AutoDeleteTask;
struct UsageBarrier
{
Queue& parent;
@@ -119,6 +120,54 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void rollback() throw();
};
+ /**
+ * This class tracks whether a queue is in use and how it is being
+ * used.
+ */
+ class QueueUsers
+ {
+ public:
+ QueueUsers();
+ void addConsumer();
+ void addBrowser();
+ void addOther();
+ void removeConsumer();
+ void removeBrowser();
+ void addLifecycleController();
+ void removeLifecycleController();
+ void removeOther();
+ bool isUsed() const;
+ uint32_t getSubscriberCount() const;
+ bool hasConsumers() const;
+ bool isInUseByController() const;
+ private:
+ uint32_t consumers;
+ uint32_t browsers;
+ uint32_t others;
+ bool controller;
+ };
+
+ /**
+ * This class is used to check - and if necessary trigger -
+ * autodeletion when removing messages, as this could cause the
+ * queue to become empty (which is one possible trigger for
+ * autodeletion).
+ *
+ * The constructor and descructor should be called outside the
+ * message lock. The check method should be called while holding
+ * the message lock.
+ */
+ class ScopedAutoDelete
+ {
+ public:
+ ScopedAutoDelete(Queue& q);
+ void check(const sys::Mutex::ScopedLock& lock);
+ ~ScopedAutoDelete();
+ private:
+ Queue& queue;
+ bool eligible;
+ };
+
typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
typedef boost::function1<void, Message&> MessageFunctor;
@@ -126,8 +175,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
const std::string name;
MessageStore* store;
const OwnershipToken* owner;
- uint32_t consumerCount; // Actually a count of all subscriptions, acquiring or not.
- uint32_t browserCount; // Count of non-acquiring subscriptions.
+ QueueUsers users;
OwnershipToken* exclusive;
std::vector<std::string> traceExclude;
QueueListeners listeners;
@@ -149,7 +197,6 @@ class Queue : public boost::enable_shared_from_this<Queue>,
* o Queue::UsageBarrier (TBD: move under separate lock)
*/
mutable qpid::sys::Mutex messageLock;
- mutable qpid::sys::Mutex ownershipLock;
mutable uint64_t persistenceId;
QueueSettings settings;
qpid::framing::FieldTable encodableSettings;
@@ -176,6 +223,9 @@ class Queue : public boost::enable_shared_from_this<Queue>,
Queue::shared_ptr redirectPeer;
bool redirectSource;
+ bool checkAutoDelete(const qpid::sys::Mutex::ScopedLock&) const;
+ bool isUnused(const qpid::sys::Mutex::ScopedLock&) const;
+ bool isEmpty(const qpid::sys::Mutex::ScopedLock&) const;
virtual void push(Message& msg, bool isRecovery=false);
bool accept(const Message&);
void process(Message& msg);
@@ -191,7 +241,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void observeEnqueue(const Message& msg, const sys::Mutex::ScopedLock& lock);
void observeAcquire(const Message& msg, const sys::Mutex::ScopedLock& lock);
void observeRequeue(const Message& msg, const sys::Mutex::ScopedLock& lock);
- void observeDequeue(const Message& msg, const sys::Mutex::ScopedLock& lock);
+ void observeDequeue(const Message& msg, const sys::Mutex::ScopedLock& lock, ScopedAutoDelete*);
void observeConsumerAdd( const Consumer&, const sys::Mutex::ScopedLock& lock);
void observeConsumerRemove( const Consumer&, const sys::Mutex::ScopedLock& lock);
@@ -203,9 +253,9 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void abandoned(const Message& message);
bool checkNotDeleted(const Consumer::shared_ptr&);
void notifyDeleted();
- uint32_t remove(uint32_t maxCount, MessagePredicate, MessageFunctor, SubscriptionType);
+ uint32_t remove(uint32_t maxCount, MessagePredicate, MessageFunctor, SubscriptionType, bool triggerAutoDelete);
virtual bool checkDepth(const QueueDepth& increment, const Message&);
-
+ void tryAutoDelete();
public:
typedef std::vector<shared_ptr> vector;
@@ -284,6 +334,14 @@ class Queue : public boost::enable_shared_from_this<Queue>,
QPID_BROKER_EXTERN void consume(Consumer::shared_ptr c,
bool exclusive = false);
QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
+ /**
+ * Used to indicate that the queue is being used in some other
+ * context than by a subscriber. The controlling flag should only
+ * be set if the mode of use is the one that caused the queue to
+ * be created.
+ */
+ QPID_BROKER_EXTERN void markInUse(bool controlling=false);
+ QPID_BROKER_EXTERN void releaseFromUse(bool controlling=false);
QPID_BROKER_EXTERN uint32_t purge(const uint32_t purge_request=0, //defaults to all messages
boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>(),
@@ -308,6 +366,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
inline const qpid::framing::FieldTable& getEncodableSettings() const { return encodableSettings; }
inline bool isAutoDelete() const { return settings.autodelete; }
QPID_BROKER_EXTERN bool canAutoDelete() const;
+ QPID_BROKER_EXTERN void scheduleAutoDelete();
+ QPID_BROKER_EXTERN bool isDeleted() const;
const QueueBindings& getBindings() const { return bindings; }
/**
@@ -340,7 +400,6 @@ class Queue : public boost::enable_shared_from_this<Queue>,
* exclusive owner
*/
static Queue::shared_ptr restore(QueueRegistry& queues, framing::Buffer& buffer);
- QPID_BROKER_EXTERN static void tryAutoDelete(Broker& broker, Queue::shared_ptr, const std::string& connectionId, const std::string& userId);
virtual void setExternalQueueStore(ExternalQueueStore* inst);
diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.cpp b/qpid/cpp/src/qpid/broker/QueueSettings.cpp
index 5a149c1c6d..fd90d11d76 100644
--- a/qpid/cpp/src/qpid/broker/QueueSettings.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueSettings.cpp
@@ -58,6 +58,9 @@ const std::string PAGING("qpid.paging");
const std::string MAX_PAGES("qpid.max_pages_loaded");
const std::string PAGE_FACTOR("qpid.page_factor");
const std::string FILTER("qpid.filter");
+const std::string LIFETIME_POLICY("qpid.lifetime-policy");
+const std::string DELETE_IF_UNUSED_KEY("delete-if-unused");
+const std::string DELETE_IF_UNUSED_AND_EMPTY_KEY("delete-if-unused-and-empty");
const std::string LVQ_LEGACY("qpid.last_value_queue");
const std::string LVQ_LEGACY_KEY("qpid.LVQ_key");
@@ -86,6 +89,7 @@ const QueueSettings::Aliases QueueSettings::aliases;
QueueSettings::QueueSettings(bool d, bool a) :
durable(d),
autodelete(a),
+ lifetime(DELETE_IF_UNUSED),
isTemporary(false),
priorities(0),
defaultFairshare(0),
@@ -214,6 +218,15 @@ bool QueueSettings::handle(const std::string& key, const qpid::types::Variant& v
} else if (key == FILTER) {
filter = value.asString();
return true;
+ } else if (key == LIFETIME_POLICY) {
+ if (value.asString() == DELETE_IF_UNUSED_KEY) {
+ lifetime = DELETE_IF_UNUSED;
+ } else if (value.asString() == DELETE_IF_UNUSED_AND_EMPTY_KEY) {
+ lifetime = DELETE_IF_UNUSED_AND_EMPTY;
+ } else {
+ QPID_LOG(warning, "Invalid value for " << LIFETIME_POLICY << ": " << value);
+ }
+ return true;
} else {
return false;
}
diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.h b/qpid/cpp/src/qpid/broker/QueueSettings.h
index 19667e93ae..166445be18 100644
--- a/qpid/cpp/src/qpid/broker/QueueSettings.h
+++ b/qpid/cpp/src/qpid/broker/QueueSettings.h
@@ -40,9 +40,21 @@ namespace broker {
struct QueueSettings
{
QPID_BROKER_EXTERN QueueSettings(bool durable=false, bool autodelete=false);
+ /**
+ * The lifetime policy dictates when an autodelete queue is
+ * eligible for delete.
+ */
+ enum LifetimePolicy
+ {
+ DELETE_IF_UNUSED = 0,
+ DELETE_IF_EMPTY,
+ DELETE_IF_UNUSED_AND_EMPTY,
+ DELETE_ON_CLOSE
+ };
bool durable;
bool autodelete;
+ LifetimePolicy lifetime;
bool isTemporary;
//basic queue types:
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index dd7a25aaa4..4570e3bd87 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -449,9 +449,6 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c)
Queue::shared_ptr queue = c->getQueue();
if(queue) {
queue->cancel(c);
- // Only run auto-delete for counted consumers.
- if (c->isCounted() && queue->canAutoDelete() && !queue->hasExclusiveOwner())
- Queue::tryAutoDelete(session.getBroker(), queue, connectionId, userID);
}
c->cancel();
}
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index 2d4868628f..0124e88832 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -227,9 +227,6 @@ void SessionAdapter::QueueHandlerImpl::destroyExclusiveQueues()
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
- if (q->canAutoDelete()) {
- Queue::tryAutoDelete(broker, q, connectionId, userId);
- }
exclusiveQueues.erase(exclusiveQueues.begin());
}
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
index fa0d719bf9..2cb0994138 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
@@ -44,7 +44,7 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, Broker
: BrokerContext(b), ManagedConnection(getBroker(), i),
connection(pn_connection()),
transport(pn_transport()),
- out(o), id(i), haveOutput(true)
+ out(o), id(i), haveOutput(true), closeInitiated(false)
{
if (pn_transport_bind(transport, connection)) {
//error
@@ -130,9 +130,6 @@ size_t Connection::encode(char* buffer, size_t size)
QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size)
haveOutput = true;
return n;
- } else if (n == PN_EOS) {
- haveOutput = size;
- return size;//Is this right?
} else if (n == PN_ERR) {
throw Exception(qpid::amqp::error_conditions::INTERNAL_ERROR, QPID_MSG("Error on output: " << getError()));
} else {
@@ -142,23 +139,29 @@ size_t Connection::encode(char* buffer, size_t size)
}
bool Connection::canEncode()
{
- try {
- for (Sessions::iterator i = sessions.begin();i != sessions.end(); ++i) {
- if (i->second->dispatch()) haveOutput = true;
+ if (!closeInitiated) {
+ try {
+ for (Sessions::iterator i = sessions.begin();i != sessions.end(); ++i) {
+ if (i->second->dispatch()) haveOutput = true;
+ }
+ process();
+ } catch (const Exception& e) {
+ QPID_LOG(error, id << ": " << e.what());
+ pn_condition_t* error = pn_connection_condition(connection);
+ pn_condition_set_name(error, e.symbol());
+ pn_condition_set_description(error, e.what());
+ close();
+ haveOutput = true;
+ } catch (const std::exception& e) {
+ QPID_LOG(error, id << ": " << e.what());
+ pn_condition_t* error = pn_connection_condition(connection);
+ pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str());
+ pn_condition_set_description(error, e.what());
+ close();
+ haveOutput = true;
}
- process();
- } catch (const Exception& e) {
- QPID_LOG(error, id << ": " << e.what());
- pn_condition_t* error = pn_connection_condition(connection);
- pn_condition_set_name(error, e.symbol());
- pn_condition_set_description(error, e.what());
- close();
- } catch (const std::exception& e) {
- QPID_LOG(error, id << ": " << e.what());
- pn_condition_t* error = pn_connection_condition(connection);
- pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str());
- pn_condition_set_description(error, e.what());
- close();
+ } else {
+ QPID_LOG(info, "Connection " << id << " has been closed locally");
}
//TODO: proper handling of time in and out of tick
pn_transport_tick(transport, 0);
@@ -195,9 +198,12 @@ void Connection::closed()
}
void Connection::close()
{
- closed();
- QPID_LOG_CAT(debug, model, id << " connection closed");
- pn_connection_close(connection);
+ if (!closeInitiated) {
+ closeInitiated = true;
+ closed();
+ QPID_LOG_CAT(debug, model, id << " connection closed");
+ pn_connection_close(connection);
+ }
}
bool Connection::isClosed() const
{
@@ -256,29 +262,29 @@ void Connection::process()
//handle deliveries
for (pn_delivery_t* delivery = pn_work_head(connection); delivery; delivery = pn_work_next(delivery)) {
pn_link_t* link = pn_delivery_link(delivery);
- if (pn_link_is_receiver(link)) {
- Sessions::iterator i = sessions.find(pn_link_session(link));
- if (i != sessions.end()) {
- try {
+ try {
+ if (pn_link_is_receiver(link)) {
+ Sessions::iterator i = sessions.find(pn_link_session(link));
+ if (i != sessions.end()) {
i->second->readable(link, delivery);
- } catch (const Exception& e) {
- QPID_LOG_CAT(error, protocol, "Error on publish: " << e.what());
- pn_condition_t* error = pn_link_condition(link);
- pn_condition_set_name(error, e.symbol());
- pn_condition_set_description(error, e.what());
- pn_link_close(link);
+ } else {
+ pn_delivery_update(delivery, PN_REJECTED);
+ }
+ } else { //i.e. SENDER
+ Sessions::iterator i = sessions.find(pn_link_session(link));
+ if (i != sessions.end()) {
+ QPID_LOG(trace, id << " handling outgoing delivery for " << link << " on session " << pn_link_session(link));
+ i->second->writable(link, delivery);
+ } else {
+ QPID_LOG(error, id << " Got delivery for non-existent session: " << pn_link_session(link) << ", link: " << link);
}
- } else {
- pn_delivery_update(delivery, PN_REJECTED);
- }
- } else { //i.e. SENDER
- Sessions::iterator i = sessions.find(pn_link_session(link));
- if (i != sessions.end()) {
- QPID_LOG(trace, id << " handling outgoing delivery for " << link << " on session " << pn_link_session(link));
- i->second->writable(link, delivery);
- } else {
- QPID_LOG(error, id << " Got delivery for non-existent session: " << pn_link_session(link) << ", link: " << link);
}
+ } catch (const Exception& e) {
+ QPID_LOG_CAT(error, protocol, "Error processing deliveries: " << e.what());
+ pn_condition_t* error = pn_link_condition(link);
+ pn_condition_set_name(error, e.symbol());
+ pn_condition_set_description(error, e.what());
+ pn_link_close(link);
}
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h
index 1384e3560d..e1ae34f899 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h
@@ -67,6 +67,7 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man
const std::string id;
bool haveOutput;
Sessions sessions;
+ bool closeInitiated;
virtual void process();
std::string getError();
diff --git a/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp b/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp
index 1140032174..957134d0e6 100644
--- a/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp
@@ -160,13 +160,15 @@ void DataReader::readArray(pn_data_t* /*data*/, const qpid::amqp::Descriptor* /*
void DataReader::readList(pn_data_t* data, const qpid::amqp::Descriptor* descriptor)
{
size_t count = pn_data_get_list(data);
- reader.onStartList(count, qpid::amqp::CharSequence(), descriptor);
- pn_data_enter(data);
- for (size_t i = 0; i < count && pn_data_next(data); ++i) {
- read(data);
+ bool skip = reader.onStartList(count, qpid::amqp::CharSequence(), descriptor);
+ if (!skip) {
+ pn_data_enter(data);
+ for (size_t i = 0; i < count && pn_data_next(data); ++i) {
+ read(data);
+ }
+ pn_data_exit(data);
+ reader.onEndList(count, descriptor);
}
- pn_data_exit(data);
- reader.onEndList(count, descriptor);
}
void DataReader::readMap(pn_data_t* data, const qpid::amqp::Descriptor* descriptor)
diff --git a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp
index a937c1171e..eb30c78128 100644
--- a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp
@@ -20,10 +20,16 @@
*/
#include "qpid/broker/amqp/NodeProperties.h"
#include "qpid/broker/amqp/DataReader.h"
+#include "qpid/broker/QueueSettings.h"
#include "qpid/amqp/CharSequence.h"
+#include "qpid/amqp/Descriptor.h"
+#include "qpid/amqp/descriptors.h"
#include "qpid/types/Variant.h"
#include "qpid/broker/QueueSettings.h"
#include "qpid/log/Statement.h"
+extern "C" {
+#include <proton/engine.h>
+}
using qpid::amqp::CharSequence;
using qpid::amqp::Descriptor;
@@ -36,6 +42,7 @@ namespace {
const std::string MOVE("move");
const std::string COPY("copy");
const std::string SUPPORTED_DIST_MODES("supported-dist-modes");
+const std::string LIFETIME_POLICY("lifetime-policy");
//AMQP 0-10 standard parameters:
const std::string DURABLE("durable");
@@ -43,9 +50,57 @@ const std::string EXCLUSIVE("exclusive");
const std::string AUTO_DELETE("auto-delete");
const std::string ALTERNATE_EXCHANGE("alternate-exchange");
const std::string EXCHANGE_TYPE("exchange-type");
+
+pn_bytes_t convert(const std::string& s)
+{
+ pn_bytes_t result;
+ result.start = const_cast<char*>(s.data());
+ result.size = s.size();
+ return result;
}
-NodeProperties::NodeProperties() : queue(true), durable(false), autoDelete(false), exclusive(false), exchangeType("topic") {}
+bool getLifetimePolicy(const Descriptor& d, QueueSettings::LifetimePolicy& policy)
+{
+ if (d.match(qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL, qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_CODE)) {
+ policy = QueueSettings::DELETE_ON_CLOSE;
+ return true;
+ } else if (d.match(qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL, qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_CODE)) {
+ policy = QueueSettings::DELETE_IF_UNUSED;
+ return true;
+ } else if (d.match(qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL, qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_CODE)) {
+ policy = QueueSettings::DELETE_IF_EMPTY;
+ return true;
+ } else if (d.match(qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL, qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_CODE)) {
+ policy = QueueSettings::DELETE_IF_UNUSED_AND_EMPTY;
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool getLifetimeDescriptorSymbol(QueueSettings::LifetimePolicy policy, pn_bytes_t& out)
+{
+ switch (policy) {
+ case QueueSettings::DELETE_ON_CLOSE:
+ out = convert(qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL);
+ return true;
+ case QueueSettings::DELETE_IF_UNUSED:
+ out = convert(qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL);
+ return true;
+ case QueueSettings::DELETE_IF_EMPTY:
+ out = convert(qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL);
+ return true;
+ case QueueSettings::DELETE_IF_UNUSED_AND_EMPTY:
+ out = convert(qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL);
+ return true;
+ default:
+ return false;
+ }
+}
+
+}
+
+NodeProperties::NodeProperties() : queue(true), durable(false), autoDelete(false), exclusive(false), exchangeType("topic"), lifetime(QueueSettings::DELETE_IF_UNUSED) {}
void NodeProperties::read(pn_data_t* data)
{
@@ -53,12 +108,38 @@ void NodeProperties::read(pn_data_t* data)
reader.read(data);
}
-void NodeProperties::process(const std::string& key, const qpid::types::Variant& value)
+void NodeProperties::write(pn_data_t* data)
+{
+ pn_data_put_map(data);
+ pn_data_enter(data);
+ pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES));
+ pn_data_put_string(data, convert(queue ? MOVE : COPY));
+ pn_bytes_t symbol;
+ if (autoDelete && getLifetimeDescriptorSymbol(lifetime, symbol)) {
+ pn_data_put_symbol(data, convert(LIFETIME_POLICY));
+ pn_data_put_described(data);
+ pn_data_enter(data);
+ pn_data_put_symbol(data, symbol);
+ pn_data_put_list(data);
+ pn_data_exit(data);
+ }
+ pn_data_exit(data);
+}
+
+void NodeProperties::process(const std::string& key, const qpid::types::Variant& value, const Descriptor* d)
{
- QPID_LOG(notice, "Processing node property " << key << " = " << value);
+ QPID_LOG(debug, "Processing node property " << key << " = " << value);
if (key == SUPPORTED_DIST_MODES) {
if (value == MOVE) queue = true;
else if (value == COPY) queue = false;
+ } else if (key == LIFETIME_POLICY) {
+ if (d) {
+ if (getLifetimePolicy(*d, lifetime)) {
+ autoDelete = true;
+ } else {
+ QPID_LOG(warning, "Unrecognised lifetime policy: " << *d);
+ }
+ }
} else if (key == DURABLE) {
durable = value;
} else if (key == EXCLUSIVE) {
@@ -74,84 +155,91 @@ void NodeProperties::process(const std::string& key, const qpid::types::Variant&
}
}
-void NodeProperties::onNullValue(const CharSequence& key, const Descriptor*)
+bool NodeProperties::onStartListValue(const qpid::amqp::CharSequence& key, uint32_t count, const qpid::amqp::Descriptor* d)
{
- process(key.str(), qpid::types::Variant());
+ QPID_LOG(debug, "NodeProperties::onStartListValue(" << std::string(key.data, key.size) << ", " << count << ", " << d);
+ process(key.str(), qpid::types::Variant(), d);
+ return true;
}
-void NodeProperties::onBooleanValue(const CharSequence& key, bool value, const Descriptor*)
+void NodeProperties::onNullValue(const CharSequence& key, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), qpid::types::Variant(), d);
}
-void NodeProperties::onUByteValue(const CharSequence& key, uint8_t value, const Descriptor*)
+void NodeProperties::onBooleanValue(const CharSequence& key, bool value, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), value, d);
}
-void NodeProperties::onUShortValue(const CharSequence& key, uint16_t value, const Descriptor*)
+void NodeProperties::onUByteValue(const CharSequence& key, uint8_t value, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), value, d);
}
-void NodeProperties::onUIntValue(const CharSequence& key, uint32_t value, const Descriptor*)
+void NodeProperties::onUShortValue(const CharSequence& key, uint16_t value, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), value, d);
}
-void NodeProperties::onULongValue(const CharSequence& key, uint64_t value, const Descriptor*)
+void NodeProperties::onUIntValue(const CharSequence& key, uint32_t value, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), value, d);
}
-void NodeProperties::onByteValue(const CharSequence& key, int8_t value, const Descriptor*)
+void NodeProperties::onULongValue(const CharSequence& key, uint64_t value, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), value, d);
}
-void NodeProperties::onShortValue(const CharSequence& key, int16_t value, const Descriptor*)
+void NodeProperties::onByteValue(const CharSequence& key, int8_t value, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), value, d);
}
-void NodeProperties::onIntValue(const CharSequence& key, int32_t value, const Descriptor*)
+void NodeProperties::onShortValue(const CharSequence& key, int16_t value, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), value, d);
}
-void NodeProperties::onLongValue(const CharSequence& key, int64_t value, const Descriptor*)
+void NodeProperties::onIntValue(const CharSequence& key, int32_t value, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), value, d);
}
-void NodeProperties::onFloatValue(const CharSequence& key, float value, const Descriptor*)
+void NodeProperties::onLongValue(const CharSequence& key, int64_t value, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), value, d);
}
-void NodeProperties::onDoubleValue(const CharSequence& key, double value, const Descriptor*)
+void NodeProperties::onFloatValue(const CharSequence& key, float value, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), value, d);
}
-void NodeProperties::onUuidValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+void NodeProperties::onDoubleValue(const CharSequence& key, double value, const Descriptor* d)
{
- process(key.str(), value.str());
+ process(key.str(), value, d);
}
-void NodeProperties::onTimestampValue(const CharSequence& key, int64_t value, const Descriptor*)
+void NodeProperties::onUuidValue(const CharSequence& key, const CharSequence& value, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), value.str(), d);
}
-void NodeProperties::onStringValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+void NodeProperties::onTimestampValue(const CharSequence& key, int64_t value, const Descriptor* d)
{
- process(key.str(), value.str());
+ process(key.str(), value, d);
}
-void NodeProperties::onSymbolValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+void NodeProperties::onStringValue(const CharSequence& key, const CharSequence& value, const Descriptor* d)
{
- process(key.str(), value.str());
+ process(key.str(), value.str(), d);
+}
+
+void NodeProperties::onSymbolValue(const CharSequence& key, const CharSequence& value, const Descriptor* d)
+{
+ process(key.str(), value.str(), d);
}
QueueSettings NodeProperties::getQueueSettings()
@@ -159,6 +247,7 @@ QueueSettings NodeProperties::getQueueSettings()
QueueSettings settings(durable, autoDelete);
qpid::types::Variant::Map unused;
settings.populate(properties, unused);
+ settings.lifetime = lifetime;
return settings;
}
@@ -183,4 +272,9 @@ std::string NodeProperties::getAlternateExchange() const
return alternateExchange;
}
+bool NodeProperties::trackControllingLink() const
+{
+ return lifetime == QueueSettings::DELETE_ON_CLOSE || lifetime == QueueSettings::DELETE_IF_EMPTY;
+}
+
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h
index 881fc4e30f..03780c10a9 100644
--- a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h
+++ b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h
@@ -23,6 +23,7 @@
*/
#include "qpid/amqp/MapReader.h"
#include "qpid/types/Variant.h"
+#include "qpid/broker/QueueSettings.h"
struct pn_data_t;
namespace qpid {
@@ -35,6 +36,7 @@ class NodeProperties : public qpid::amqp::MapReader
public:
NodeProperties();
void read(pn_data_t*);
+ void write(pn_data_t*);
void onNullValue(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*);
void onBooleanValue(const qpid::amqp::CharSequence&, bool, const qpid::amqp::Descriptor*);
void onUByteValue(const qpid::amqp::CharSequence&, uint8_t, const qpid::amqp::Descriptor*);
@@ -51,12 +53,14 @@ class NodeProperties : public qpid::amqp::MapReader
void onTimestampValue(const qpid::amqp::CharSequence&, int64_t, const qpid::amqp::Descriptor*);
void onStringValue(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*);
void onSymbolValue(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*);
+ bool onStartListValue(const qpid::amqp::CharSequence&, uint32_t count, const qpid::amqp::Descriptor*);
bool isQueue() const;
QueueSettings getQueueSettings();
bool isDurable() const;
bool isExclusive() const;
std::string getExchangeType() const;
std::string getAlternateExchange() const;
+ bool trackControllingLink() const;
private:
bool queue;
bool durable;
@@ -65,8 +69,9 @@ class NodeProperties : public qpid::amqp::MapReader
std::string exchangeType;
std::string alternateExchange;
qpid::types::Variant::Map properties;
+ QueueSettings::LifetimePolicy lifetime;
- void process(const std::string&, const qpid::types::Variant&);
+ void process(const std::string&, const qpid::types::Variant&, const qpid::amqp::Descriptor*);
};
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
index 7dbafb2fd1..68ff979aa4 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -19,6 +19,7 @@
*
*/
#include "qpid/broker/amqp/Outgoing.h"
+#include "qpid/broker/amqp/Exception.h"
#include "qpid/broker/amqp/Header.h"
#include "qpid/broker/amqp/Session.h"
#include "qpid/broker/amqp/Translation.h"
@@ -26,7 +27,9 @@
#include "qpid/broker/Selector.h"
#include "qpid/broker/TopicKeyNode.h"
#include "qpid/sys/OutputControl.h"
+#include "qpid/amqp/descriptors.h"
#include "qpid/amqp/MessageEncoder.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
namespace qpid {
@@ -41,10 +44,11 @@ void Outgoing::wakeup()
session.wakeup();
}
-OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool e)
+OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool e, bool p)
: Outgoing(broker, session, source, target, pn_link_name(l)),
Consumer(pn_link_name(l), /*FIXME*/CONSUMER),
exclusive(e),
+ isControllingUser(p),
queue(q), deliveries(5000), link(l), out(o),
current(0), outstanding(0),
buffer(1024)/*used only for header at present*/
@@ -52,6 +56,7 @@ OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source,
for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
deliveries[i].init(i);
}
+ if (isControllingUser) queue->markInUse(true);
}
void OutgoingFromQueue::init()
@@ -63,11 +68,15 @@ bool OutgoingFromQueue::doWork()
{
QPID_LOG(trace, "Dispatching to " << getName() << ": " << pn_link_credit(link));
if (canDeliver()) {
- if (queue->dispatch(shared_from_this())) {
- return true;
- } else {
- pn_link_drained(link);
- QPID_LOG(debug, "No message available on " << queue->getName());
+ try{
+ if (queue->dispatch(shared_from_this())) {
+ return true;
+ } else {
+ pn_link_drained(link);
+ QPID_LOG(debug, "No message available on " << queue->getName());
+ }
+ } catch (const qpid::framing::ResourceDeletedException& e) {
+ throw Exception(qpid::amqp::error_conditions::RESOURCE_DELETED, e.what());
}
} else {
QPID_LOG(debug, "Can't deliver to " << getName() << " from " << queue->getName() << ": " << pn_link_credit(link));
@@ -142,14 +151,14 @@ bool OutgoingFromQueue::canDeliver()
void OutgoingFromQueue::detached()
{
- QPID_LOG(debug, "Detaching outgoing link from " << queue->getName());
+ QPID_LOG(debug, "Detaching outgoing link " << getName() << " from " << queue->getName());
queue->cancel(shared_from_this());
//TODO: release in a clearer order?
for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
if (deliveries[i].msg) queue->release(deliveries[i].cursor, true);
}
if (exclusive) queue->releaseExclusiveOwnership();
- Queue::tryAutoDelete(*queue->getBroker(), queue, "", "");
+ else if (isControllingUser) queue->releaseFromUse(true);
}
//Consumer interface:
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
index a63a8cc0a6..86d7d46111 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
@@ -88,7 +88,7 @@ class Outgoing : public ManagedOutgoingLink
class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::enable_shared_from_this<OutgoingFromQueue>
{
public:
- OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, bool exclusive);
+ OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, bool exclusive, bool isControllingUser);
void setSubjectFilter(const std::string&);
void setSelectorFilter(const std::string&);
void init();
@@ -124,6 +124,7 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public
};
const bool exclusive;
+ const bool isControllingUser;
boost::shared_ptr<Queue> queue;
CircularArray<Record> deliveries;
pn_link_t* link;
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index ddfbc7de52..e6ea694d54 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -24,9 +24,11 @@
#include "Message.h"
#include "Connection.h"
#include "Domain.h"
+#include "Exception.h"
#include "Interconnects.h"
#include "Relay.h"
#include "Topic.h"
+#include "qpid/amqp/descriptors.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/Exchange.h"
@@ -112,10 +114,16 @@ void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Exchange>
class IncomingToQueue : public DecodingIncoming
{
public:
- IncomingToQueue(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l, const std::string& source) : DecodingIncoming(l, b, p, source, q->getName(), pn_link_name(l)), queue(q) {}
+ IncomingToQueue(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l, const std::string& source, bool icl)
+ : DecodingIncoming(l, b, p, source, q->getName(), pn_link_name(l)), queue(q), isControllingLink(icl)
+ {
+ queue->markInUse(isControllingLink);
+ }
+ ~IncomingToQueue() { queue->releaseFromUse(isControllingLink); }
void handle(qpid::broker::Message& m);
private:
boost::shared_ptr<qpid::broker::Queue> queue;
+ bool isControllingLink;
};
class IncomingToExchange : public DecodingIncoming
@@ -152,6 +160,7 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te
node.exchange = connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.getAlternateExchange(),
args, connection.getUserId(), connection.getId()).first;
}
+ node.created = true;
} else {
size_t i = name.find('@');
if (i != std::string::npos && (i+1) < name.length()) {
@@ -188,7 +197,12 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te
std::string Session::generateName(pn_link_t* link)
{
std::stringstream s;
- s << qpid::types::Uuid(true) << "::" << pn_link_name(link);
+ if (connection.getContainerId().empty()) {
+ s << qpid::types::Uuid(true);
+ } else {
+ s << connection.getContainerId();
+ }
+ s << "_" << pn_link_name(link);
return s.str();
}
@@ -210,7 +224,7 @@ void Session::attach(pn_link_t* link)
//i.e a subscription
std::string name;
if (pn_terminus_get_type(source) == PN_UNSPECIFIED) {
- throw qpid::Exception("No source specified!");/*invalid-field?*/
+ throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "No source specified!");
} else if (pn_terminus_is_dynamic(source)) {
name = generateName(link);
QPID_LOG(debug, "Received attach request for outgoing link from " << name);
@@ -226,7 +240,7 @@ void Session::attach(pn_link_t* link)
pn_terminus_t* target = pn_link_remote_target(link);
std::string name;
if (pn_terminus_get_type(target) == PN_UNSPECIFIED) {
- throw qpid::Exception("No target specified!");/*invalid field?*/
+ throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "No target specified!");
} else if (pn_terminus_is_dynamic(target)) {
name = generateName(link);
QPID_LOG(debug, "Received attach request for incoming link to " << name);
@@ -252,6 +266,9 @@ void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::s
setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.exchange);
authorise.incoming(node.exchange);
}
+ if (node.created) {
+ node.properties.write(pn_terminus_properties(pn_link_target(link)));
+ }
const char* sourceAddress = pn_terminus_get_address(pn_link_remote_source(link));
if (!sourceAddress) {
@@ -262,7 +279,7 @@ void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::s
source = sourceAddress;
}
if (node.queue) {
- boost::shared_ptr<Incoming> q(new IncomingToQueue(connection.getBroker(), *this, node.queue, link, source));
+ boost::shared_ptr<Incoming> q(new IncomingToQueue(connection.getBroker(), *this, node.queue, link, source, node.properties.trackControllingLink()));
incoming[link] = q;
} else if (node.exchange) {
boost::shared_ptr<Incoming> e(new IncomingToExchange(connection.getBroker(), *this, node.exchange, link, source));
@@ -272,7 +289,7 @@ void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::s
incoming[link] = in;
} else {
pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED);
- throw qpid::Exception("Node not found: " + name);/*not-found*/
+ throw Exception(qpid::amqp::error_conditions::NOT_FOUND, std::string("Node not found: ") + name);
}
if (connection.getBroker().getOptions().auth && !connection.isLink())
incoming[link]->verify(connection.getUserId(), connection.getBroker().getOptions().realm);
@@ -284,6 +301,9 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
ResolvedNode node = resolve(name, source, false);
if (node.queue) setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.queue);
else if (node.exchange) setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.exchange);
+ if (node.created) {
+ node.properties.write(pn_terminus_properties(pn_link_source(link)));
+ }
Filter filter;
filter.read(pn_terminus_filter(source));
@@ -299,7 +319,7 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
if (node.queue) {
authorise.outgoing(node.queue);
- boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, false));
+ boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, false, node.properties.trackControllingLink()));
q->init();
filter.apply(q);
outgoing[link] = q;
@@ -330,7 +350,7 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
if (!shared) queue->setExclusiveOwner(this);
authorise.outgoing(node.exchange, queue, filter);
filter.bind(node.exchange, queue);
- boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, queue, link, *this, out, !shared));
+ boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, queue, link, *this, out, !shared, false));
outgoing[link] = q;
q->init();
} else if (node.relay) {
@@ -339,7 +359,7 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
out->init();
} else {
pn_terminus_set_type(pn_link_source(link), PN_UNSPECIFIED);
- throw qpid::Exception("Node not found: " + name);/*not-found*/
+ throw Exception(qpid::amqp::error_conditions::NOT_FOUND, std::string("Node not found: ") + name);/*not-found*/
}
filter.write(pn_terminus_filter(pn_link_source(link)));
QPID_LOG(debug, "Outgoing link attached");
@@ -438,8 +458,19 @@ void Session::writable(pn_link_t* link, pn_delivery_t* delivery)
bool Session::dispatch()
{
bool output(false);
- for (OutgoingLinks::iterator s = outgoing.begin(); s != outgoing.end(); ++s) {
- if (s->second->doWork()) output = true;
+ for (OutgoingLinks::iterator s = outgoing.begin(); s != outgoing.end();) {
+ try {
+ if (s->second->doWork()) output = true;
+ ++s;
+ } catch (const Exception& e) {
+ pn_condition_t* error = pn_link_condition(s->first);
+ pn_condition_set_name(error, e.symbol());
+ pn_condition_set_description(error, e.what());
+ pn_link_close(s->first);
+ s->second->detached();
+ outgoing.erase(s++);
+ output = true;
+ }
}
if (completed.size()) {
output = true;
@@ -452,8 +483,19 @@ bool Session::dispatch()
accepted(*i, true);
}
}
- for (IncomingLinks::iterator i = incoming.begin(); i != incoming.end(); ++i) {
- if (i->second->doWork()) output = true;
+ for (IncomingLinks::iterator i = incoming.begin(); i != incoming.end();) {
+ try {
+ if (i->second->doWork()) output = true;
+ ++i;
+ } catch (const Exception& e) {
+ pn_condition_t* error = pn_link_condition(i->first);
+ pn_condition_set_name(error, e.symbol());
+ pn_condition_set_description(error, e.what());
+ pn_link_close(i->first);
+ i->second->detached();
+ incoming.erase(i++);
+ output = true;
+ }
}
return output;
@@ -470,7 +512,7 @@ void Session::close()
}
outgoing.clear();
incoming.clear();
- QPID_LOG(debug, "Session closed, all links detached.");
+ QPID_LOG(debug, "Session " << session << " closed, all links detached.");
for (std::set< boost::shared_ptr<Queue> >::const_iterator i = exclusiveQueues.begin(); i != exclusiveQueues.end(); ++i) {
(*i)->releaseExclusiveOwnership();
}
@@ -490,6 +532,11 @@ Authorise& Session::getAuthorise()
void IncomingToQueue::handle(qpid::broker::Message& message)
{
+ if (queue->isDeleted()) {
+ std::stringstream msg;
+ msg << " Queue " << queue->getName() << " has been deleted";
+ throw Exception(qpid::amqp::error_conditions::RESOURCE_DELETED, msg.str());
+ }
queue->deliver(message);
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.h b/qpid/cpp/src/qpid/broker/amqp/Session.h
index a991ac9e3e..b94d3c226d 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.h
@@ -100,6 +100,8 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses
boost::shared_ptr<qpid::broker::amqp::Topic> topic;
boost::shared_ptr<Relay> relay;
NodeProperties properties;
+ bool created;
+ ResolvedNode() : created(false) {}
};
ResolvedNode resolve(const std::string name, pn_terminus_t* terminus, bool incoming);
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 42ced2988d..17b00185ef 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -871,7 +871,8 @@ void BrokerReplicator::autoDeleteCheck(boost::shared_ptr<Exchange> ex) {
if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) {
if (qr->getQueue()->getSettings().autoDeleteDelay) {
// Start the auto-delete timer
- Queue::tryAutoDelete(broker, qr->getQueue(), remoteHost, userId);
+ qr->getQueue()->releaseFromUse();
+ qr->getQueue()->scheduleAutoDelete();
}
else {
// Delete immediately. Don't purge, the primary is gone so we need
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 4c3c209eab..d99602fdda 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -121,6 +121,7 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
framing::FieldTable args = getArgs();
args.setString(QPID_REPLICATE, printable(NONE).str());
setArgs(args);
+ if (q->isAutoDelete()) q->markInUse();
}
// This must be called immediately after the constructor.
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
index 829459eda6..9ecb46d872 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
@@ -75,6 +75,12 @@ const std::string MOVE("move");
const std::string COPY("copy");
const std::string SUPPORTED_DIST_MODES("supported-dist-modes");
+const std::string AUTO_DELETE("auto-delete");
+const std::string LIFETIME_POLICY("lifetime-policy");
+const std::string DELETE_ON_CLOSE("delete-on-close");
+const std::string DELETE_IF_UNUSED("delete-if-unused");
+const std::string DELETE_IF_EMPTY("delete-if-empty");
+const std::string DELETE_IF_UNUSED_AND_EMPTY("delete-if-unused-and-empty");
const std::string CREATE_ON_DEMAND("create-on-demand");
const std::string DUMMY(".");
@@ -308,6 +314,10 @@ AddressHelper::AddressHelper(const Address& address) :
add(properties, x_declare);
node.erase(i);
}
+ //for temp queues, if neither lifetime-policy nor autodelete are specified, assume delete-on-close
+ if (isTemporary && properties.find(LIFETIME_POLICY) == properties.end() && properties.find(AUTO_DELETE) == properties.end()) {
+ properties[LIFETIME_POLICY] = DELETE_ON_CLOSE;
+ }
if (properties.size() && !(isTemporary || createPolicy.size())) {
QPID_LOG(warning, "Properties will be ignored! " << address);
@@ -559,7 +569,24 @@ std::string AddressHelper::getLinkName(const Address& address)
return name.str();
}
}
-
+namespace {
+std::string toLifetimePolicy(const std::string& value)
+{
+ if (value == DELETE_ON_CLOSE) return qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL;
+ else if (value == DELETE_IF_UNUSED) return qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL;
+ else if (value == DELETE_IF_EMPTY) return qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL;
+ else if (value == DELETE_IF_UNUSED_AND_EMPTY) return qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL;
+ else return value;//asume value is itself the symbolic descriptor
+}
+void putLifetimePolicy(pn_data_t* data, const std::string& value)
+{
+ pn_data_put_described(data);
+ pn_data_enter(data);
+ pn_data_put_symbol(data, convert(value));
+ pn_data_put_list(data);
+ pn_data_exit(data);
+}
+}
void AddressHelper::setNodeProperties(pn_terminus_t* terminus)
{
if (properties.size() || type.size()) {
@@ -575,8 +602,13 @@ void AddressHelper::setNodeProperties(pn_terminus_t* terminus)
pn_data_put_bool(data, true);
}
for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
- pn_data_put_symbol(data, convert(i->first));
- pn_data_put_string(data, convert(i->second.asString()));
+ if (i->first == LIFETIME_POLICY) {
+ pn_data_put_symbol(data, convert(i->first));
+ putLifetimePolicy(data, toLifetimePolicy(i->second.asString()));
+ } else {
+ pn_data_put_symbol(data, convert(i->first));
+ pn_data_put_string(data, convert(i->second.asString()));
+ }
}
pn_data_exit(data);
}
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index 1177bf7119..a9769740d6 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -388,6 +388,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
// Owners= ---, ---, ---
TestConsumer::shared_ptr c3(new TestConsumer("C3"));
+ queue->consume(c3);
std::deque<QueueCursor> dequeMeC3;
verifyAcquire(queue, c3, dequeMeC3, "a", 2 );