diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-10-22 13:16:43 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-10-22 13:16:43 +0000 |
commit | 220f7ec525b29efd94113f2d3be511c5156c3b3a (patch) | |
tree | 4a6b950101c98bd84ae28f341652522a940bca94 /qpid/cpp/src/qpid | |
parent | 7348c75f13673a6a0434bfddc8dff474ba6b69c2 (diff) | |
download | qpid-python-220f7ec525b29efd94113f2d3be511c5156c3b3a.tar.gz |
QPID-4984: WIP - Merge from trunk r.1534626.linearstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/linearstore@1534627 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid')
-rw-r--r-- | qpid/cpp/src/qpid/acl/AclValidator.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueSettings.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueSettings.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Session.cpp | 4 |
7 files changed, 20 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/acl/AclValidator.cpp b/qpid/cpp/src/qpid/acl/AclValidator.cpp index 73b49b2959..a077667a33 100644 --- a/qpid/cpp/src/qpid/acl/AclValidator.cpp +++ b/qpid/cpp/src/qpid/acl/AclValidator.cpp @@ -110,7 +110,7 @@ namespace acl { boost::shared_ptr<PropertyType>( new IntPropertyType(0,std::numeric_limits<int64_t>::max())))); - std::string policyTypes[] = {"ring", "ring_strict", "flow_to_disk", "reject"}; + std::string policyTypes[] = {"ring", "self-destruct", "reject"}; std::vector<std::string> v(policyTypes, policyTypes + sizeof(policyTypes) / sizeof(std::string)); validators.insert(Validator(acl::SPECPROP_POLICYTYPE, boost::shared_ptr<PropertyType>( diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index dd2769ec92..3f3c769435 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -1285,7 +1285,7 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue( params.insert(make_pair(acl::PROP_DURABLE, settings.durable ? _TRUE : _FALSE)); params.insert(make_pair(acl::PROP_EXCLUSIVE, owner ? _TRUE : _FALSE)); params.insert(make_pair(acl::PROP_AUTODELETE, settings.autodelete ? _TRUE : _FALSE)); - params.insert(make_pair(acl::PROP_POLICYTYPE, settings.dropMessagesAtLimit ? "ring" : "reject")); + params.insert(make_pair(acl::PROP_POLICYTYPE, settings.getLimitPolicy())); params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(settings.maxDepth.getCount()))); params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(settings.maxDepth.getSize()))); params.insert(make_pair(acl::PROP_MAXFILECOUNT, boost::lexical_cast<string>(settings.maxFileCount))); @@ -1338,7 +1338,7 @@ void Broker::deleteQueue(const std::string& name, const std::string& userId, params.insert(make_pair(acl::PROP_DURABLE, queue->isDurable() ? _TRUE : _FALSE)); params.insert(make_pair(acl::PROP_EXCLUSIVE, queue->hasExclusiveOwner() ? _TRUE : _FALSE)); params.insert(make_pair(acl::PROP_AUTODELETE, queue->isAutoDelete() ? _TRUE : _FALSE)); - params.insert(make_pair(acl::PROP_POLICYTYPE, queue->getSettings().dropMessagesAtLimit ? "ring" : "reject")); + params.insert(make_pair(acl::PROP_POLICYTYPE, queue->getSettings().getLimitPolicy())); if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,¶ms) ) throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId)); diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.cpp b/qpid/cpp/src/qpid/broker/QueueSettings.cpp index 0b4a268489..53194cf064 100644 --- a/qpid/cpp/src/qpid/broker/QueueSettings.cpp +++ b/qpid/cpp/src/qpid/broker/QueueSettings.cpp @@ -314,4 +314,10 @@ QueueSettings::Aliases::Aliases() insert(value_type("x-qpid-maximum-message-size", "qpid.alert_size")); } +std::string QueueSettings::getLimitPolicy() const +{ + if (dropMessagesAtLimit) return POLICY_TYPE_RING; + else if (selfDestructAtLimit) return POLICY_TYPE_SELF_DESTRUCT; + else return POLICY_TYPE_REJECT; +} }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.h b/qpid/cpp/src/qpid/broker/QueueSettings.h index e150e365c2..9fda51e17a 100644 --- a/qpid/cpp/src/qpid/broker/QueueSettings.h +++ b/qpid/cpp/src/qpid/broker/QueueSettings.h @@ -111,6 +111,7 @@ struct QueueSettings QPID_BROKER_EXTERN void populate(const std::map<std::string, qpid::types::Variant>& inputs, std::map<std::string, qpid::types::Variant>& unused); QPID_BROKER_EXTERN void populate(const qpid::framing::FieldTable& inputs, qpid::framing::FieldTable& unused); QPID_BROKER_EXTERN std::map<std::string, qpid::types::Variant> asMap() const; + std::string getLimitPolicy() const; struct Aliases : std::map<std::string, std::string> { diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index 1b7a47b360..7a0bc6c0d8 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -300,6 +300,12 @@ size_t OutgoingFromQueue::Record::getIndex(pn_delivery_tag_t t) return (size_t) buffer.getLong(); } +boost::shared_ptr<Queue> OutgoingFromQueue::getExclusiveSubscriptionQueue(Outgoing* o) +{ + OutgoingFromQueue* s = dynamic_cast<OutgoingFromQueue*>(o); + if (s && s->exclusive) return s->queue; + else return boost::shared_ptr<Queue>(); +} }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h index 81994f2b66..48f041171c 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h @@ -107,6 +107,7 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public void cancel(); void acknowledged(const qpid::broker::DeliveryRecord&); qpid::broker::OwnershipToken* getSession(); + static boost::shared_ptr<Queue> getExclusiveSubscriptionQueue(Outgoing*); private: diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index c4265ef420..9203ce17e4 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -396,7 +396,7 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s authorise.access(node.exchange);//do separate access check before trying to create the queue bool shared = is_capability_requested(SHARED, pn_terminus_capabilities(source)); bool durable = pn_terminus_get_durability(source); - bool autodelete = !durable && pn_link_remote_snd_settle_mode(link) == PN_SND_SETTLED; + bool autodelete = !durable && pn_link_remote_snd_settle_mode(link) != PN_SND_UNSETTLED; QueueSettings settings(durable, autodelete); std::string altExchange; if (node.topic) { @@ -475,6 +475,8 @@ void Session::detach(pn_link_t* link) OutgoingLinks::iterator i = outgoing.find(link); if (i != outgoing.end()) { i->second->detached(); + boost::shared_ptr<Queue> q = OutgoingFromQueue::getExclusiveSubscriptionQueue(i->second.get()); + if (q) connection.getBroker().deleteQueue(q->getName(), connection.getUserId(), connection.getMgmtId()); outgoing.erase(i); QPID_LOG(debug, "Outgoing link detached"); } |