summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2011-02-18 22:38:05 +0000
committerGordon Sim <gsim@apache.org>2011-02-18 22:38:05 +0000
commit9de60b44364c99806877e82dce12b8cf7497cef2 (patch)
treeed33629239dadf45f5772607570b48bcefd0839d
parentd2d9731a6a7903f04df58891cbcac00b03270f6d (diff)
downloadqpid-python-9de60b44364c99806877e82dce12b8cf7497cef2.tar.gz
QPID-3015: Added create and delete methods to management schema for broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1072179 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp410
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h54
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp27
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h15
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp278
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.h1
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp20
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h7
-rw-r--r--qpid/cpp/src/tests/MessagingFixture.h117
-rw-r--r--qpid/cpp/src/tests/MessagingSessionTests.cpp47
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp2
-rwxr-xr-xqpid/cpp/src/tests/qpid-ctrl5
-rw-r--r--qpid/cpp/src/tests/sender.cpp2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java19
-rw-r--r--qpid/specs/management-schema.xml13
15 files changed, 817 insertions, 200 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 3c692fc368..fbd7dd3361 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/broker/Broker.h"
+#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
@@ -33,10 +34,19 @@
#include "qpid/broker/ExpiryPolicy.h"
#include "qmf/org/apache/qpid/broker/Package.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerDelete.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
+#include "qmf/org/apache/qpid/broker/EventBind.h"
+#include "qmf/org/apache/qpid/broker/EventUnbind.h"
+#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/management/ManagementDirectExchange.h"
#include "qpid/management/ManagementTopicExchange.h"
#include "qpid/log/Logger.h"
@@ -44,7 +54,9 @@
#include "qpid/log/Statement.h"
#include "qpid/log/posix/SinkOptions.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/ProtocolFactory.h"
#include "qpid/sys/Poller.h"
@@ -76,7 +88,10 @@ using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
+using qpid::management::getManagementExecutionContext;
+using qpid::types::Variant;
using std::string;
+using std::make_pair;
namespace _qmf = qmf::org::apache::qpid::broker;
@@ -443,6 +458,20 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
QPID_LOG (debug, "Broker::getLogLevel()");
status = Manageable::STATUS_OK;
break;
+ case _qmf::Broker::METHOD_CREATE :
+ {
+ _qmf::ArgsBrokerCreate& a = dynamic_cast<_qmf::ArgsBrokerCreate&>(args);
+ createObject(a.i_type, a.i_name, a.i_properties, a.i_strict, getManagementExecutionContext());
+ status = Manageable::STATUS_OK;
+ break;
+ }
+ case _qmf::Broker::METHOD_DELETE :
+ {
+ _qmf::ArgsBrokerDelete& a = dynamic_cast<_qmf::ArgsBrokerDelete&>(args);
+ deleteObject(a.i_type, a.i_name, a.i_options, getManagementExecutionContext());
+ status = Manageable::STATUS_OK;
+ break;
+ }
default:
QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]");
status = Manageable::STATUS_NOT_IMPLEMENTED;
@@ -452,6 +481,169 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
return status;
}
+namespace
+{
+const std::string TYPE_QUEUE("queue");
+const std::string TYPE_EXCHANGE("exchange");
+const std::string TYPE_TOPIC("topic");
+const std::string TYPE_BINDING("binding");
+const std::string DURABLE("durable");
+const std::string AUTO_DELETE("auto-delete");
+const std::string ALTERNATE_EXCHANGE("alternate-exchange");
+const std::string EXCHANGE_TYPE("exchange-type");
+const std::string QUEUE_NAME("queue");
+const std::string EXCHANGE_NAME("exchange");
+
+const std::string TRUE("true");
+const std::string FALSE("false");
+}
+
+struct InvalidBindingIdentifier : public qpid::Exception
+{
+ InvalidBindingIdentifier(const std::string& name) : qpid::Exception(name) {}
+ std::string getPrefix() const { return "invalid binding"; }
+};
+
+struct BindingIdentifier
+{
+ std::string exchange;
+ std::string queue;
+ std::string key;
+
+ BindingIdentifier(const std::string& name)
+ {
+ std::vector<std::string> path;
+ split(path, name, "/");
+ switch (path.size()) {
+ case 1:
+ queue = path[0];
+ break;
+ case 2:
+ exchange = path[0];
+ queue = path[1];
+ break;
+ case 3:
+ exchange = path[0];
+ queue = path[1];
+ key = path[2];
+ break;
+ default:
+ throw InvalidBindingIdentifier(name);
+ }
+ }
+};
+
+struct ObjectAlreadyExists : public qpid::Exception
+{
+ ObjectAlreadyExists(const std::string& name) : qpid::Exception(name) {}
+ std::string getPrefix() const { return "object already exists"; }
+};
+
+struct UnknownObjectType : public qpid::Exception
+{
+ UnknownObjectType(const std::string& type) : qpid::Exception(type) {}
+ std::string getPrefix() const { return "unknown object type"; }
+};
+
+void Broker::createObject(const std::string& type, const std::string& name,
+ const Variant::Map& properties, bool /*strict*/, const ConnectionState* context)
+{
+ std::string userId;
+ std::string connectionId;
+ if (context) {
+ userId = context->getUserId();
+ connectionId = context->getUrl();
+ }
+ //TODO: implement 'strict' option (check there are no unrecognised properties)
+ QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ")");
+ if (type == TYPE_QUEUE) {
+ bool durable(false);
+ bool autodelete(false);
+ std::string alternateExchange;
+ Variant::Map extensions;
+ for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+ // extract durable, auto-delete and alternate-exchange properties
+ if (i->first == DURABLE) durable = i->second;
+ else if (i->first == AUTO_DELETE) autodelete = i->second;
+ else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
+ //treat everything else as extension properties
+ else extensions[i->first] = i->second;
+ }
+ framing::FieldTable arguments;
+ amqp_0_10::translate(extensions, arguments);
+
+ std::pair<boost::shared_ptr<Queue>, bool> result =
+ createQueue(name, durable, autodelete, 0, alternateExchange, arguments, userId, connectionId);
+ if (!result.second) {
+ throw ObjectAlreadyExists(name);
+ }
+ } else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) {
+ bool durable(false);
+ std::string exchangeType;
+ std::string alternateExchange;
+ Variant::Map extensions;
+ for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+ // extract durable, auto-delete and alternate-exchange properties
+ if (i->first == DURABLE) durable = i->second;
+ else if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString();
+ else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
+ //treat everything else as extension properties
+ else extensions[i->first] = i->second;
+ }
+ framing::FieldTable arguments;
+ amqp_0_10::translate(extensions, arguments);
+
+ try {
+ std::pair<boost::shared_ptr<Exchange>, bool> result =
+ createExchange(name, exchangeType, durable, alternateExchange, arguments, userId, connectionId);
+ if (!result.second) {
+ throw ObjectAlreadyExists(name);
+ }
+ } catch (const UnknownExchangeTypeException&) {
+ throw Exception(QPID_MSG("Invalid exchange type: " << exchangeType));
+ }
+ } else if (type == TYPE_BINDING) {
+ BindingIdentifier binding(name);
+ std::string exchangeType("topic");
+ Variant::Map extensions;
+ for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+ // extract durable, auto-delete and alternate-exchange properties
+ if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString();
+ //treat everything else as extension properties
+ else extensions[i->first] = i->second;
+ }
+ framing::FieldTable arguments;
+ amqp_0_10::translate(extensions, arguments);
+
+ bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId);
+ } else {
+ throw UnknownObjectType(type);
+ }
+}
+
+void Broker::deleteObject(const std::string& type, const std::string& name,
+ const Variant::Map& options, const ConnectionState* context)
+{
+ std::string userId;
+ std::string connectionId;
+ if (context) {
+ userId = context->getUserId();
+ connectionId = context->getUrl();
+ }
+ QPID_LOG (debug, "Broker::delete(" << type << ", " << name << "," << options << ")");
+ if (type == TYPE_QUEUE) {
+ deleteQueue(name, userId, connectionId);
+ } else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) {
+ deleteExchange(name, userId, connectionId);
+ } else if (type == TYPE_BINDING) {
+ BindingIdentifier binding(name);
+ unbind(binding.queue, binding.exchange, binding.key, userId, connectionId);
+ } else {
+ throw UnknownObjectType(type);
+ }
+
+}
+
void Broker::setLogLevel(const std::string& level)
{
QPID_LOG(notice, "Changing log level to " << level);
@@ -466,7 +658,7 @@ std::string Broker::getLogLevel()
const std::vector<std::string>& selectors = qpid::log::Logger::instance().getOptions().selectors;
for (std::vector<std::string>::const_iterator i = selectors.begin(); i != selectors.end(); ++i) {
if (i != selectors.begin()) level += std::string(",");
- level += *i;
+ level += *i;
}
return level;
}
@@ -552,5 +744,221 @@ void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) {
const std::string Broker::TCP_TRANSPORT("tcp");
+
+std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
+ const std::string& name,
+ bool durable,
+ bool autodelete,
+ const OwnershipToken* owner,
+ const std::string& alternateExchange,
+ const qpid::framing::FieldTable& arguments,
+ const std::string& userId,
+ const std::string& connectionId)
+{
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+ params.insert(make_pair(acl::PROP_PASSIVE, FALSE));
+ params.insert(make_pair(acl::PROP_DURABLE, durable ? TRUE : FALSE));
+ params.insert(make_pair(acl::PROP_EXCLUSIVE, owner ? TRUE : FALSE));
+ params.insert(make_pair(acl::PROP_AUTODELETE, autodelete ? TRUE : FALSE));
+ params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
+ params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
+ params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
+
+ if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId));
+ }
+
+ Exchange::shared_ptr alternate;
+ if (!alternateExchange.empty()) {
+ alternate = exchanges.get(alternateExchange);
+ if (!alternate) framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange));
+ }
+
+ std::pair<Queue::shared_ptr, bool> result = queues.declare(name, durable, autodelete, owner);
+ if (result.second) {
+ if (alternate) {
+ result.first->setAlternateExchange(alternate);
+ alternate->incAlternateUsers();
+ }
+
+ //apply settings & create persistent record if required
+ result.first->create(arguments);
+ //add default binding:
+ result.first->bind(exchanges.getDefault(), name);
+
+ if (managementAgent.get()) {
+ //TODO: debatable whether we should raise an event here for
+ //create when this is a 'declare' event; ideally add a create
+ //event instead?
+ managementAgent->raiseEvent(
+ _qmf::EventQueueDeclare(connectionId, userId, name,
+ durable, owner, autodelete,
+ ManagementAgent::toMap(arguments),
+ "created"));
+ }
+ }
+ return result;
+}
+
+void Broker::deleteQueue(const std::string& name, const std::string& userId,
+ const std::string& connectionId, QueueFunctor check)
+{
+ if (acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) {
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId));
+ }
+
+ Queue::shared_ptr queue = queues.find(name);
+ if (queue) {
+ if (check) check(queue);
+ queues.destroy(name);
+ queue->destroyed();
+ } else {
+ throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name));
+ }
+
+ if (managementAgent.get())
+ managementAgent->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, name));
+
+}
+
+std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
+ const std::string& name,
+ const std::string& type,
+ bool durable,
+ const std::string& alternateExchange,
+ const qpid::framing::FieldTable& arguments,
+ const std::string& userId,
+ const std::string& connectionId)
+{
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_TYPE, type));
+ params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+ params.insert(make_pair(acl::PROP_PASSIVE, FALSE));
+ params.insert(make_pair(acl::PROP_DURABLE, durable ? TRUE : FALSE));
+ if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_EXCHANGE,name,&params) )
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << userId));
+ }
+
+ Exchange::shared_ptr alternate;
+ if (!alternateExchange.empty()) {
+ alternate = exchanges.get(alternateExchange);
+ if (!alternate) framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange));
+ }
+
+ std::pair<Exchange::shared_ptr, bool> result;
+ result = exchanges.declare(name, type, durable, arguments);
+ if (result.second) {
+ if (alternate) {
+ result.first->setAlternate(alternate);
+ alternate->incAlternateUsers();
+ }
+ if (durable) {
+ store->create(*result.first, arguments);
+ }
+ if (managementAgent.get()) {
+ //TODO: debatable whether we should raise an event here for
+ //create when this is a 'declare' event; ideally add a create
+ //event instead?
+ managementAgent->raiseEvent(_qmf::EventExchangeDeclare(connectionId,
+ userId,
+ name,
+ type,
+ alternateExchange,
+ durable,
+ false,
+ ManagementAgent::toMap(arguments),
+ "created"));
+ }
+ }
+ return result;
+}
+
+void Broker::deleteExchange(const std::string& name, const std::string& userId,
+ const std::string& connectionId)
+{
+ if (acl) {
+ if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId));
+ }
+
+ Exchange::shared_ptr exchange(exchanges.get(name));
+ if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name));
+ if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
+ if (exchange->isDurable()) store->destroy(*exchange);
+ if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
+ exchanges.destroy(name);
+
+ if (managementAgent.get())
+ managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name));
+
+}
+
+void Broker::bind(const std::string& queueName,
+ const std::string& exchangeName,
+ const std::string& key,
+ const qpid::framing::FieldTable& arguments,
+ const std::string& userId,
+ const std::string& connectionId)
+{
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
+ params.insert(make_pair(acl::PROP_ROUTINGKEY, key));
+
+ if (!acl->authorise(userId,acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,&params))
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange bind request from " << userId));
+ }
+
+ Queue::shared_ptr queue = queues.find(queueName);
+ Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+ if (!queue) {
+ throw framing::NotFoundException(QPID_MSG("Bind failed. No such queue: " << queueName));
+ } else if (!exchange) {
+ throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName));
+ } else {
+ if (queue->bind(exchange, key, arguments)) {
+ if (managementAgent.get()) {
+ managementAgent->raiseEvent(_qmf::EventBind(connectionId, userId, exchangeName,
+ queueName, key, ManagementAgent::toMap(arguments)));
+ }
+ }
+ }
+}
+
+void Broker::unbind(const std::string& queueName,
+ const std::string& exchangeName,
+ const std::string& key,
+ const std::string& userId,
+ const std::string& connectionId)
+{
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
+ params.insert(make_pair(acl::PROP_ROUTINGKEY, key));
+ if (!acl->authorise(userId,acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,&params) )
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << userId));
+ }
+
+ Queue::shared_ptr queue = queues.find(queueName);
+ Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+ if (!queue) {
+ throw framing::NotFoundException(QPID_MSG("Bind failed. No such queue: " << queueName));
+ } else if (!exchange) {
+ throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName));
+ } else {
+ if (exchange->unbind(queue, key, 0)) {
+ if (exchange->isDurable() && queue->isDurable()) {
+ store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
+ }
+ if (managementAgent.get()) {
+ managementAgent->raiseEvent(_qmf::EventUnbind(connectionId, userId, exchangeName, queueName, key));
+ }
+ }
+ }
+}
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index cd6f81dc70..9af9020c8f 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -49,6 +49,7 @@
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Timer.h"
+#include "qpid/types/Variant.h"
#include "qpid/RefCounted.h"
#include "qpid/broker/AclModule.h"
#include "qpid/sys/Mutex.h"
@@ -57,7 +58,7 @@
#include <string>
#include <vector>
-namespace qpid {
+namespace qpid {
namespace sys {
class ProtocolFactory;
@@ -68,6 +69,7 @@ struct Url;
namespace broker {
+class ConnectionState;
class ExpiryPolicy;
class Message;
@@ -80,7 +82,7 @@ struct NoSuchTransportException : qpid::Exception
};
/**
- * A broker instance.
+ * A broker instance.
*/
class Broker : public sys::Runnable, public Plugin::Target,
public management::Manageable,
@@ -148,6 +150,10 @@ public:
void setStore ();
void setLogLevel(const std::string& level);
std::string getLogLevel();
+ void createObject(const std::string& type, const std::string& name,
+ const qpid::types::Variant::Map& properties, bool lenient, const ConnectionState* context);
+ void deleteObject(const std::string& type, const std::string& name,
+ const qpid::types::Variant::Map& options, const ConnectionState* context);
boost::shared_ptr<sys::Poller> poller;
sys::Timer timer;
@@ -179,7 +185,7 @@ public:
bool clusterUpdatee;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
ConnectionCounter connectionCounter;
-
+
public:
virtual ~Broker();
@@ -277,7 +283,7 @@ public:
bool isClusterUpdatee() const { return clusterUpdatee; }
management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
-
+
ConnectionCounter& getConnectionCounter() {return connectionCounter;}
/**
@@ -290,6 +296,42 @@ public:
const boost::intrusive_ptr<Message>& msg)> deferDelivery;
bool isAuthenticating ( ) { return config.auth; }
+
+ typedef boost::function1<void, boost::shared_ptr<Queue> > QueueFunctor;
+
+ std::pair<boost::shared_ptr<Queue>, bool> createQueue(
+ const std::string& name,
+ bool durable,
+ bool autodelete,
+ const OwnershipToken* owner,
+ const std::string& alternateExchange,
+ const qpid::framing::FieldTable& arguments,
+ const std::string& userId,
+ const std::string& connectionId);
+ void deleteQueue(const std::string& name,
+ const std::string& userId,
+ const std::string& connectionId,
+ QueueFunctor check = QueueFunctor());
+ std::pair<Exchange::shared_ptr, bool> createExchange(
+ const std::string& name,
+ const std::string& type,
+ bool durable,
+ const std::string& alternateExchange,
+ const qpid::framing::FieldTable& args,
+ const std::string& userId, const std::string& connectionId);
+ void deleteExchange(const std::string& name, const std::string& userId,
+ const std::string& connectionId);
+ void bind(const std::string& queue,
+ const std::string& exchange,
+ const std::string& key,
+ const qpid::framing::FieldTable& arguments,
+ const std::string& userId,
+ const std::string& connectionId);
+ void unbind(const std::string& queue,
+ const std::string& exchange,
+ const std::string& key,
+ const std::string& userId,
+ const std::string& connectionId);
};
}}
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 27c1cc4ad7..40cb80010c 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -444,7 +444,7 @@ void Queue::purgeExpired()
Mutex::ScopedLock locker(messageLock);
messages->removeIf(boost::bind(&collect_if_expired, expired, _1));
}
- for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+ for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
}
@@ -826,8 +826,9 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
store->create(*this, _settings);
}
-void Queue::destroy()
+void Queue::destroyed()
{
+ unbind(broker->getExchanges());
if (alternateExchange.get()) {
Mutex::ScopedLock locker(messageLock);
while(!messages->empty()){
@@ -846,6 +847,7 @@ void Queue::destroy()
store = 0;//ensure we make no more calls to the store for this queue
}
if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
+ notifyDeleted();
}
void Queue::notifyDeleted()
@@ -865,9 +867,9 @@ void Queue::bound(const string& exchange, const string& key,
bindings.add(exchange, key, args);
}
-void Queue::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref)
+void Queue::unbind(ExchangeRegistry& exchanges)
{
- bindings.unbind(exchanges, shared_ref);
+ bindings.unbind(exchanges, shared_from_this());
}
void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
@@ -955,8 +957,7 @@ void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
if (broker.getQueues().destroyIf(queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
QPID_LOG(debug, "Auto-deleting " << queue->getName());
- queue->unbind(broker.getExchanges(), queue);
- queue->destroy();
+ queue->destroyed();
}
}
@@ -1175,6 +1176,20 @@ void Queue::flush()
if (u.acquired && store) store->flush(*this);
}
+bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
+ const qpid::framing::FieldTable& arguments)
+{
+ if (exchange->bind(shared_from_this(), key, &arguments)) {
+ bound(exchange->getName(), key, arguments);
+ if (exchange->isDurable() && isDurable()) {
+ store->bind(*exchange, *this, key, arguments);
+ }
+ return true;
+ } else {
+ return false;
+ }
+}
+
Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
bool Queue::UsageBarrier::acquire()
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 12a3d273be..1a3b632845 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -172,8 +172,9 @@ class Queue : public boost::enable_shared_from_this<Queue>,
}
}
}
-
+
void checkNotDeleted();
+ void notifyDeleted();
public:
@@ -196,13 +197,17 @@ class Queue : public boost::enable_shared_from_this<Queue>,
// "recovering" means we are doing a MessageStore recovery.
QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings,
bool recovering = false);
- void destroy();
- void notifyDeleted();
+ void destroyed();
QPID_BROKER_EXTERN void bound(const std::string& exchange,
const std::string& key,
const qpid::framing::FieldTable& args);
- QPID_BROKER_EXTERN void unbind(ExchangeRegistry& exchanges,
- Queue::shared_ptr shared_ref);
+ //TODO: get unbind out of the public interface; only there for purposes of one unit test
+ void unbind(ExchangeRegistry& exchanges);
+ /**
+ * Bind self to specified exchange, and record that binding for unbinding on delete.
+ */
+ bool bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
+ const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable());
QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg);
QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message);
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index 3d62e73185..c2f90dce47 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -64,51 +64,54 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const
const string& alternateExchange,
bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
- AclModule* acl = getBroker().getAcl();
- if (acl) {
- std::map<acl::Property, std::string> params;
- params.insert(make_pair(acl::PROP_TYPE, type));
- params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
- params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) ));
- params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
- if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,&params) )
- throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange declare request from " << getConnection().getUserId()));
- }
-
//TODO: implement autoDelete
Exchange::shared_ptr alternate;
if (!alternateExchange.empty()) {
alternate = getBroker().getExchanges().get(alternateExchange);
}
if(passive){
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
+ //TODO: why does a passive declare require create
+ //permission? The purpose of the passive flag is to state
+ //that the exchange should *not* created. For
+ //authorisation a passive declare is similar to
+ //exchange-query.
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_TYPE, type));
+ params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+ params.insert(make_pair(acl::PROP_PASSIVE, _TRUE));
+ params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE));
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,&params) )
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << getConnection().getUserId()));
+ }
Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange));
checkType(actual, type);
checkAlternate(actual, alternate);
- }else{
+ }else{
if(exchange.find("amq.") == 0 || exchange.find("qpid.") == 0) {
throw framing::NotAllowedException(QPID_MSG("Exchange names beginning with \"amq.\" or \"qpid.\" are reserved. (exchange=\"" << exchange << "\")"));
}
try{
- std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args);
- if (response.second) {
- if (alternate) {
- response.first->setAlternate(alternate);
- alternate->incAlternateUsers();
- }
- if (durable) {
- getBroker().getStore().create(*response.first, args);
- }
- } else {
+ std::pair<Exchange::shared_ptr, bool> response =
+ getBroker().createExchange(exchange, type, durable, alternateExchange, args,
+ getConnection().getUserId(), getConnection().getUrl());
+ if (!response.second) {
+ //exchange already there, not created
checkType(response.first, type);
checkAlternate(response.first, alternate);
+ ManagementAgent* agent = getBroker().getManagementAgent();
+ if (agent)
+ agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(),
+ getConnection().getUserId(),
+ exchange,
+ type,
+ alternateExchange,
+ durable,
+ false,
+ ManagementAgent::toMap(args),
+ "existing"));
}
-
- ManagementAgent* agent = getBroker().getManagementAgent();
- if (agent)
- agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), getConnection().getUserId(), exchange, type,
- alternateExchange, durable, false, ManagementAgent::toMap(args),
- response.second ? "created" : "existing"));
-
}catch(UnknownExchangeTypeException& /*e*/){
throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type));
}
@@ -134,22 +137,8 @@ void SessionAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr ex
void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/)
{
- AclModule* acl = getBroker().getAcl();
- if (acl) {
- if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
- throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << getConnection().getUserId()));
- }
-
- //TODO: implement unused
- Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
- if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
- if (exchange->isDurable()) getBroker().getStore().destroy(*exchange);
- if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
- getBroker().getExchanges().destroy(name);
-
- ManagementAgent* agent = getBroker().getManagementAgent();
- if (agent)
- agent->raiseEvent(_qmf::EventExchangeDelete(getConnection().getUrl(), getConnection().getUserId(), name));
+ //TODO: implement if-unused
+ getBroker().deleteExchange(name, getConnection().getUserId(), getConnection().getUrl());
}
ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name)
@@ -169,67 +158,19 @@ ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& nam
}
void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
- const string& exchangeName, const string& routingKey,
- const FieldTable& arguments)
+ const string& exchangeName, const string& routingKey,
+ const FieldTable& arguments)
{
- AclModule* acl = getBroker().getAcl();
- if (acl) {
- std::map<acl::Property, std::string> params;
- params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
- params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey));
-
- if (!acl->authorise(getConnection().getUserId(),acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,&params))
- throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange bind request from " << getConnection().getUserId()));
- }
-
- Queue::shared_ptr queue = getQueue(queueName);
- Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
- if(exchange){
- string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
- if (exchange->bind(queue, exchangeRoutingKey, &arguments)) {
- queue->bound(exchangeName, routingKey, arguments);
- if (exchange->isDurable() && queue->isDurable()) {
- getBroker().getStore().bind(*exchange, *queue, routingKey, arguments);
- }
-
- ManagementAgent* agent = getBroker().getManagementAgent();
- if (agent)
- agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName,
- queueName, exchangeRoutingKey, ManagementAgent::toMap(arguments)));
- }
- }else{
- throw NotFoundException("Bind failed. No such exchange: " + exchangeName);
- }
+ getBroker().bind(queueName, exchangeName, routingKey, arguments,
+ getConnection().getUserId(), getConnection().getUrl());
}
void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
const string& exchangeName,
const string& routingKey)
{
- AclModule* acl = getBroker().getAcl();
- if (acl) {
- std::map<acl::Property, std::string> params;
- params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
- params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey));
- if (!acl->authorise(getConnection().getUserId(),acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,&params) )
- throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << getConnection().getUserId()));
- }
-
- Queue::shared_ptr queue = getQueue(queueName);
- if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
-
- Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
- if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
-
- //TODO: revise unbind to rely solely on binding key (not args)
- if (exchange->unbind(queue, routingKey, 0)) {
- if (exchange->isDurable() && queue->isDurable())
- getBroker().getStore().unbind(*exchange, *queue, routingKey, FieldTable());
-
- ManagementAgent* agent = getBroker().getManagementAgent();
- if (agent)
- agent->raiseEvent(_qmf::EventUnbind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, routingKey));
- }
+ getBroker().unbind(queueName, exchangeName, routingKey,
+ getConnection().getUserId(), getConnection().getUrl());
}
ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName,
@@ -332,52 +273,42 @@ QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange,
bool passive, bool durable, bool exclusive,
bool autoDelete, const qpid::framing::FieldTable& arguments)
-{
- AclModule* acl = getBroker().getAcl();
- if (acl) {
- std::map<acl::Property, std::string> params;
- params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
- params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) ));
- params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
- params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE)));
- params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE)));
- params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
- params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
- params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
-
- if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
- throw UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId()));
- }
-
- Exchange::shared_ptr alternate;
- if (!alternateExchange.empty()) {
- alternate = getBroker().getExchanges().get(alternateExchange);
- }
+{
Queue::shared_ptr queue;
if (passive && !name.empty()) {
- queue = getQueue(name);
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
+ //TODO: why does a passive declare require create
+ //permission? The purpose of the passive flag is to state
+ //that the queue should *not* created. For
+ //authorisation a passive declare is similar to
+ //queue-query (or indeed a qmf query).
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+ params.insert(make_pair(acl::PROP_PASSIVE, _TRUE));
+ params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
+ params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE)));
+ params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE)));
+ params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
+ params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
+ params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
+ throw UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId()));
+ }
+ queue = getQueue(name);
//TODO: check alternate-exchange is as expected
} else {
- std::pair<Queue::shared_ptr, bool> queue_created =
- getBroker().getQueues().declare(name, durable,
- autoDelete,
- exclusive ? &session : 0);
+ std::pair<Queue::shared_ptr, bool> queue_created =
+ getBroker().createQueue(name, durable,
+ autoDelete,
+ exclusive ? &session : 0,
+ alternateExchange,
+ arguments,
+ getConnection().getUserId(),
+ getConnection().getUrl());
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
- if (alternate) {
- queue->setAlternateExchange(alternate);
- alternate->incAlternateUsers();
- }
-
- //apply settings & create persistent record if required
- try { queue_created.first->create(arguments); }
- catch (...) { getBroker().getQueues().destroy(name); throw; }
-
- //add default binding:
- getBroker().getExchanges().getDefault()->bind(queue, name, 0);
- queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments);
-
//handle automatic cleanup:
if (exclusive) {
exclusiveQueues.push_back(queue);
@@ -386,21 +317,20 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
if (exclusive && queue->setExclusiveOwner(&session)) {
exclusiveQueues.push_back(queue);
}
+ ManagementAgent* agent = getBroker().getManagementAgent();
+ if (agent)
+ agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
+ name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments),
+ "existing"));
}
- ManagementAgent* agent = getBroker().getManagementAgent();
- if (agent)
- agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
- name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments),
- queue_created.second ? "created" : "existing"));
}
- if (exclusive && !queue->isExclusiveOwner(&session))
+ if (exclusive && !queue->isExclusiveOwner(&session))
throw ResourceLockedException(QPID_MSG("Cannot grant exclusive access to queue "
<< queue->getName()));
-}
-
-
+}
+
void SessionAdapter::QueueHandlerImpl::purge(const string& queue){
AclModule* acl = getBroker().getAcl();
if (acl)
@@ -409,40 +339,32 @@ void SessionAdapter::QueueHandlerImpl::purge(const string& queue){
throw UnauthorizedAccessException(QPID_MSG("ACL denied queue purge request from " << getConnection().getUserId()));
}
getQueue(queue)->purge();
-}
-
-void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){
-
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
- if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_QUEUE,queue,NULL) )
- throw UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << getConnection().getUserId()));
- }
+}
- Queue::shared_ptr q = getQueue(queue);
- if (q->hasExclusiveOwner() && !q->isExclusiveOwner(&session))
+void SessionAdapter::QueueHandlerImpl::checkDelete(Queue::shared_ptr queue, bool ifUnused, bool ifEmpty)
+{
+ if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session)) {
throw ResourceLockedException(QPID_MSG("Cannot delete queue "
- << queue << "; it is exclusive to another session"));
- if(ifEmpty && q->getMessageCount() > 0){
- throw PreconditionFailedException("Queue not empty.");
- }else if(ifUnused && q->getConsumerCount() > 0){
- throw PreconditionFailedException("Queue in use.");
- }else{
+ << queue->getName() << "; it is exclusive to another session"));
+ } else if(ifEmpty && queue->getMessageCount() > 0) {
+ throw PreconditionFailedException(QPID_MSG("Cannot delete queue "
+ << queue->getName() << "; queue not empty"));
+ } else if(ifUnused && queue->getConsumerCount() > 0) {
+ throw PreconditionFailedException(QPID_MSG("Cannot delete queue "
+ << queue->getName() << "; queue in use"));
+ } else if (queue->isExclusiveOwner(&getConnection())) {
//remove the queue from the list of exclusive queues if necessary
- if(q->isExclusiveOwner(&getConnection())){
- QueueVector::iterator i = std::find(getConnection().exclusiveQueues.begin(), getConnection().exclusiveQueues.end(), q);
- if(i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i);
- }
- q->destroy();
- getBroker().getQueues().destroy(queue);
- q->unbind(getBroker().getExchanges(), q);
-
- ManagementAgent* agent = getBroker().getManagementAgent();
- if (agent)
- agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(), getConnection().getUserId(), queue));
- q->notifyDeleted();
- }
+ QueueVector::iterator i = std::find(getConnection().exclusiveQueues.begin(),
+ getConnection().exclusiveQueues.end(),
+ queue);
+ if (i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i);
+ }
+}
+
+void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty)
+{
+ getBroker().deleteQueue(queue, getConnection().getUserId(), getConnection().getUrl(),
+ boost::bind(&SessionAdapter::QueueHandlerImpl::checkDelete, this, _1, ifUnused, ifEmpty));
}
SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.h b/qpid/cpp/src/qpid/broker/SessionAdapter.h
index ca27fb6e1d..8987c4812f 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.h
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.h
@@ -138,6 +138,7 @@ class Queue;
bool isLocal(const ConnectionToken* t) const;
void destroyExclusiveQueues();
+ void checkDelete(boost::shared_ptr<Queue> queue, bool ifUnused, bool ifEmpty);
template <class F> void eachExclusiveQueue(F f)
{
std::for_each(exclusiveQueues.begin(), exclusiveQueues.end(), f);
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 23c999a98a..8b4defaa73 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -31,6 +31,7 @@
#include <qpid/broker/Message.h>
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/sys/Time.h"
+#include "qpid/sys/Thread.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/AclModule.h"
#include "qpid/types/Variant.h"
@@ -2237,6 +2238,7 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal)
uint32_t bufferLen = inBuffer.getPosition();
inBuffer.reset();
+ setManagementExecutionContext((const qpid::broker::ConnectionState*) msg.getPublisher());
const framing::FieldTable *headers = msg.getApplicationHeaders();
if (headers && msg.getAppId() == "qmf2")
{
@@ -3085,3 +3087,21 @@ bool ManagementAgent::moveDeletedObjectsLH() {
}
return !deleteList.empty();
}
+
+namespace qpid {
+namespace management {
+
+namespace {
+QPID_TSS const qpid::broker::ConnectionState* executionContext = 0;
+}
+
+void setManagementExecutionContext(const qpid::broker::ConnectionState* ctxt)
+{
+ executionContext = ctxt;
+}
+const qpid::broker::ConnectionState* getManagementExecutionContext()
+{
+ return executionContext;
+}
+
+}}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index 0db19594a7..fb15dc6ed1 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -41,6 +41,9 @@
#include <map>
namespace qpid {
+namespace broker {
+class ConnectionState;
+}
namespace management {
class ManagementAgent
@@ -422,6 +425,8 @@ private:
void debugSnapshot(const char* title);
};
+void setManagementExecutionContext(const qpid::broker::ConnectionState*);
+const qpid::broker::ConnectionState* getManagementExecutionContext();
}}
-
+
#endif /*!_ManagementAgent_*/
diff --git a/qpid/cpp/src/tests/MessagingFixture.h b/qpid/cpp/src/tests/MessagingFixture.h
index 715de09bad..2312a87e9d 100644
--- a/qpid/cpp/src/tests/MessagingFixture.h
+++ b/qpid/cpp/src/tests/MessagingFixture.h
@@ -27,15 +27,19 @@
#include "qpid/client/Connection.h"
#include "qpid/client/Session.h"
#include "qpid/framing/Uuid.h"
+#include "qpid/messaging/Address.h"
#include "qpid/messaging/Connection.h"
#include "qpid/messaging/Session.h"
#include "qpid/messaging/Sender.h"
#include "qpid/messaging/Receiver.h"
#include "qpid/messaging/Message.h"
+#include "qpid/types/Variant.h"
namespace qpid {
namespace tests {
+using qpid::types::Variant;
+
struct BrokerAdmin
{
qpid::client::Connection connection;
@@ -223,6 +227,119 @@ inline void receive(messaging::Receiver& receiver, uint count = 1, uint start =
}
}
+
+class MethodInvoker
+{
+ public:
+ MethodInvoker(messaging::Session& session) : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
+ sender(session.createSender("qmf.default.direct/broker")),
+ receiver(session.createReceiver(replyTo)) {}
+
+ void createExchange(const std::string& name, const std::string& type, bool durable=false)
+ {
+ Variant::Map params;
+ params["name"]=name;
+ params["type"]="exchange";
+ params["properties"] = Variant::Map();
+ params["properties"].asMap()["exchange-type"] = type;
+ params["properties"].asMap()["durable"] = durable;
+ methodRequest("create", params);
+ }
+
+ void deleteExchange(const std::string& name)
+ {
+ Variant::Map params;
+ params["name"]=name;
+ params["type"]="exchange";
+ methodRequest("delete", params);
+ }
+
+ void createQueue(const std::string& name, bool durable=false, bool autodelete=false,
+ const Variant::Map& options=Variant::Map())
+ {
+ Variant::Map params;
+ params["name"]=name;
+ params["type"]="queue";
+ params["properties"] = options;
+ params["properties"].asMap()["durable"] = durable;
+ params["properties"].asMap()["auto-delete"] = autodelete;
+ methodRequest("create", params);
+ }
+
+ void deleteQueue(const std::string& name)
+ {
+ Variant::Map params;
+ params["name"]=name;
+ params["type"]="queue";
+ methodRequest("delete", params);
+ }
+
+ void bind(const std::string& exchange, const std::string& queue, const std::string& key,
+ const Variant::Map& options=Variant::Map())
+ {
+ Variant::Map params;
+ params["name"]=(boost::format("%1%/%2%/%3%") % (exchange) % (queue) % (key)).str();
+ params["type"]="binding";
+ params["properties"] = options;
+ methodRequest("create", params);
+ }
+
+ void unbind(const std::string& exchange, const std::string& queue, const std::string& key)
+ {
+ Variant::Map params;
+ params["name"]=(boost::format("%1%/%2%/%3%") % (exchange) % (queue) % (key)).str();
+ params["type"]="binding";
+ methodRequest("delete", params);
+ }
+
+ void methodRequest(const std::string& method, const Variant::Map& inParams, Variant::Map* outParams = 0)
+ {
+ Variant::Map content;
+ Variant::Map objectId;
+ objectId["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";
+ content["_object_id"] = objectId;
+ content["_method_name"] = method;
+ content["_arguments"] = inParams;
+
+ messaging::Message request;
+ request.setReplyTo(replyTo);
+ request.getProperties()["x-amqp-0-10.app-id"] = "qmf2";
+ request.getProperties()["qmf.opcode"] = "_method_request";
+ encode(content, request);
+
+ sender.send(request);
+
+ messaging::Message response;
+ if (receiver.fetch(response, messaging::Duration::SECOND*5)) {
+ if (response.getProperties()["x-amqp-0-10.app-id"] == "qmf2") {
+ std::string opcode = response.getProperties()["qmf.opcode"];
+ if (opcode == "_method_response") {
+ if (outParams) {
+ Variant::Map m;
+ decode(response, m);
+ *outParams = m["_arguments"].asMap();
+ }
+ } else if (opcode == "_exception") {
+ Variant::Map m;
+ decode(response, m);
+ throw Exception(QPID_MSG("Error: " << m["_values"]));
+ } else {
+ throw Exception(QPID_MSG("Invalid response received, unexpected opcode: " << opcode));
+ }
+ } else {
+ throw Exception(QPID_MSG("Invalid response received, not a qmfv2 message: app-id="
+ << response.getProperties()["x-amqp-0-10.app-id"]));
+ }
+ } else {
+ throw Exception(QPID_MSG("No response received"));
+ }
+ }
+ private:
+ messaging::Address replyTo;
+ messaging::Sender sender;
+ messaging::Receiver receiver;
+};
+
}} // namespace qpid::tests
#endif /*!TESTS_MESSAGINGFIXTURE_H*/
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp
index 991ec847bf..f9a8b0e4c1 100644
--- a/qpid/cpp/src/tests/MessagingSessionTests.cpp
+++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp
@@ -890,6 +890,53 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
BOOST_CHECK(!fix.session.createReceiver(fix.queue).fetch(m, Duration::IMMEDIATE));
}
+QPID_AUTO_TEST_CASE(testQmfCreateAndDelete)
+{
+ MessagingFixture fix(Broker::Options(), true/*enable management*/);
+ MethodInvoker control(fix.session);
+ control.createQueue("my-queue");
+ control.createExchange("my-exchange", "topic");
+ control.bind("my-exchange", "my-queue", "subject1");
+
+ Sender sender = fix.session.createSender("my-exchange");
+ Receiver receiver = fix.session.createReceiver("my-queue");
+ Message out;
+ out.setSubject("subject1");
+ out.setContent("one");
+ sender.send(out);
+ Message in;
+ BOOST_CHECK(receiver.fetch(in, Duration::SECOND*5));
+ BOOST_CHECK_EQUAL(out.getContent(), in.getContent());
+ control.unbind("my-exchange", "my-queue", "subject1");
+ control.bind("my-exchange", "my-queue", "subject2");
+
+ out.setContent("two");
+ sender.send(out);//should be dropped
+
+ out.setSubject("subject2");
+ out.setContent("three");
+ sender.send(out);//should not be dropped
+
+ BOOST_CHECK(receiver.fetch(in, Duration::SECOND*5));
+ BOOST_CHECK_EQUAL(out.getContent(), in.getContent());
+ BOOST_CHECK(!receiver.fetch(in, Duration::IMMEDIATE));
+ sender.close();
+ receiver.close();
+
+ control.deleteExchange("my-exchange");
+ messaging::Session other = fix.connection.createSession();
+ {
+ ScopedSuppressLogging sl;
+ BOOST_CHECK_THROW(other.createSender("my-exchange"), qpid::messaging::NotFound);
+ }
+ control.deleteQueue("my-queue");
+ other = fix.connection.createSession();
+ {
+ ScopedSuppressLogging sl;
+ BOOST_CHECK_THROW(other.createReceiver("my-queue"), qpid::messaging::NotFound);
+ }
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index 80c69ac386..57b344498e 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -244,7 +244,7 @@ QPID_AUTO_TEST_CASE(testBound){
exchange2.reset();
//unbind the queue from all exchanges it knows it has been bound to:
- queue->unbind(exchanges, queue);
+ queue->unbind(exchanges);
//ensure the remaining exchanges don't still have the queue bound to them:
FailOnDeliver deliverable;
diff --git a/qpid/cpp/src/tests/qpid-ctrl b/qpid/cpp/src/tests/qpid-ctrl
index 7b46c190fb..4246c57898 100755
--- a/qpid/cpp/src/tests/qpid-ctrl
+++ b/qpid/cpp/src/tests/qpid-ctrl
@@ -92,7 +92,10 @@ try:
arguments = {}
for a in args:
name, val = nameval(a)
- arguments[name] = val
+ if val[0] == '{' or val[0] == '[':
+ arguments[name] = eval(val)
+ else:
+ arguments[name] = val
content = {
"_object_id": {"_object_name": object_name},
"_method_name": method_name,
diff --git a/qpid/cpp/src/tests/sender.cpp b/qpid/cpp/src/tests/sender.cpp
index 9850e851da..063b5e87dc 100644
--- a/qpid/cpp/src/tests/sender.cpp
+++ b/qpid/cpp/src/tests/sender.cpp
@@ -120,7 +120,7 @@ void Sender::execute(AsyncSession& session, bool isRetry)
string data;
while (getline(std::cin, data)) {
message.setData(data);
- message.getHeaders().setInt("SN", ++sent);
+ //message.getHeaders().setInt("SN", ++sent);
string matchKey;
if (lvqMatchValues && getline(lvqMatchValues, matchKey)) {
message.getHeaders().setString(QueueOptions::strLVQMatchProperty, matchKey);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
index c0afae0773..2c8fd737c3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
@@ -712,6 +712,25 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
}
+ public BrokerSchema.BrokerClass.CreateMethodResponseCommand create(final BrokerSchema.BrokerClass.CreateMethodResponseCommandFactory factory,
+ final String type,
+ final String name,
+ final Map properties,
+ final java.lang.Boolean lenient)
+ {
+ //TODO:
+ return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+ }
+
+ public BrokerSchema.BrokerClass.DeleteMethodResponseCommand delete(final BrokerSchema.BrokerClass.DeleteMethodResponseCommandFactory factory,
+ final String type,
+ final String name,
+ final Map options)
+ {
+ //TODO:
+ return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+ }
+
public UUID getId()
{
return _obj.getId();
diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml
index 65d8d4f11d..277534250e 100644
--- a/qpid/specs/management-schema.xml
+++ b/qpid/specs/management-schema.xml
@@ -102,6 +102,19 @@
<arg name="level" dir="O" type="sstr"/>
</method>
+ <method name="create" desc="Create an object of the specified type">
+ <arg name="type" dir="I" type="sstr" desc="The type of object to create"/>
+ <arg name="name" dir="I" type="sstr" desc="The name of the object to create"/>
+ <arg name="properties" dir="I" type="map" desc="Type specific object properties"/>
+ <arg name="strict" dir="I" type="bool" desc="If specified, treat unrecognised object properties as an error"/>
+ </method>
+
+ <method name="delete" desc="Delete an object of the specified type">
+ <arg name="type" dir="I" type="sstr" desc="The type of object to delete"/>
+ <arg name="name" dir="I" type="sstr" desc="The name of the object to delete"/>
+ <arg name="options" dir="I" type="map" desc="Type specific object options for deletion"/>
+ </method>
+
</class>
<!--