summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2013-10-22 13:16:43 +0000
committerKim van der Riet <kpvdr@apache.org>2013-10-22 13:16:43 +0000
commit220f7ec525b29efd94113f2d3be511c5156c3b3a (patch)
tree4a6b950101c98bd84ae28f341652522a940bca94
parent7348c75f13673a6a0434bfddc8dff474ba6b69c2 (diff)
downloadqpid-python-linearstore.tar.gz
QPID-4984: WIP - Merge from trunk r.1534626.linearstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/linearstore@1534627 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/acl/AclValidator.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/QueueSettings.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/QueueSettings.h1
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.h1
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp4
-rwxr-xr-xqpid/cpp/src/tests/acl.py15
8 files changed, 33 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/acl/AclValidator.cpp b/qpid/cpp/src/qpid/acl/AclValidator.cpp
index 73b49b2959..a077667a33 100644
--- a/qpid/cpp/src/qpid/acl/AclValidator.cpp
+++ b/qpid/cpp/src/qpid/acl/AclValidator.cpp
@@ -110,7 +110,7 @@ namespace acl {
boost::shared_ptr<PropertyType>(
new IntPropertyType(0,std::numeric_limits<int64_t>::max()))));
- std::string policyTypes[] = {"ring", "ring_strict", "flow_to_disk", "reject"};
+ std::string policyTypes[] = {"ring", "self-destruct", "reject"};
std::vector<std::string> v(policyTypes, policyTypes + sizeof(policyTypes) / sizeof(std::string));
validators.insert(Validator(acl::SPECPROP_POLICYTYPE,
boost::shared_ptr<PropertyType>(
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index dd2769ec92..3f3c769435 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -1285,7 +1285,7 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
params.insert(make_pair(acl::PROP_DURABLE, settings.durable ? _TRUE : _FALSE));
params.insert(make_pair(acl::PROP_EXCLUSIVE, owner ? _TRUE : _FALSE));
params.insert(make_pair(acl::PROP_AUTODELETE, settings.autodelete ? _TRUE : _FALSE));
- params.insert(make_pair(acl::PROP_POLICYTYPE, settings.dropMessagesAtLimit ? "ring" : "reject"));
+ params.insert(make_pair(acl::PROP_POLICYTYPE, settings.getLimitPolicy()));
params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(settings.maxDepth.getCount())));
params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(settings.maxDepth.getSize())));
params.insert(make_pair(acl::PROP_MAXFILECOUNT, boost::lexical_cast<string>(settings.maxFileCount)));
@@ -1338,7 +1338,7 @@ void Broker::deleteQueue(const std::string& name, const std::string& userId,
params.insert(make_pair(acl::PROP_DURABLE, queue->isDurable() ? _TRUE : _FALSE));
params.insert(make_pair(acl::PROP_EXCLUSIVE, queue->hasExclusiveOwner() ? _TRUE : _FALSE));
params.insert(make_pair(acl::PROP_AUTODELETE, queue->isAutoDelete() ? _TRUE : _FALSE));
- params.insert(make_pair(acl::PROP_POLICYTYPE, queue->getSettings().dropMessagesAtLimit ? "ring" : "reject"));
+ params.insert(make_pair(acl::PROP_POLICYTYPE, queue->getSettings().getLimitPolicy()));
if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,&params) )
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId));
diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.cpp b/qpid/cpp/src/qpid/broker/QueueSettings.cpp
index 0b4a268489..53194cf064 100644
--- a/qpid/cpp/src/qpid/broker/QueueSettings.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueSettings.cpp
@@ -314,4 +314,10 @@ QueueSettings::Aliases::Aliases()
insert(value_type("x-qpid-maximum-message-size", "qpid.alert_size"));
}
+std::string QueueSettings::getLimitPolicy() const
+{
+ if (dropMessagesAtLimit) return POLICY_TYPE_RING;
+ else if (selfDestructAtLimit) return POLICY_TYPE_SELF_DESTRUCT;
+ else return POLICY_TYPE_REJECT;
+}
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.h b/qpid/cpp/src/qpid/broker/QueueSettings.h
index e150e365c2..9fda51e17a 100644
--- a/qpid/cpp/src/qpid/broker/QueueSettings.h
+++ b/qpid/cpp/src/qpid/broker/QueueSettings.h
@@ -111,6 +111,7 @@ struct QueueSettings
QPID_BROKER_EXTERN void populate(const std::map<std::string, qpid::types::Variant>& inputs, std::map<std::string, qpid::types::Variant>& unused);
QPID_BROKER_EXTERN void populate(const qpid::framing::FieldTable& inputs, qpid::framing::FieldTable& unused);
QPID_BROKER_EXTERN std::map<std::string, qpid::types::Variant> asMap() const;
+ std::string getLimitPolicy() const;
struct Aliases : std::map<std::string, std::string>
{
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
index 1b7a47b360..7a0bc6c0d8 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -300,6 +300,12 @@ size_t OutgoingFromQueue::Record::getIndex(pn_delivery_tag_t t)
return (size_t) buffer.getLong();
}
+boost::shared_ptr<Queue> OutgoingFromQueue::getExclusiveSubscriptionQueue(Outgoing* o)
+{
+ OutgoingFromQueue* s = dynamic_cast<OutgoingFromQueue*>(o);
+ if (s && s->exclusive) return s->queue;
+ else return boost::shared_ptr<Queue>();
+}
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
index 81994f2b66..48f041171c 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
@@ -107,6 +107,7 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public
void cancel();
void acknowledged(const qpid::broker::DeliveryRecord&);
qpid::broker::OwnershipToken* getSession();
+ static boost::shared_ptr<Queue> getExclusiveSubscriptionQueue(Outgoing*);
private:
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index c4265ef420..9203ce17e4 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -396,7 +396,7 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
authorise.access(node.exchange);//do separate access check before trying to create the queue
bool shared = is_capability_requested(SHARED, pn_terminus_capabilities(source));
bool durable = pn_terminus_get_durability(source);
- bool autodelete = !durable && pn_link_remote_snd_settle_mode(link) == PN_SND_SETTLED;
+ bool autodelete = !durable && pn_link_remote_snd_settle_mode(link) != PN_SND_UNSETTLED;
QueueSettings settings(durable, autodelete);
std::string altExchange;
if (node.topic) {
@@ -475,6 +475,8 @@ void Session::detach(pn_link_t* link)
OutgoingLinks::iterator i = outgoing.find(link);
if (i != outgoing.end()) {
i->second->detached();
+ boost::shared_ptr<Queue> q = OutgoingFromQueue::getExclusiveSubscriptionQueue(i->second.get());
+ if (q) connection.getBroker().deleteQueue(q->getName(), connection.getUserId(), connection.getMgmtId());
outgoing.erase(i);
QPID_LOG(debug, "Outgoing link detached");
}
diff --git a/qpid/cpp/src/tests/acl.py b/qpid/cpp/src/tests/acl.py
index 666ae46a39..fe8254a6fa 100755
--- a/qpid/cpp/src/tests/acl.py
+++ b/qpid/cpp/src/tests/acl.py
@@ -382,8 +382,7 @@ class ACLTests(TestBase010):
aclf.close()
result = self.reload_acl()
- expected = "ding is not a valid value for 'policytype', possible values are one of" \
- " { 'ring' 'ring_strict' 'flow_to_disk' 'reject' }";
+ expected = "ding is not a valid value for 'policytype', possible values are one of"
if (result.find(expected) == -1):
self.fail(result)
@@ -802,6 +801,7 @@ class ACLTests(TestBase010):
aclf.write('acl allow bob@QPID delete queue name=q4\n')
aclf.write('acl allow bob@QPID create queue name=q5 maxqueuesize=1000 maxqueuecount=100\n')
aclf.write('acl allow bob@QPID create queue name=q6 queuemaxsizelowerlimit=50 queuemaxsizeupperlimit=100 queuemaxcountlowerlimit=50 queuemaxcountupperlimit=100\n')
+ aclf.write('acl allow bob@QPID create queue name=q7 policytype=self-destruct\n')
aclf.write('acl allow anonymous all all\n')
aclf.write('acl deny all all')
aclf.close()
@@ -915,6 +915,17 @@ class ACLTests(TestBase010):
self.fail("ACL should allow queue create request for q2 with exclusive=true policytype=ring");
try:
+ session.queue_declare(queue="q7", arguments={"qpid.policy_type": "ring"})
+ self.fail("ACL should not allow queue create request for q7 with policytype=ring");
+ except qpid.session.SessionException, e:
+ session = self.get_session('bob','bob')
+
+ try:
+ session.queue_declare(queue="q7", arguments={"qpid.policy_type": "self-destruct"})
+ except qpid.session.SessionException, e:
+ self.fail("ACL should allow queue create request for q7 with policytype=self-destruct");
+
+ try:
session.queue_declare(queue="q3")
session.queue_declare(queue="q4")
except qpid.session.SessionException, e: