From 28e1f71d646b9b0c86aba3bb0238acb0a89d45bd Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Wed, 10 Jul 2013 13:47:05 +0000 Subject: QPID-4976: support standard lifetime policies git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1501768 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/amqp/descriptors.h | 14 ++ qpid/cpp/src/qpid/broker/LossyQueue.cpp | 6 +- qpid/cpp/src/qpid/broker/Lvq.cpp | 2 +- qpid/cpp/src/qpid/broker/Queue.cpp | 242 ++++++++++++++------- qpid/cpp/src/qpid/broker/Queue.h | 73 ++++++- qpid/cpp/src/qpid/broker/QueueSettings.cpp | 13 ++ qpid/cpp/src/qpid/broker/QueueSettings.h | 12 + qpid/cpp/src/qpid/broker/SemanticState.cpp | 3 - qpid/cpp/src/qpid/broker/SessionAdapter.cpp | 3 - qpid/cpp/src/qpid/broker/amqp/Connection.cpp | 92 ++++---- qpid/cpp/src/qpid/broker/amqp/Connection.h | 1 + qpid/cpp/src/qpid/broker/amqp/DataReader.cpp | 14 +- qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp | 164 +++++++++++--- qpid/cpp/src/qpid/broker/amqp/NodeProperties.h | 7 +- qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | 25 ++- qpid/cpp/src/qpid/broker/amqp/Outgoing.h | 3 +- qpid/cpp/src/qpid/broker/amqp/Session.cpp | 75 +++++-- qpid/cpp/src/qpid/broker/amqp/Session.h | 2 + qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 3 +- qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 1 + qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp | 38 +++- qpid/cpp/src/tests/QueueTest.cpp | 1 + 22 files changed, 588 insertions(+), 206 deletions(-) diff --git a/qpid/cpp/src/qpid/amqp/descriptors.h b/qpid/cpp/src/qpid/amqp/descriptors.h index 2a5691beaf..248d6df2df 100644 --- a/qpid/cpp/src/qpid/amqp/descriptors.h +++ b/qpid/cpp/src/qpid/amqp/descriptors.h @@ -89,6 +89,18 @@ const uint64_t SELECTOR_FILTER_CODE(0x0000468C00000004ULL); const uint64_t XQUERY_FILTER_CODE(0x0000468C00000005ULL); } +namespace lifetime_policy { +const std::string DELETE_ON_CLOSE_SYMBOL("amqp:delete-on-close:list"); +const std::string DELETE_ON_NO_LINKS_SYMBOL("amqp:delete-on-no-links:list"); +const std::string DELETE_ON_NO_MESSAGES_SYMBOL("amqp:delete-on-no-messages:list"); +const std::string DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL("amqp:delete-on-no-links-or-messages:list"); + +const uint64_t DELETE_ON_CLOSE_CODE(0x2B); +const uint64_t DELETE_ON_NO_LINKS_CODE(0x2C); +const uint64_t DELETE_ON_NO_MESSAGES_CODE(0x2D); +const uint64_t DELETE_ON_NO_LINKS_OR_MESSAGES_CODE(0x2E); +} + namespace error_conditions { //note these are not actually descriptors const std::string INTERNAL_ERROR("amqp:internal-error"); @@ -97,6 +109,8 @@ const std::string UNAUTHORIZED_ACCESS("amqp:unauthorized-access"); const std::string DECODE_ERROR("amqp:decode-error"); const std::string NOT_ALLOWED("amqp:not-allowed"); const std::string RESOURCE_LIMIT_EXCEEDED("amqp:resource-limit-exceeded"); +const std::string RESOURCE_DELETED("amqp:resource-deleted"); +const std::string PRECONDITION_FAILED("amqp:precondition-failed"); } }} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/broker/LossyQueue.cpp b/qpid/cpp/src/qpid/broker/LossyQueue.cpp index be19185c3a..4104503dac 100644 --- a/qpid/cpp/src/qpid/broker/LossyQueue.cpp +++ b/qpid/cpp/src/qpid/broker/LossyQueue.cpp @@ -48,11 +48,11 @@ bool LossyQueue::checkDepth(const QueueDepth& increment, const Message& message) << name << ": size=" << increment.getSize() << ", max-size=" << settings.maxDepth.getSize())); } - while (settings.maxDepth && (current + increment > settings.maxDepth)) { + while (settings.maxDepth && (settings.maxDepth - current < increment)) { QPID_LOG(debug, "purging " << name << ": current depth is [" << current << "], max depth is [" << settings.maxDepth << "], new message has size " << increment.getSize()); qpid::sys::Mutex::ScopedUnlock u(messageLock); //TODO: arguably we should try and purge expired messages first but that is potentially expensive - if (remove(1, settings.priorities ? boost::bind(&isLowerPriorityThan, message.getPriority(), _1) : MessagePredicate(), MessageFunctor(), PURGE)) { + if (remove(1, settings.priorities ? boost::bind(&isLowerPriorityThan, message.getPriority(), _1) : MessagePredicate(), MessageFunctor(), PURGE, false)) { if (mgmtObject) { mgmtObject->inc_discardsRing(1); if (brokerMgmtObject) @@ -65,7 +65,7 @@ bool LossyQueue::checkDepth(const QueueDepth& increment, const Message& message) break; } } - if (settings.maxDepth && (current + increment > settings.maxDepth)) { + if (settings.maxDepth && (settings.maxDepth - current < increment)) { //will only be the case where we were unable to purge another //message, which should only be the case if we are purging //based on priority and there was no message with a lower (or diff --git a/qpid/cpp/src/qpid/broker/Lvq.cpp b/qpid/cpp/src/qpid/broker/Lvq.cpp index 89a47bb14e..ff13b97dbd 100644 --- a/qpid/cpp/src/qpid/broker/Lvq.cpp +++ b/qpid/cpp/src/qpid/broker/Lvq.cpp @@ -51,7 +51,7 @@ void Lvq::push(Message& message, bool isRecovery) brokerMgmtObject->inc_discardsLvq(); } } - observeDequeue(old, locker); + observeDequeue(old, locker, 0/*can't be empty, so no need to check autodelete*/); } } copy.notify(); diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index c402e3e016..1d0a8017ef 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -175,8 +175,6 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, name(_name), store(_store), owner(0), - consumerCount(0), - browserCount(0), exclusive(0), messages(new MessageDeque()), persistenceId(0), @@ -188,8 +186,8 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, allocator(new FifoDistributor( *messages )), redirectSource(false) { - if (settings.maxDepth.hasCount()) current.setCount(0); - if (settings.maxDepth.hasSize()) current.setSize(0); + current.setCount(0);//always track depth in messages + if (settings.maxDepth.hasSize()) current.setSize(0);//track depth in bytes only if policy requires it if (settings.traceExcludes.size()) { split(traceExclude, settings.traceExcludes, ", "); } @@ -340,6 +338,7 @@ void Queue::release(const QueueCursor& position, bool markRedelivered) bool Queue::dequeueMessageAt(const SequenceNumber& position) { + ScopedAutoDelete autodelete(*this); boost::intrusive_ptr pmsg; { Mutex::ScopedLock locker(messageLock); @@ -348,7 +347,7 @@ bool Queue::dequeueMessageAt(const SequenceNumber& position) Message* msg = messages->find(position, &cursor); if (msg) { if (msg->isPersistent()) pmsg = msg->getPersistentContext(); - observeDequeue(*msg, locker); + observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0); messages->deleted(cursor); } else { QPID_LOG(debug, "Could not dequeue message at " << position << "; no such message"); @@ -385,6 +384,7 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c) { if (!checkNotDeleted(c)) return false; QueueListeners::NotificationSet set; + ScopedAutoDelete autodelete(*this); while (true) { //TODO: reduce lock scope Mutex::ScopedLock locker(messageLock); @@ -393,7 +393,7 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c) if (msg) { if (msg->hasExpired()) { QPID_LOG(debug, "Message expired from queue '" << name << "'"); - observeDequeue(*msg, locker); + observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0); //ERROR: don't hold lock across call to store!! if (msg->isPersistent()) dequeueFromStore(msg->getPersistentContext()); if (mgmtObject) { @@ -490,14 +490,31 @@ bool Queue::find(SequenceNumber pos, Message& msg) const return false; } +void Queue::markInUse(bool controlling) +{ + Mutex::ScopedLock locker(messageLock); + if (controlling) users.addLifecycleController(); + else users.addOther(); +} + +void Queue::releaseFromUse(bool controlling) +{ + if (controlling) { + { + Mutex::ScopedLock locker(messageLock); + users.removeLifecycleController(); + } + scheduleAutoDelete(); + } else { + Mutex::ScopedLock locker(messageLock); + users.removeOther(); + } +} + void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) { { Mutex::ScopedLock locker(messageLock); - // NOTE: consumerCount is actually a count of all - // subscriptions, both acquiring and non-acquiring (browsers). - // Check for exclusivity of acquiring consumers. - size_t acquiringConsumers = consumerCount - browserCount; if (c->preAcquires()) { if(settings.isBrowseOnly) { throw NotAllowedException( @@ -509,7 +526,7 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); } else if(requestExclusive) { - if(acquiringConsumers) { + if(users.hasConsumers()) { throw ResourceLockedException( QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); @@ -517,13 +534,11 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) exclusive = c->getSession(); } } - } - else if(c->isCounted()) { - browserCount++; + users.addConsumer(); + } else if(c->isCounted()) { + users.addBrowser(); } if(c->isCounted()) { - consumerCount++; - //reset auto deletion timer if necessary if (settings.autoDeleteDelay && autoDeleteTask) { autoDeleteTask->cancel(); @@ -542,14 +557,24 @@ void Queue::cancel(Consumer::shared_ptr c) removeListener(c); if(c->isCounted()) { - Mutex::ScopedLock locker(messageLock); - consumerCount--; - if (!c->preAcquires()) browserCount--; - if(exclusive) exclusive = 0; - observeConsumerRemove(*c, locker); - } - if (mgmtObject != 0 && c->isCounted()) { - mgmtObject->dec_consumerCount(); + bool unused; + { + Mutex::ScopedLock locker(messageLock); + if (c->preAcquires()) { + users.removeConsumer(); + if (exclusive) exclusive = 0; + } else { + users.removeBrowser(); + } + observeConsumerRemove(*c, locker); + unused = !users.isUsed(); + } + if (mgmtObject != 0) { + mgmtObject->dec_consumerCount(); + } + if (unused && settings.autodelete) { + scheduleAutoDelete(); + } } } @@ -564,7 +589,7 @@ void Queue::purgeExpired(sys::Duration lapse) { dequeueSincePurge -= count; int seconds = int64_t(lapse)/qpid::sys::TIME_SEC; if (seconds == 0 || count / seconds < 1) { - uint32_t count = remove(0, boost::bind(&Message::hasExpired, _1), 0, CONSUMER); + uint32_t count = remove(0, boost::bind(&Message::hasExpired, _1), 0, CONSUMER, settings.autodelete); QPID_LOG(debug, "Purged " << count << " expired messages from " << getName()); // // Report the count of discarded-by-ttl messages @@ -671,8 +696,9 @@ namespace { } } // end namespace -uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunctor f, SubscriptionType type) +uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunctor f, SubscriptionType type, bool triggerAutoDelete) { + ScopedAutoDelete autodelete(*this); std::deque removed; { QueueCursor c(type); @@ -686,7 +712,7 @@ uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunct //don't actually acquire, just act as if we did observeAcquire(*m, locker); } - observeDequeue(*m, locker); + observeDequeue(*m, locker, triggerAutoDelete ? &autodelete : 0); removed.push_back(*m);//takes a copy of the message if (!messages->deleted(c)) { QPID_LOG(warning, "Failed to correctly remove message from " << name << "; state is not consistent!"); @@ -726,7 +752,7 @@ uint32_t Queue::purge(const uint32_t qty, boost::shared_ptr dest, const qpid::types::Variant::Map *filter) { std::auto_ptr mf(MessageFilter::create(filter)); - uint32_t count = remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&reroute, dest, _1), CONSUMER/*?*/); + uint32_t count = remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&reroute, dest, _1), CONSUMER/*?*/, settings.autodelete); if (mgmtObject && count) { mgmtObject->inc_acquires(count); @@ -752,7 +778,7 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty, const qpid::types::Variant::Map *filter) { std::auto_ptr mf(MessageFilter::create(filter)); - return remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&moveTo, destq, _1), CONSUMER/*?*/); + return remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&moveTo, destq, _1), CONSUMER/*?*/, settings.autodelete); } void Queue::push(Message& message, bool /*isRecovery*/) @@ -779,15 +805,41 @@ uint32_t Queue::getMessageCount() const uint32_t Queue::getConsumerCount() const { Mutex::ScopedLock locker(messageLock); - return consumerCount; + return users.getSubscriberCount(); } bool Queue::canAutoDelete() const { Mutex::ScopedLock locker(messageLock); - return settings.autodelete && !consumerCount && !owner; + return !deleted && checkAutoDelete(locker); +} + +bool Queue::checkAutoDelete(const Mutex::ScopedLock& lock) const +{ + if (settings.autodelete) { + switch (settings.lifetime) { + case QueueSettings::DELETE_IF_UNUSED: + return isUnused(lock); + case QueueSettings::DELETE_IF_EMPTY: + return !users.isInUseByController() && isEmpty(lock); + case QueueSettings::DELETE_IF_UNUSED_AND_EMPTY: + return isUnused(lock) && isEmpty(lock); + case QueueSettings::DELETE_ON_CLOSE: + return !users.isInUseByController(); + } + } + return false; +} + +bool Queue::isUnused(const Mutex::ScopedLock&) const +{ + return !owner && !users.isUsed();; } +bool Queue::isEmpty(const Mutex::ScopedLock&) const +{ + return current.getCount() == 0; +} /* * return true if enqueue succeeded and message should be made * available; returning false will result in the message being dropped @@ -853,8 +905,9 @@ void Queue::dequeueCommited(const Message& msg) //store and will not be available for delivery. The only action //required is to ensure the observers are notified and the //management stats are correctly decremented + ScopedAutoDelete autodelete(*this); Mutex::ScopedLock locker(messageLock); - observeDequeue(msg, locker); + observeDequeue(msg, locker, settings.autodelete ? &autodelete : 0); if (mgmtObject != 0) { mgmtObject->inc_msgTxnDequeues(); mgmtObject->inc_byteTxnDequeues(msg.getContentSize()); @@ -874,6 +927,7 @@ void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor) { ScopedUse u(barrier); if (!u.acquired) return; + ScopedAutoDelete autodelete(*this); boost::intrusive_ptr pmsg; { Mutex::ScopedLock locker(messageLock); @@ -881,7 +935,7 @@ void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor) if (msg) { if (msg->isPersistent()) pmsg = msg->getPersistentContext(); if (!ctxt) { - observeDequeue(*msg, locker); + observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0); messages->deleted(cursor);//message pointer not valid after this } } else { @@ -895,11 +949,12 @@ void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor) void Queue::dequeueCommitted(const QueueCursor& cursor) { + ScopedAutoDelete autodelete(*this); Mutex::ScopedLock locker(messageLock); Message* msg = messages->find(cursor); if (msg) { const uint64_t contentSize = msg->getContentSize(); - observeDequeue(*msg, locker); + observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0); if (mgmtObject != 0) { mgmtObject->inc_msgTxnDequeues(); mgmtObject->inc_byteTxnDequeues(contentSize); @@ -920,7 +975,7 @@ void Queue::dequeueCommitted(const QueueCursor& cursor) * Updates policy and management when a message has been dequeued, * Requires messageLock be held by caller. */ -void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock&) +void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock& lock, ScopedAutoDelete* autodelete) { current -= QueueDepth(1, msg.getContentSize()); mgntDeqStats(msg, mgmtObject, brokerMgmtObject); @@ -931,6 +986,7 @@ void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock&) QPID_LOG(warning, "Exception on notification of dequeue for queue " << getName() << ": " << e.what()); } } + if (autodelete && isEmpty(lock)) autodelete->check(lock); } /** updates queue observers when a message has become unavailable for transfer. @@ -1053,9 +1109,10 @@ void Queue::abandoned(const Message& message) void Queue::destroyed() { unbind(broker->getExchanges()); - remove(0, 0, boost::bind(&Queue::abandoned, this, _1), REPLICATOR/*even acquired message are treated as abandoned*/); + remove(0, 0, boost::bind(&Queue::abandoned, this, _1), REPLICATOR/*even acquired message are treated as abandoned*/, false); if (alternateExchange.get()) { alternateExchange->decAlternateUsers(); + alternateExchange.reset(); } if (store) { @@ -1170,33 +1227,12 @@ boost::shared_ptr Queue::getAlternateExchange() return alternateExchange; } -void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId) -{ - if (broker.getQueues().destroyIf(queue->getName(), - boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { - if (broker.getAcl()) - broker.getAcl()->recordDestroyQueue(queue->getName()); - - QPID_LOG_CAT(debug, model, "Auto-delete queue: " << queue->getName() - << " user:" << userId - << " rhost:" << connectionId ); - queue->destroyed(); - } else { - QPID_LOG_CAT(debug, model, "Auto-delete queue could not be deleted: " << queue->getName() - << " user:" << userId - << " rhost:" << connectionId ); - } -} - struct AutoDeleteTask : qpid::sys::TimerTask { - Broker& broker; Queue::shared_ptr queue; - std::string connectionId; - std::string userId; - AutoDeleteTask(Broker& b, Queue::shared_ptr q, const std::string& cId, const std::string& uId, AbsTime fireTime) - : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q), connectionId(cId), userId(uId) {} + AutoDeleteTask(Queue::shared_ptr q, AbsTime fireTime) + : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), queue(q) {} void fire() { @@ -1204,34 +1240,56 @@ struct AutoDeleteTask : qpid::sys::TimerTask //created, but then became unused again before the task fired; //in this case ignore this request as there will have already //been a later task added - tryAutoDeleteImpl(broker, queue, connectionId, userId); + queue->tryAutoDelete(); } }; -void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId) +void Queue::scheduleAutoDelete() { - if (queue->settings.autoDeleteDelay && queue->canAutoDelete()) { - AbsTime time(now(), Duration(queue->settings.autoDeleteDelay * TIME_SEC)); - queue->autoDeleteTask = boost::intrusive_ptr(new AutoDeleteTask(broker, queue, connectionId, userId, time)); - broker.getTimer().add(queue->autoDeleteTask); - QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated"); + if (canAutoDelete()) { + if (settings.autoDeleteDelay) { + AbsTime time(now(), Duration(settings.autoDeleteDelay * TIME_SEC)); + autoDeleteTask = boost::intrusive_ptr(new AutoDeleteTask(shared_from_this(), time)); + broker->getTimer().add(autoDeleteTask); + QPID_LOG(debug, "Timed auto-delete for " << getName() << " initiated"); + } else { + tryAutoDelete(); + } + } +} + +void Queue::tryAutoDelete() +{ + if (broker->getQueues().destroyIf(name, boost::bind(boost::mem_fn(&Queue::canAutoDelete), shared_from_this()))) { + if (broker->getAcl()) + broker->getAcl()->recordDestroyQueue(name); + + QPID_LOG_CAT(debug, model, "Auto-delete queue deleted: " << name << " (" << deleted << ")"); + destroyed(); } else { - tryAutoDeleteImpl(broker, queue, connectionId, userId); + QPID_LOG_CAT(debug, model, "Auto-delete queue could not be deleted: " << name); } } bool Queue::isExclusiveOwner(const OwnershipToken* const o) const { - Mutex::ScopedLock locker(ownershipLock); + Mutex::ScopedLock locker(messageLock); return o == owner; } void Queue::releaseExclusiveOwnership() { - Mutex::ScopedLock locker(ownershipLock); - owner = 0; - if (mgmtObject) { - mgmtObject->set_exclusive(false); + bool unused; + { + Mutex::ScopedLock locker(messageLock); + owner = 0; + if (mgmtObject) { + mgmtObject->set_exclusive(false); + } + unused = !users.isUsed(); + } + if (unused && settings.autodelete) { + scheduleAutoDelete(); } } @@ -1241,7 +1299,7 @@ bool Queue::setExclusiveOwner(const OwnershipToken* const o) if (settings.autoDeleteDelay && autoDeleteTask) { autoDeleteTask->cancel(); } - Mutex::ScopedLock locker(ownershipLock); + Mutex::ScopedLock locker(messageLock); if (owner) { return false; } else { @@ -1255,7 +1313,7 @@ bool Queue::setExclusiveOwner(const OwnershipToken* const o) bool Queue::hasExclusiveOwner() const { - Mutex::ScopedLock locker(ownershipLock); + Mutex::ScopedLock locker(messageLock); return owner != 0; } @@ -1388,7 +1446,7 @@ struct After { void Queue::setPosition(SequenceNumber n) { Mutex::ScopedLock locker(messageLock); if (n < sequence) { - remove(0, After(n), MessagePredicate(), BROWSER); + remove(0, After(n), MessagePredicate(), BROWSER, false); } sequence = n; QPID_LOG(debug, "Set position to " << sequence << " on " << getName()); @@ -1450,6 +1508,12 @@ bool Queue::checkNotDeleted(const Consumer::shared_ptr& c) return !deleted; } +bool Queue::isDeleted() const +{ + Mutex::ScopedLock lock(messageLock); + return deleted; +} + void Queue::addObserver(boost::shared_ptr observer) { Mutex::ScopedLock lock(messageLock); @@ -1495,6 +1559,7 @@ void Queue::setDequeueSincePurge(uint32_t value) { void Queue::reject(const QueueCursor& cursor) { + ScopedAutoDelete autodelete(*this); Exchange::shared_ptr alternate = getAlternateExchange(); Message copy; boost::intrusive_ptr pmsg; @@ -1505,7 +1570,7 @@ void Queue::reject(const QueueCursor& cursor) if (alternate) copy = *message; if (message->isPersistent()) pmsg = message->getPersistentContext(); countRejected(); - observeDequeue(*message, locker); + observeDequeue(*message, locker, settings.autodelete ? &autodelete : 0); messages->deleted(cursor); } else { return; @@ -1526,7 +1591,7 @@ void Queue::reject(const QueueCursor& cursor) bool Queue::checkDepth(const QueueDepth& increment, const Message&) { - if (current && (settings.maxDepth - current < increment)) { + if (settings.maxDepth && (settings.maxDepth - current < increment)) { if (mgmtObject) { mgmtObject->inc_discardsOverflow(); if (brokerMgmtObject) @@ -1619,6 +1684,29 @@ void Queue::setMgmtRedirectState( std::string peer, bool enabled, bool isSrc ) { mgmtObject->set_redirectSource(isSrc); } } +Queue::QueueUsers::QueueUsers() : consumers(0), browsers(0), others(0), controller(false) {} +void Queue::QueueUsers::addConsumer() { ++consumers; } +void Queue::QueueUsers::addBrowser() { ++browsers; } +void Queue::QueueUsers::addLifecycleController() { assert(!controller); controller = true; } +void Queue::QueueUsers::addOther(){ ++others; } +void Queue::QueueUsers::removeConsumer() { assert(consumers > 0); --consumers; } +void Queue::QueueUsers::removeBrowser() { assert(browsers > 0); --browsers; } +void Queue::QueueUsers::removeLifecycleController() { assert(controller); controller = false; } +void Queue::QueueUsers::removeOther() { assert(others > 0); --others; } +bool Queue::QueueUsers::isInUseByController() const { return controller; } +bool Queue::QueueUsers::isUsed() const { return controller || consumers || browsers || others; } +uint32_t Queue::QueueUsers::getSubscriberCount() const { return consumers + browsers; } +bool Queue::QueueUsers::hasConsumers() const { return consumers; } + +Queue::ScopedAutoDelete::ScopedAutoDelete(Queue& q) : queue(q), eligible(false) {} +void Queue::ScopedAutoDelete::check(const sys::Mutex::ScopedLock& lock) +{ + eligible = queue.checkAutoDelete(lock); +} +Queue::ScopedAutoDelete::~ScopedAutoDelete() +{ + if (eligible) queue.scheduleAutoDelete(); +} }} diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 29b711075a..5598ee5d13 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -87,6 +87,7 @@ class Queue : public boost::enable_shared_from_this, typedef boost::shared_ptr shared_ptr; protected: + friend struct AutoDeleteTask; struct UsageBarrier { Queue& parent; @@ -119,6 +120,54 @@ class Queue : public boost::enable_shared_from_this, void rollback() throw(); }; + /** + * This class tracks whether a queue is in use and how it is being + * used. + */ + class QueueUsers + { + public: + QueueUsers(); + void addConsumer(); + void addBrowser(); + void addOther(); + void removeConsumer(); + void removeBrowser(); + void addLifecycleController(); + void removeLifecycleController(); + void removeOther(); + bool isUsed() const; + uint32_t getSubscriberCount() const; + bool hasConsumers() const; + bool isInUseByController() const; + private: + uint32_t consumers; + uint32_t browsers; + uint32_t others; + bool controller; + }; + + /** + * This class is used to check - and if necessary trigger - + * autodeletion when removing messages, as this could cause the + * queue to become empty (which is one possible trigger for + * autodeletion). + * + * The constructor and descructor should be called outside the + * message lock. The check method should be called while holding + * the message lock. + */ + class ScopedAutoDelete + { + public: + ScopedAutoDelete(Queue& q); + void check(const sys::Mutex::ScopedLock& lock); + ~ScopedAutoDelete(); + private: + Queue& queue; + bool eligible; + }; + typedef std::set< boost::shared_ptr > Observers; enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2}; typedef boost::function1 MessageFunctor; @@ -126,8 +175,7 @@ class Queue : public boost::enable_shared_from_this, const std::string name; MessageStore* store; const OwnershipToken* owner; - uint32_t consumerCount; // Actually a count of all subscriptions, acquiring or not. - uint32_t browserCount; // Count of non-acquiring subscriptions. + QueueUsers users; OwnershipToken* exclusive; std::vector traceExclude; QueueListeners listeners; @@ -149,7 +197,6 @@ class Queue : public boost::enable_shared_from_this, * o Queue::UsageBarrier (TBD: move under separate lock) */ mutable qpid::sys::Mutex messageLock; - mutable qpid::sys::Mutex ownershipLock; mutable uint64_t persistenceId; QueueSettings settings; qpid::framing::FieldTable encodableSettings; @@ -176,6 +223,9 @@ class Queue : public boost::enable_shared_from_this, Queue::shared_ptr redirectPeer; bool redirectSource; + bool checkAutoDelete(const qpid::sys::Mutex::ScopedLock&) const; + bool isUnused(const qpid::sys::Mutex::ScopedLock&) const; + bool isEmpty(const qpid::sys::Mutex::ScopedLock&) const; virtual void push(Message& msg, bool isRecovery=false); bool accept(const Message&); void process(Message& msg); @@ -191,7 +241,7 @@ class Queue : public boost::enable_shared_from_this, void observeEnqueue(const Message& msg, const sys::Mutex::ScopedLock& lock); void observeAcquire(const Message& msg, const sys::Mutex::ScopedLock& lock); void observeRequeue(const Message& msg, const sys::Mutex::ScopedLock& lock); - void observeDequeue(const Message& msg, const sys::Mutex::ScopedLock& lock); + void observeDequeue(const Message& msg, const sys::Mutex::ScopedLock& lock, ScopedAutoDelete*); void observeConsumerAdd( const Consumer&, const sys::Mutex::ScopedLock& lock); void observeConsumerRemove( const Consumer&, const sys::Mutex::ScopedLock& lock); @@ -203,9 +253,9 @@ class Queue : public boost::enable_shared_from_this, void abandoned(const Message& message); bool checkNotDeleted(const Consumer::shared_ptr&); void notifyDeleted(); - uint32_t remove(uint32_t maxCount, MessagePredicate, MessageFunctor, SubscriptionType); + uint32_t remove(uint32_t maxCount, MessagePredicate, MessageFunctor, SubscriptionType, bool triggerAutoDelete); virtual bool checkDepth(const QueueDepth& increment, const Message&); - + void tryAutoDelete(); public: typedef std::vector vector; @@ -284,6 +334,14 @@ class Queue : public boost::enable_shared_from_this, QPID_BROKER_EXTERN void consume(Consumer::shared_ptr c, bool exclusive = false); QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c); + /** + * Used to indicate that the queue is being used in some other + * context than by a subscriber. The controlling flag should only + * be set if the mode of use is the one that caused the queue to + * be created. + */ + QPID_BROKER_EXTERN void markInUse(bool controlling=false); + QPID_BROKER_EXTERN void releaseFromUse(bool controlling=false); QPID_BROKER_EXTERN uint32_t purge(const uint32_t purge_request=0, //defaults to all messages boost::shared_ptr dest=boost::shared_ptr(), @@ -308,6 +366,8 @@ class Queue : public boost::enable_shared_from_this, inline const qpid::framing::FieldTable& getEncodableSettings() const { return encodableSettings; } inline bool isAutoDelete() const { return settings.autodelete; } QPID_BROKER_EXTERN bool canAutoDelete() const; + QPID_BROKER_EXTERN void scheduleAutoDelete(); + QPID_BROKER_EXTERN bool isDeleted() const; const QueueBindings& getBindings() const { return bindings; } /** @@ -340,7 +400,6 @@ class Queue : public boost::enable_shared_from_this, * exclusive owner */ static Queue::shared_ptr restore(QueueRegistry& queues, framing::Buffer& buffer); - QPID_BROKER_EXTERN static void tryAutoDelete(Broker& broker, Queue::shared_ptr, const std::string& connectionId, const std::string& userId); virtual void setExternalQueueStore(ExternalQueueStore* inst); diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.cpp b/qpid/cpp/src/qpid/broker/QueueSettings.cpp index 5a149c1c6d..fd90d11d76 100644 --- a/qpid/cpp/src/qpid/broker/QueueSettings.cpp +++ b/qpid/cpp/src/qpid/broker/QueueSettings.cpp @@ -58,6 +58,9 @@ const std::string PAGING("qpid.paging"); const std::string MAX_PAGES("qpid.max_pages_loaded"); const std::string PAGE_FACTOR("qpid.page_factor"); const std::string FILTER("qpid.filter"); +const std::string LIFETIME_POLICY("qpid.lifetime-policy"); +const std::string DELETE_IF_UNUSED_KEY("delete-if-unused"); +const std::string DELETE_IF_UNUSED_AND_EMPTY_KEY("delete-if-unused-and-empty"); const std::string LVQ_LEGACY("qpid.last_value_queue"); const std::string LVQ_LEGACY_KEY("qpid.LVQ_key"); @@ -86,6 +89,7 @@ const QueueSettings::Aliases QueueSettings::aliases; QueueSettings::QueueSettings(bool d, bool a) : durable(d), autodelete(a), + lifetime(DELETE_IF_UNUSED), isTemporary(false), priorities(0), defaultFairshare(0), @@ -214,6 +218,15 @@ bool QueueSettings::handle(const std::string& key, const qpid::types::Variant& v } else if (key == FILTER) { filter = value.asString(); return true; + } else if (key == LIFETIME_POLICY) { + if (value.asString() == DELETE_IF_UNUSED_KEY) { + lifetime = DELETE_IF_UNUSED; + } else if (value.asString() == DELETE_IF_UNUSED_AND_EMPTY_KEY) { + lifetime = DELETE_IF_UNUSED_AND_EMPTY; + } else { + QPID_LOG(warning, "Invalid value for " << LIFETIME_POLICY << ": " << value); + } + return true; } else { return false; } diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.h b/qpid/cpp/src/qpid/broker/QueueSettings.h index 19667e93ae..166445be18 100644 --- a/qpid/cpp/src/qpid/broker/QueueSettings.h +++ b/qpid/cpp/src/qpid/broker/QueueSettings.h @@ -40,9 +40,21 @@ namespace broker { struct QueueSettings { QPID_BROKER_EXTERN QueueSettings(bool durable=false, bool autodelete=false); + /** + * The lifetime policy dictates when an autodelete queue is + * eligible for delete. + */ + enum LifetimePolicy + { + DELETE_IF_UNUSED = 0, + DELETE_IF_EMPTY, + DELETE_IF_UNUSED_AND_EMPTY, + DELETE_ON_CLOSE + }; bool durable; bool autodelete; + LifetimePolicy lifetime; bool isTemporary; //basic queue types: diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index dd7a25aaa4..4570e3bd87 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -449,9 +449,6 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c) Queue::shared_ptr queue = c->getQueue(); if(queue) { queue->cancel(c); - // Only run auto-delete for counted consumers. - if (c->isCounted() && queue->canAutoDelete() && !queue->hasExclusiveOwner()) - Queue::tryAutoDelete(session.getBroker(), queue, connectionId, userID); } c->cancel(); } diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index 2d4868628f..0124e88832 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -227,9 +227,6 @@ void SessionAdapter::QueueHandlerImpl::destroyExclusiveQueues() while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); - if (q->canAutoDelete()) { - Queue::tryAutoDelete(broker, q, connectionId, userId); - } exclusiveQueues.erase(exclusiveQueues.begin()); } } diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp index fa0d719bf9..2cb0994138 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp @@ -44,7 +44,7 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, Broker : BrokerContext(b), ManagedConnection(getBroker(), i), connection(pn_connection()), transport(pn_transport()), - out(o), id(i), haveOutput(true) + out(o), id(i), haveOutput(true), closeInitiated(false) { if (pn_transport_bind(transport, connection)) { //error @@ -130,9 +130,6 @@ size_t Connection::encode(char* buffer, size_t size) QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size) haveOutput = true; return n; - } else if (n == PN_EOS) { - haveOutput = size; - return size;//Is this right? } else if (n == PN_ERR) { throw Exception(qpid::amqp::error_conditions::INTERNAL_ERROR, QPID_MSG("Error on output: " << getError())); } else { @@ -142,23 +139,29 @@ size_t Connection::encode(char* buffer, size_t size) } bool Connection::canEncode() { - try { - for (Sessions::iterator i = sessions.begin();i != sessions.end(); ++i) { - if (i->second->dispatch()) haveOutput = true; + if (!closeInitiated) { + try { + for (Sessions::iterator i = sessions.begin();i != sessions.end(); ++i) { + if (i->second->dispatch()) haveOutput = true; + } + process(); + } catch (const Exception& e) { + QPID_LOG(error, id << ": " << e.what()); + pn_condition_t* error = pn_connection_condition(connection); + pn_condition_set_name(error, e.symbol()); + pn_condition_set_description(error, e.what()); + close(); + haveOutput = true; + } catch (const std::exception& e) { + QPID_LOG(error, id << ": " << e.what()); + pn_condition_t* error = pn_connection_condition(connection); + pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str()); + pn_condition_set_description(error, e.what()); + close(); + haveOutput = true; } - process(); - } catch (const Exception& e) { - QPID_LOG(error, id << ": " << e.what()); - pn_condition_t* error = pn_connection_condition(connection); - pn_condition_set_name(error, e.symbol()); - pn_condition_set_description(error, e.what()); - close(); - } catch (const std::exception& e) { - QPID_LOG(error, id << ": " << e.what()); - pn_condition_t* error = pn_connection_condition(connection); - pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str()); - pn_condition_set_description(error, e.what()); - close(); + } else { + QPID_LOG(info, "Connection " << id << " has been closed locally"); } //TODO: proper handling of time in and out of tick pn_transport_tick(transport, 0); @@ -195,9 +198,12 @@ void Connection::closed() } void Connection::close() { - closed(); - QPID_LOG_CAT(debug, model, id << " connection closed"); - pn_connection_close(connection); + if (!closeInitiated) { + closeInitiated = true; + closed(); + QPID_LOG_CAT(debug, model, id << " connection closed"); + pn_connection_close(connection); + } } bool Connection::isClosed() const { @@ -256,29 +262,29 @@ void Connection::process() //handle deliveries for (pn_delivery_t* delivery = pn_work_head(connection); delivery; delivery = pn_work_next(delivery)) { pn_link_t* link = pn_delivery_link(delivery); - if (pn_link_is_receiver(link)) { - Sessions::iterator i = sessions.find(pn_link_session(link)); - if (i != sessions.end()) { - try { + try { + if (pn_link_is_receiver(link)) { + Sessions::iterator i = sessions.find(pn_link_session(link)); + if (i != sessions.end()) { i->second->readable(link, delivery); - } catch (const Exception& e) { - QPID_LOG_CAT(error, protocol, "Error on publish: " << e.what()); - pn_condition_t* error = pn_link_condition(link); - pn_condition_set_name(error, e.symbol()); - pn_condition_set_description(error, e.what()); - pn_link_close(link); + } else { + pn_delivery_update(delivery, PN_REJECTED); + } + } else { //i.e. SENDER + Sessions::iterator i = sessions.find(pn_link_session(link)); + if (i != sessions.end()) { + QPID_LOG(trace, id << " handling outgoing delivery for " << link << " on session " << pn_link_session(link)); + i->second->writable(link, delivery); + } else { + QPID_LOG(error, id << " Got delivery for non-existent session: " << pn_link_session(link) << ", link: " << link); } - } else { - pn_delivery_update(delivery, PN_REJECTED); - } - } else { //i.e. SENDER - Sessions::iterator i = sessions.find(pn_link_session(link)); - if (i != sessions.end()) { - QPID_LOG(trace, id << " handling outgoing delivery for " << link << " on session " << pn_link_session(link)); - i->second->writable(link, delivery); - } else { - QPID_LOG(error, id << " Got delivery for non-existent session: " << pn_link_session(link) << ", link: " << link); } + } catch (const Exception& e) { + QPID_LOG_CAT(error, protocol, "Error processing deliveries: " << e.what()); + pn_condition_t* error = pn_link_condition(link); + pn_condition_set_name(error, e.symbol()); + pn_condition_set_description(error, e.what()); + pn_link_close(link); } } diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h index 1384e3560d..e1ae34f899 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.h +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h @@ -67,6 +67,7 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man const std::string id; bool haveOutput; Sessions sessions; + bool closeInitiated; virtual void process(); std::string getError(); diff --git a/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp b/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp index 1140032174..957134d0e6 100644 --- a/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp @@ -160,13 +160,15 @@ void DataReader::readArray(pn_data_t* /*data*/, const qpid::amqp::Descriptor* /* void DataReader::readList(pn_data_t* data, const qpid::amqp::Descriptor* descriptor) { size_t count = pn_data_get_list(data); - reader.onStartList(count, qpid::amqp::CharSequence(), descriptor); - pn_data_enter(data); - for (size_t i = 0; i < count && pn_data_next(data); ++i) { - read(data); + bool skip = reader.onStartList(count, qpid::amqp::CharSequence(), descriptor); + if (!skip) { + pn_data_enter(data); + for (size_t i = 0; i < count && pn_data_next(data); ++i) { + read(data); + } + pn_data_exit(data); + reader.onEndList(count, descriptor); } - pn_data_exit(data); - reader.onEndList(count, descriptor); } void DataReader::readMap(pn_data_t* data, const qpid::amqp::Descriptor* descriptor) diff --git a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp index a937c1171e..eb30c78128 100644 --- a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp @@ -20,10 +20,16 @@ */ #include "qpid/broker/amqp/NodeProperties.h" #include "qpid/broker/amqp/DataReader.h" +#include "qpid/broker/QueueSettings.h" #include "qpid/amqp/CharSequence.h" +#include "qpid/amqp/Descriptor.h" +#include "qpid/amqp/descriptors.h" #include "qpid/types/Variant.h" #include "qpid/broker/QueueSettings.h" #include "qpid/log/Statement.h" +extern "C" { +#include +} using qpid::amqp::CharSequence; using qpid::amqp::Descriptor; @@ -36,6 +42,7 @@ namespace { const std::string MOVE("move"); const std::string COPY("copy"); const std::string SUPPORTED_DIST_MODES("supported-dist-modes"); +const std::string LIFETIME_POLICY("lifetime-policy"); //AMQP 0-10 standard parameters: const std::string DURABLE("durable"); @@ -43,9 +50,57 @@ const std::string EXCLUSIVE("exclusive"); const std::string AUTO_DELETE("auto-delete"); const std::string ALTERNATE_EXCHANGE("alternate-exchange"); const std::string EXCHANGE_TYPE("exchange-type"); + +pn_bytes_t convert(const std::string& s) +{ + pn_bytes_t result; + result.start = const_cast(s.data()); + result.size = s.size(); + return result; } -NodeProperties::NodeProperties() : queue(true), durable(false), autoDelete(false), exclusive(false), exchangeType("topic") {} +bool getLifetimePolicy(const Descriptor& d, QueueSettings::LifetimePolicy& policy) +{ + if (d.match(qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL, qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_CODE)) { + policy = QueueSettings::DELETE_ON_CLOSE; + return true; + } else if (d.match(qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL, qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_CODE)) { + policy = QueueSettings::DELETE_IF_UNUSED; + return true; + } else if (d.match(qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL, qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_CODE)) { + policy = QueueSettings::DELETE_IF_EMPTY; + return true; + } else if (d.match(qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL, qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_CODE)) { + policy = QueueSettings::DELETE_IF_UNUSED_AND_EMPTY; + return true; + } else { + return false; + } +} + +bool getLifetimeDescriptorSymbol(QueueSettings::LifetimePolicy policy, pn_bytes_t& out) +{ + switch (policy) { + case QueueSettings::DELETE_ON_CLOSE: + out = convert(qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL); + return true; + case QueueSettings::DELETE_IF_UNUSED: + out = convert(qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL); + return true; + case QueueSettings::DELETE_IF_EMPTY: + out = convert(qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL); + return true; + case QueueSettings::DELETE_IF_UNUSED_AND_EMPTY: + out = convert(qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL); + return true; + default: + return false; + } +} + +} + +NodeProperties::NodeProperties() : queue(true), durable(false), autoDelete(false), exclusive(false), exchangeType("topic"), lifetime(QueueSettings::DELETE_IF_UNUSED) {} void NodeProperties::read(pn_data_t* data) { @@ -53,12 +108,38 @@ void NodeProperties::read(pn_data_t* data) reader.read(data); } -void NodeProperties::process(const std::string& key, const qpid::types::Variant& value) +void NodeProperties::write(pn_data_t* data) +{ + pn_data_put_map(data); + pn_data_enter(data); + pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES)); + pn_data_put_string(data, convert(queue ? MOVE : COPY)); + pn_bytes_t symbol; + if (autoDelete && getLifetimeDescriptorSymbol(lifetime, symbol)) { + pn_data_put_symbol(data, convert(LIFETIME_POLICY)); + pn_data_put_described(data); + pn_data_enter(data); + pn_data_put_symbol(data, symbol); + pn_data_put_list(data); + pn_data_exit(data); + } + pn_data_exit(data); +} + +void NodeProperties::process(const std::string& key, const qpid::types::Variant& value, const Descriptor* d) { - QPID_LOG(notice, "Processing node property " << key << " = " << value); + QPID_LOG(debug, "Processing node property " << key << " = " << value); if (key == SUPPORTED_DIST_MODES) { if (value == MOVE) queue = true; else if (value == COPY) queue = false; + } else if (key == LIFETIME_POLICY) { + if (d) { + if (getLifetimePolicy(*d, lifetime)) { + autoDelete = true; + } else { + QPID_LOG(warning, "Unrecognised lifetime policy: " << *d); + } + } } else if (key == DURABLE) { durable = value; } else if (key == EXCLUSIVE) { @@ -74,84 +155,91 @@ void NodeProperties::process(const std::string& key, const qpid::types::Variant& } } -void NodeProperties::onNullValue(const CharSequence& key, const Descriptor*) +bool NodeProperties::onStartListValue(const qpid::amqp::CharSequence& key, uint32_t count, const qpid::amqp::Descriptor* d) { - process(key.str(), qpid::types::Variant()); + QPID_LOG(debug, "NodeProperties::onStartListValue(" << std::string(key.data, key.size) << ", " << count << ", " << d); + process(key.str(), qpid::types::Variant(), d); + return true; } -void NodeProperties::onBooleanValue(const CharSequence& key, bool value, const Descriptor*) +void NodeProperties::onNullValue(const CharSequence& key, const Descriptor* d) { - process(key.str(), value); + process(key.str(), qpid::types::Variant(), d); } -void NodeProperties::onUByteValue(const CharSequence& key, uint8_t value, const Descriptor*) +void NodeProperties::onBooleanValue(const CharSequence& key, bool value, const Descriptor* d) { - process(key.str(), value); + process(key.str(), value, d); } -void NodeProperties::onUShortValue(const CharSequence& key, uint16_t value, const Descriptor*) +void NodeProperties::onUByteValue(const CharSequence& key, uint8_t value, const Descriptor* d) { - process(key.str(), value); + process(key.str(), value, d); } -void NodeProperties::onUIntValue(const CharSequence& key, uint32_t value, const Descriptor*) +void NodeProperties::onUShortValue(const CharSequence& key, uint16_t value, const Descriptor* d) { - process(key.str(), value); + process(key.str(), value, d); } -void NodeProperties::onULongValue(const CharSequence& key, uint64_t value, const Descriptor*) +void NodeProperties::onUIntValue(const CharSequence& key, uint32_t value, const Descriptor* d) { - process(key.str(), value); + process(key.str(), value, d); } -void NodeProperties::onByteValue(const CharSequence& key, int8_t value, const Descriptor*) +void NodeProperties::onULongValue(const CharSequence& key, uint64_t value, const Descriptor* d) { - process(key.str(), value); + process(key.str(), value, d); } -void NodeProperties::onShortValue(const CharSequence& key, int16_t value, const Descriptor*) +void NodeProperties::onByteValue(const CharSequence& key, int8_t value, const Descriptor* d) { - process(key.str(), value); + process(key.str(), value, d); } -void NodeProperties::onIntValue(const CharSequence& key, int32_t value, const Descriptor*) +void NodeProperties::onShortValue(const CharSequence& key, int16_t value, const Descriptor* d) { - process(key.str(), value); + process(key.str(), value, d); } -void NodeProperties::onLongValue(const CharSequence& key, int64_t value, const Descriptor*) +void NodeProperties::onIntValue(const CharSequence& key, int32_t value, const Descriptor* d) { - process(key.str(), value); + process(key.str(), value, d); } -void NodeProperties::onFloatValue(const CharSequence& key, float value, const Descriptor*) +void NodeProperties::onLongValue(const CharSequence& key, int64_t value, const Descriptor* d) { - process(key.str(), value); + process(key.str(), value, d); } -void NodeProperties::onDoubleValue(const CharSequence& key, double value, const Descriptor*) +void NodeProperties::onFloatValue(const CharSequence& key, float value, const Descriptor* d) { - process(key.str(), value); + process(key.str(), value, d); } -void NodeProperties::onUuidValue(const CharSequence& key, const CharSequence& value, const Descriptor*) +void NodeProperties::onDoubleValue(const CharSequence& key, double value, const Descriptor* d) { - process(key.str(), value.str()); + process(key.str(), value, d); } -void NodeProperties::onTimestampValue(const CharSequence& key, int64_t value, const Descriptor*) +void NodeProperties::onUuidValue(const CharSequence& key, const CharSequence& value, const Descriptor* d) { - process(key.str(), value); + process(key.str(), value.str(), d); } -void NodeProperties::onStringValue(const CharSequence& key, const CharSequence& value, const Descriptor*) +void NodeProperties::onTimestampValue(const CharSequence& key, int64_t value, const Descriptor* d) { - process(key.str(), value.str()); + process(key.str(), value, d); } -void NodeProperties::onSymbolValue(const CharSequence& key, const CharSequence& value, const Descriptor*) +void NodeProperties::onStringValue(const CharSequence& key, const CharSequence& value, const Descriptor* d) { - process(key.str(), value.str()); + process(key.str(), value.str(), d); +} + +void NodeProperties::onSymbolValue(const CharSequence& key, const CharSequence& value, const Descriptor* d) +{ + process(key.str(), value.str(), d); } QueueSettings NodeProperties::getQueueSettings() @@ -159,6 +247,7 @@ QueueSettings NodeProperties::getQueueSettings() QueueSettings settings(durable, autoDelete); qpid::types::Variant::Map unused; settings.populate(properties, unused); + settings.lifetime = lifetime; return settings; } @@ -183,4 +272,9 @@ std::string NodeProperties::getAlternateExchange() const return alternateExchange; } +bool NodeProperties::trackControllingLink() const +{ + return lifetime == QueueSettings::DELETE_ON_CLOSE || lifetime == QueueSettings::DELETE_IF_EMPTY; +} + }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h index 881fc4e30f..03780c10a9 100644 --- a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h +++ b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h @@ -23,6 +23,7 @@ */ #include "qpid/amqp/MapReader.h" #include "qpid/types/Variant.h" +#include "qpid/broker/QueueSettings.h" struct pn_data_t; namespace qpid { @@ -35,6 +36,7 @@ class NodeProperties : public qpid::amqp::MapReader public: NodeProperties(); void read(pn_data_t*); + void write(pn_data_t*); void onNullValue(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*); void onBooleanValue(const qpid::amqp::CharSequence&, bool, const qpid::amqp::Descriptor*); void onUByteValue(const qpid::amqp::CharSequence&, uint8_t, const qpid::amqp::Descriptor*); @@ -51,12 +53,14 @@ class NodeProperties : public qpid::amqp::MapReader void onTimestampValue(const qpid::amqp::CharSequence&, int64_t, const qpid::amqp::Descriptor*); void onStringValue(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*); void onSymbolValue(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*); + bool onStartListValue(const qpid::amqp::CharSequence&, uint32_t count, const qpid::amqp::Descriptor*); bool isQueue() const; QueueSettings getQueueSettings(); bool isDurable() const; bool isExclusive() const; std::string getExchangeType() const; std::string getAlternateExchange() const; + bool trackControllingLink() const; private: bool queue; bool durable; @@ -65,8 +69,9 @@ class NodeProperties : public qpid::amqp::MapReader std::string exchangeType; std::string alternateExchange; qpid::types::Variant::Map properties; + QueueSettings::LifetimePolicy lifetime; - void process(const std::string&, const qpid::types::Variant&); + void process(const std::string&, const qpid::types::Variant&, const qpid::amqp::Descriptor*); }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index 7dbafb2fd1..68ff979aa4 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -19,6 +19,7 @@ * */ #include "qpid/broker/amqp/Outgoing.h" +#include "qpid/broker/amqp/Exception.h" #include "qpid/broker/amqp/Header.h" #include "qpid/broker/amqp/Session.h" #include "qpid/broker/amqp/Translation.h" @@ -26,7 +27,9 @@ #include "qpid/broker/Selector.h" #include "qpid/broker/TopicKeyNode.h" #include "qpid/sys/OutputControl.h" +#include "qpid/amqp/descriptors.h" #include "qpid/amqp/MessageEncoder.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" namespace qpid { @@ -41,10 +44,11 @@ void Outgoing::wakeup() session.wakeup(); } -OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool e) +OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool e, bool p) : Outgoing(broker, session, source, target, pn_link_name(l)), Consumer(pn_link_name(l), /*FIXME*/CONSUMER), exclusive(e), + isControllingUser(p), queue(q), deliveries(5000), link(l), out(o), current(0), outstanding(0), buffer(1024)/*used only for header at present*/ @@ -52,6 +56,7 @@ OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, for (size_t i = 0 ; i < deliveries.capacity(); ++i) { deliveries[i].init(i); } + if (isControllingUser) queue->markInUse(true); } void OutgoingFromQueue::init() @@ -63,11 +68,15 @@ bool OutgoingFromQueue::doWork() { QPID_LOG(trace, "Dispatching to " << getName() << ": " << pn_link_credit(link)); if (canDeliver()) { - if (queue->dispatch(shared_from_this())) { - return true; - } else { - pn_link_drained(link); - QPID_LOG(debug, "No message available on " << queue->getName()); + try{ + if (queue->dispatch(shared_from_this())) { + return true; + } else { + pn_link_drained(link); + QPID_LOG(debug, "No message available on " << queue->getName()); + } + } catch (const qpid::framing::ResourceDeletedException& e) { + throw Exception(qpid::amqp::error_conditions::RESOURCE_DELETED, e.what()); } } else { QPID_LOG(debug, "Can't deliver to " << getName() << " from " << queue->getName() << ": " << pn_link_credit(link)); @@ -142,14 +151,14 @@ bool OutgoingFromQueue::canDeliver() void OutgoingFromQueue::detached() { - QPID_LOG(debug, "Detaching outgoing link from " << queue->getName()); + QPID_LOG(debug, "Detaching outgoing link " << getName() << " from " << queue->getName()); queue->cancel(shared_from_this()); //TODO: release in a clearer order? for (size_t i = 0 ; i < deliveries.capacity(); ++i) { if (deliveries[i].msg) queue->release(deliveries[i].cursor, true); } if (exclusive) queue->releaseExclusiveOwnership(); - Queue::tryAutoDelete(*queue->getBroker(), queue, "", ""); + else if (isControllingUser) queue->releaseFromUse(true); } //Consumer interface: diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h index a63a8cc0a6..86d7d46111 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h @@ -88,7 +88,7 @@ class Outgoing : public ManagedOutgoingLink class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::enable_shared_from_this { public: - OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, bool exclusive); + OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, bool exclusive, bool isControllingUser); void setSubjectFilter(const std::string&); void setSelectorFilter(const std::string&); void init(); @@ -124,6 +124,7 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public }; const bool exclusive; + const bool isControllingUser; boost::shared_ptr queue; CircularArray deliveries; pn_link_t* link; diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index ddfbc7de52..e6ea694d54 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -24,9 +24,11 @@ #include "Message.h" #include "Connection.h" #include "Domain.h" +#include "Exception.h" #include "Interconnects.h" #include "Relay.h" #include "Topic.h" +#include "qpid/amqp/descriptors.h" #include "qpid/broker/Broker.h" #include "qpid/broker/DeliverableMessage.h" #include "qpid/broker/Exchange.h" @@ -112,10 +114,16 @@ void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr class IncomingToQueue : public DecodingIncoming { public: - IncomingToQueue(Broker& b, Session& p, boost::shared_ptr q, pn_link_t* l, const std::string& source) : DecodingIncoming(l, b, p, source, q->getName(), pn_link_name(l)), queue(q) {} + IncomingToQueue(Broker& b, Session& p, boost::shared_ptr q, pn_link_t* l, const std::string& source, bool icl) + : DecodingIncoming(l, b, p, source, q->getName(), pn_link_name(l)), queue(q), isControllingLink(icl) + { + queue->markInUse(isControllingLink); + } + ~IncomingToQueue() { queue->releaseFromUse(isControllingLink); } void handle(qpid::broker::Message& m); private: boost::shared_ptr queue; + bool isControllingLink; }; class IncomingToExchange : public DecodingIncoming @@ -152,6 +160,7 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te node.exchange = connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.getAlternateExchange(), args, connection.getUserId(), connection.getId()).first; } + node.created = true; } else { size_t i = name.find('@'); if (i != std::string::npos && (i+1) < name.length()) { @@ -188,7 +197,12 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te std::string Session::generateName(pn_link_t* link) { std::stringstream s; - s << qpid::types::Uuid(true) << "::" << pn_link_name(link); + if (connection.getContainerId().empty()) { + s << qpid::types::Uuid(true); + } else { + s << connection.getContainerId(); + } + s << "_" << pn_link_name(link); return s.str(); } @@ -210,7 +224,7 @@ void Session::attach(pn_link_t* link) //i.e a subscription std::string name; if (pn_terminus_get_type(source) == PN_UNSPECIFIED) { - throw qpid::Exception("No source specified!");/*invalid-field?*/ + throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "No source specified!"); } else if (pn_terminus_is_dynamic(source)) { name = generateName(link); QPID_LOG(debug, "Received attach request for outgoing link from " << name); @@ -226,7 +240,7 @@ void Session::attach(pn_link_t* link) pn_terminus_t* target = pn_link_remote_target(link); std::string name; if (pn_terminus_get_type(target) == PN_UNSPECIFIED) { - throw qpid::Exception("No target specified!");/*invalid field?*/ + throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "No target specified!"); } else if (pn_terminus_is_dynamic(target)) { name = generateName(link); QPID_LOG(debug, "Received attach request for incoming link to " << name); @@ -252,6 +266,9 @@ void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::s setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.exchange); authorise.incoming(node.exchange); } + if (node.created) { + node.properties.write(pn_terminus_properties(pn_link_target(link))); + } const char* sourceAddress = pn_terminus_get_address(pn_link_remote_source(link)); if (!sourceAddress) { @@ -262,7 +279,7 @@ void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::s source = sourceAddress; } if (node.queue) { - boost::shared_ptr q(new IncomingToQueue(connection.getBroker(), *this, node.queue, link, source)); + boost::shared_ptr q(new IncomingToQueue(connection.getBroker(), *this, node.queue, link, source, node.properties.trackControllingLink())); incoming[link] = q; } else if (node.exchange) { boost::shared_ptr e(new IncomingToExchange(connection.getBroker(), *this, node.exchange, link, source)); @@ -272,7 +289,7 @@ void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::s incoming[link] = in; } else { pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED); - throw qpid::Exception("Node not found: " + name);/*not-found*/ + throw Exception(qpid::amqp::error_conditions::NOT_FOUND, std::string("Node not found: ") + name); } if (connection.getBroker().getOptions().auth && !connection.isLink()) incoming[link]->verify(connection.getUserId(), connection.getBroker().getOptions().realm); @@ -284,6 +301,9 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s ResolvedNode node = resolve(name, source, false); if (node.queue) setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.queue); else if (node.exchange) setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.exchange); + if (node.created) { + node.properties.write(pn_terminus_properties(pn_link_source(link))); + } Filter filter; filter.read(pn_terminus_filter(source)); @@ -299,7 +319,7 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s if (node.queue) { authorise.outgoing(node.queue); - boost::shared_ptr q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, false)); + boost::shared_ptr q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, false, node.properties.trackControllingLink())); q->init(); filter.apply(q); outgoing[link] = q; @@ -330,7 +350,7 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s if (!shared) queue->setExclusiveOwner(this); authorise.outgoing(node.exchange, queue, filter); filter.bind(node.exchange, queue); - boost::shared_ptr q(new OutgoingFromQueue(connection.getBroker(), name, target, queue, link, *this, out, !shared)); + boost::shared_ptr q(new OutgoingFromQueue(connection.getBroker(), name, target, queue, link, *this, out, !shared, false)); outgoing[link] = q; q->init(); } else if (node.relay) { @@ -339,7 +359,7 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s out->init(); } else { pn_terminus_set_type(pn_link_source(link), PN_UNSPECIFIED); - throw qpid::Exception("Node not found: " + name);/*not-found*/ + throw Exception(qpid::amqp::error_conditions::NOT_FOUND, std::string("Node not found: ") + name);/*not-found*/ } filter.write(pn_terminus_filter(pn_link_source(link))); QPID_LOG(debug, "Outgoing link attached"); @@ -438,8 +458,19 @@ void Session::writable(pn_link_t* link, pn_delivery_t* delivery) bool Session::dispatch() { bool output(false); - for (OutgoingLinks::iterator s = outgoing.begin(); s != outgoing.end(); ++s) { - if (s->second->doWork()) output = true; + for (OutgoingLinks::iterator s = outgoing.begin(); s != outgoing.end();) { + try { + if (s->second->doWork()) output = true; + ++s; + } catch (const Exception& e) { + pn_condition_t* error = pn_link_condition(s->first); + pn_condition_set_name(error, e.symbol()); + pn_condition_set_description(error, e.what()); + pn_link_close(s->first); + s->second->detached(); + outgoing.erase(s++); + output = true; + } } if (completed.size()) { output = true; @@ -452,8 +483,19 @@ bool Session::dispatch() accepted(*i, true); } } - for (IncomingLinks::iterator i = incoming.begin(); i != incoming.end(); ++i) { - if (i->second->doWork()) output = true; + for (IncomingLinks::iterator i = incoming.begin(); i != incoming.end();) { + try { + if (i->second->doWork()) output = true; + ++i; + } catch (const Exception& e) { + pn_condition_t* error = pn_link_condition(i->first); + pn_condition_set_name(error, e.symbol()); + pn_condition_set_description(error, e.what()); + pn_link_close(i->first); + i->second->detached(); + incoming.erase(i++); + output = true; + } } return output; @@ -470,7 +512,7 @@ void Session::close() } outgoing.clear(); incoming.clear(); - QPID_LOG(debug, "Session closed, all links detached."); + QPID_LOG(debug, "Session " << session << " closed, all links detached."); for (std::set< boost::shared_ptr >::const_iterator i = exclusiveQueues.begin(); i != exclusiveQueues.end(); ++i) { (*i)->releaseExclusiveOwnership(); } @@ -490,6 +532,11 @@ Authorise& Session::getAuthorise() void IncomingToQueue::handle(qpid::broker::Message& message) { + if (queue->isDeleted()) { + std::stringstream msg; + msg << " Queue " << queue->getName() << " has been deleted"; + throw Exception(qpid::amqp::error_conditions::RESOURCE_DELETED, msg.str()); + } queue->deliver(message); } diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.h b/qpid/cpp/src/qpid/broker/amqp/Session.h index a991ac9e3e..b94d3c226d 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.h +++ b/qpid/cpp/src/qpid/broker/amqp/Session.h @@ -100,6 +100,8 @@ class Session : public ManagedSession, public boost::enable_shared_from_this topic; boost::shared_ptr relay; NodeProperties properties; + bool created; + ResolvedNode() : created(false) {} }; ResolvedNode resolve(const std::string name, pn_terminus_t* terminus, bool incoming); diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 42ced2988d..17b00185ef 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -871,7 +871,8 @@ void BrokerReplicator::autoDeleteCheck(boost::shared_ptr ex) { if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) { if (qr->getQueue()->getSettings().autoDeleteDelay) { // Start the auto-delete timer - Queue::tryAutoDelete(broker, qr->getQueue(), remoteHost, userId); + qr->getQueue()->releaseFromUse(); + qr->getQueue()->scheduleAutoDelete(); } else { // Delete immediately. Don't purge, the primary is gone so we need diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 4c3c209eab..d99602fdda 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -121,6 +121,7 @@ QueueReplicator::QueueReplicator(HaBroker& hb, framing::FieldTable args = getArgs(); args.setString(QPID_REPLICATE, printable(NONE).str()); setArgs(args); + if (q->isAutoDelete()) q->markInUse(); } // This must be called immediately after the constructor. diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp index 829459eda6..9ecb46d872 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -75,6 +75,12 @@ const std::string MOVE("move"); const std::string COPY("copy"); const std::string SUPPORTED_DIST_MODES("supported-dist-modes"); +const std::string AUTO_DELETE("auto-delete"); +const std::string LIFETIME_POLICY("lifetime-policy"); +const std::string DELETE_ON_CLOSE("delete-on-close"); +const std::string DELETE_IF_UNUSED("delete-if-unused"); +const std::string DELETE_IF_EMPTY("delete-if-empty"); +const std::string DELETE_IF_UNUSED_AND_EMPTY("delete-if-unused-and-empty"); const std::string CREATE_ON_DEMAND("create-on-demand"); const std::string DUMMY("."); @@ -308,6 +314,10 @@ AddressHelper::AddressHelper(const Address& address) : add(properties, x_declare); node.erase(i); } + //for temp queues, if neither lifetime-policy nor autodelete are specified, assume delete-on-close + if (isTemporary && properties.find(LIFETIME_POLICY) == properties.end() && properties.find(AUTO_DELETE) == properties.end()) { + properties[LIFETIME_POLICY] = DELETE_ON_CLOSE; + } if (properties.size() && !(isTemporary || createPolicy.size())) { QPID_LOG(warning, "Properties will be ignored! " << address); @@ -559,7 +569,24 @@ std::string AddressHelper::getLinkName(const Address& address) return name.str(); } } - +namespace { +std::string toLifetimePolicy(const std::string& value) +{ + if (value == DELETE_ON_CLOSE) return qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL; + else if (value == DELETE_IF_UNUSED) return qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL; + else if (value == DELETE_IF_EMPTY) return qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL; + else if (value == DELETE_IF_UNUSED_AND_EMPTY) return qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL; + else return value;//asume value is itself the symbolic descriptor +} +void putLifetimePolicy(pn_data_t* data, const std::string& value) +{ + pn_data_put_described(data); + pn_data_enter(data); + pn_data_put_symbol(data, convert(value)); + pn_data_put_list(data); + pn_data_exit(data); +} +} void AddressHelper::setNodeProperties(pn_terminus_t* terminus) { if (properties.size() || type.size()) { @@ -575,8 +602,13 @@ void AddressHelper::setNodeProperties(pn_terminus_t* terminus) pn_data_put_bool(data, true); } for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { - pn_data_put_symbol(data, convert(i->first)); - pn_data_put_string(data, convert(i->second.asString())); + if (i->first == LIFETIME_POLICY) { + pn_data_put_symbol(data, convert(i->first)); + putLifetimePolicy(data, toLifetimePolicy(i->second.asString())); + } else { + pn_data_put_symbol(data, convert(i->first)); + pn_data_put_string(data, convert(i->second.asString())); + } } pn_data_exit(data); } diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index 1177bf7119..a9769740d6 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -388,6 +388,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { // Owners= ---, ---, --- TestConsumer::shared_ptr c3(new TestConsumer("C3")); + queue->consume(c3); std::deque dequeMeC3; verifyAcquire(queue, c3, dequeMeC3, "a", 2 ); -- cgit v1.2.1