From 220f7ec525b29efd94113f2d3be511c5156c3b3a Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Tue, 22 Oct 2013 13:16:43 +0000 Subject: QPID-4984: WIP - Merge from trunk r.1534626. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/linearstore@1534627 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/acl/AclValidator.cpp | 2 +- qpid/cpp/src/qpid/broker/Broker.cpp | 4 ++-- qpid/cpp/src/qpid/broker/QueueSettings.cpp | 6 ++++++ qpid/cpp/src/qpid/broker/QueueSettings.h | 1 + qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | 6 ++++++ qpid/cpp/src/qpid/broker/amqp/Outgoing.h | 1 + qpid/cpp/src/qpid/broker/amqp/Session.cpp | 4 +++- qpid/cpp/src/tests/acl.py | 15 +++++++++++++-- 8 files changed, 33 insertions(+), 6 deletions(-) diff --git a/qpid/cpp/src/qpid/acl/AclValidator.cpp b/qpid/cpp/src/qpid/acl/AclValidator.cpp index 73b49b2959..a077667a33 100644 --- a/qpid/cpp/src/qpid/acl/AclValidator.cpp +++ b/qpid/cpp/src/qpid/acl/AclValidator.cpp @@ -110,7 +110,7 @@ namespace acl { boost::shared_ptr( new IntPropertyType(0,std::numeric_limits::max())))); - std::string policyTypes[] = {"ring", "ring_strict", "flow_to_disk", "reject"}; + std::string policyTypes[] = {"ring", "self-destruct", "reject"}; std::vector v(policyTypes, policyTypes + sizeof(policyTypes) / sizeof(std::string)); validators.insert(Validator(acl::SPECPROP_POLICYTYPE, boost::shared_ptr( diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index dd2769ec92..3f3c769435 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -1285,7 +1285,7 @@ std::pair, bool> Broker::createQueue( params.insert(make_pair(acl::PROP_DURABLE, settings.durable ? _TRUE : _FALSE)); params.insert(make_pair(acl::PROP_EXCLUSIVE, owner ? _TRUE : _FALSE)); params.insert(make_pair(acl::PROP_AUTODELETE, settings.autodelete ? _TRUE : _FALSE)); - params.insert(make_pair(acl::PROP_POLICYTYPE, settings.dropMessagesAtLimit ? "ring" : "reject")); + params.insert(make_pair(acl::PROP_POLICYTYPE, settings.getLimitPolicy())); params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast(settings.maxDepth.getCount()))); params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast(settings.maxDepth.getSize()))); params.insert(make_pair(acl::PROP_MAXFILECOUNT, boost::lexical_cast(settings.maxFileCount))); @@ -1338,7 +1338,7 @@ void Broker::deleteQueue(const std::string& name, const std::string& userId, params.insert(make_pair(acl::PROP_DURABLE, queue->isDurable() ? _TRUE : _FALSE)); params.insert(make_pair(acl::PROP_EXCLUSIVE, queue->hasExclusiveOwner() ? _TRUE : _FALSE)); params.insert(make_pair(acl::PROP_AUTODELETE, queue->isAutoDelete() ? _TRUE : _FALSE)); - params.insert(make_pair(acl::PROP_POLICYTYPE, queue->getSettings().dropMessagesAtLimit ? "ring" : "reject")); + params.insert(make_pair(acl::PROP_POLICYTYPE, queue->getSettings().getLimitPolicy())); if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,¶ms) ) throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId)); diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.cpp b/qpid/cpp/src/qpid/broker/QueueSettings.cpp index 0b4a268489..53194cf064 100644 --- a/qpid/cpp/src/qpid/broker/QueueSettings.cpp +++ b/qpid/cpp/src/qpid/broker/QueueSettings.cpp @@ -314,4 +314,10 @@ QueueSettings::Aliases::Aliases() insert(value_type("x-qpid-maximum-message-size", "qpid.alert_size")); } +std::string QueueSettings::getLimitPolicy() const +{ + if (dropMessagesAtLimit) return POLICY_TYPE_RING; + else if (selfDestructAtLimit) return POLICY_TYPE_SELF_DESTRUCT; + else return POLICY_TYPE_REJECT; +} }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.h b/qpid/cpp/src/qpid/broker/QueueSettings.h index e150e365c2..9fda51e17a 100644 --- a/qpid/cpp/src/qpid/broker/QueueSettings.h +++ b/qpid/cpp/src/qpid/broker/QueueSettings.h @@ -111,6 +111,7 @@ struct QueueSettings QPID_BROKER_EXTERN void populate(const std::map& inputs, std::map& unused); QPID_BROKER_EXTERN void populate(const qpid::framing::FieldTable& inputs, qpid::framing::FieldTable& unused); QPID_BROKER_EXTERN std::map asMap() const; + std::string getLimitPolicy() const; struct Aliases : std::map { diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index 1b7a47b360..7a0bc6c0d8 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -300,6 +300,12 @@ size_t OutgoingFromQueue::Record::getIndex(pn_delivery_tag_t t) return (size_t) buffer.getLong(); } +boost::shared_ptr OutgoingFromQueue::getExclusiveSubscriptionQueue(Outgoing* o) +{ + OutgoingFromQueue* s = dynamic_cast(o); + if (s && s->exclusive) return s->queue; + else return boost::shared_ptr(); +} }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h index 81994f2b66..48f041171c 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h @@ -107,6 +107,7 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public void cancel(); void acknowledged(const qpid::broker::DeliveryRecord&); qpid::broker::OwnershipToken* getSession(); + static boost::shared_ptr getExclusiveSubscriptionQueue(Outgoing*); private: diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index c4265ef420..9203ce17e4 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -396,7 +396,7 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s authorise.access(node.exchange);//do separate access check before trying to create the queue bool shared = is_capability_requested(SHARED, pn_terminus_capabilities(source)); bool durable = pn_terminus_get_durability(source); - bool autodelete = !durable && pn_link_remote_snd_settle_mode(link) == PN_SND_SETTLED; + bool autodelete = !durable && pn_link_remote_snd_settle_mode(link) != PN_SND_UNSETTLED; QueueSettings settings(durable, autodelete); std::string altExchange; if (node.topic) { @@ -475,6 +475,8 @@ void Session::detach(pn_link_t* link) OutgoingLinks::iterator i = outgoing.find(link); if (i != outgoing.end()) { i->second->detached(); + boost::shared_ptr q = OutgoingFromQueue::getExclusiveSubscriptionQueue(i->second.get()); + if (q) connection.getBroker().deleteQueue(q->getName(), connection.getUserId(), connection.getMgmtId()); outgoing.erase(i); QPID_LOG(debug, "Outgoing link detached"); } diff --git a/qpid/cpp/src/tests/acl.py b/qpid/cpp/src/tests/acl.py index 666ae46a39..fe8254a6fa 100755 --- a/qpid/cpp/src/tests/acl.py +++ b/qpid/cpp/src/tests/acl.py @@ -382,8 +382,7 @@ class ACLTests(TestBase010): aclf.close() result = self.reload_acl() - expected = "ding is not a valid value for 'policytype', possible values are one of" \ - " { 'ring' 'ring_strict' 'flow_to_disk' 'reject' }"; + expected = "ding is not a valid value for 'policytype', possible values are one of" if (result.find(expected) == -1): self.fail(result) @@ -802,6 +801,7 @@ class ACLTests(TestBase010): aclf.write('acl allow bob@QPID delete queue name=q4\n') aclf.write('acl allow bob@QPID create queue name=q5 maxqueuesize=1000 maxqueuecount=100\n') aclf.write('acl allow bob@QPID create queue name=q6 queuemaxsizelowerlimit=50 queuemaxsizeupperlimit=100 queuemaxcountlowerlimit=50 queuemaxcountupperlimit=100\n') + aclf.write('acl allow bob@QPID create queue name=q7 policytype=self-destruct\n') aclf.write('acl allow anonymous all all\n') aclf.write('acl deny all all') aclf.close() @@ -914,6 +914,17 @@ class ACLTests(TestBase010): if (403 == e.args[0].error_code): self.fail("ACL should allow queue create request for q2 with exclusive=true policytype=ring"); + try: + session.queue_declare(queue="q7", arguments={"qpid.policy_type": "ring"}) + self.fail("ACL should not allow queue create request for q7 with policytype=ring"); + except qpid.session.SessionException, e: + session = self.get_session('bob','bob') + + try: + session.queue_declare(queue="q7", arguments={"qpid.policy_type": "self-destruct"}) + except qpid.session.SessionException, e: + self.fail("ACL should allow queue create request for q7 with policytype=self-destruct"); + try: session.queue_declare(queue="q3") session.queue_declare(queue="q4") -- cgit v1.2.1