diff options
author | Gordon Sim <gsim@apache.org> | 2011-02-18 22:38:05 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2011-02-18 22:38:05 +0000 |
commit | 9de60b44364c99806877e82dce12b8cf7497cef2 (patch) | |
tree | ed33629239dadf45f5772607570b48bcefd0839d /qpid | |
parent | d2d9731a6a7903f04df58891cbcac00b03270f6d (diff) | |
download | qpid-python-9de60b44364c99806877e82dce12b8cf7497cef2.tar.gz |
QPID-3015: Added create and delete methods to management schema for broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1072179 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 410 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 54 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 27 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionAdapter.cpp | 278 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionAdapter.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.cpp | 20 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/tests/MessagingFixture.h | 117 | ||||
-rw-r--r-- | qpid/cpp/src/tests/MessagingSessionTests.cpp | 47 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueueTest.cpp | 2 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/qpid-ctrl | 5 | ||||
-rw-r--r-- | qpid/cpp/src/tests/sender.cpp | 2 | ||||
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java | 19 | ||||
-rw-r--r-- | qpid/specs/management-schema.xml | 13 |
15 files changed, 817 insertions, 200 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 3c692fc368..fbd7dd3361 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -20,6 +20,7 @@ */ #include "qpid/broker/Broker.h" +#include "qpid/broker/ConnectionState.h" #include "qpid/broker/DirectExchange.h" #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/HeadersExchange.h" @@ -33,10 +34,19 @@ #include "qpid/broker/ExpiryPolicy.h" #include "qmf/org/apache/qpid/broker/Package.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerDelete.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h" +#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" +#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" +#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" +#include "qmf/org/apache/qpid/broker/EventQueueDelete.h" +#include "qmf/org/apache/qpid/broker/EventBind.h" +#include "qmf/org/apache/qpid/broker/EventUnbind.h" +#include "qpid/amqp_0_10/Codecs.h" #include "qpid/management/ManagementDirectExchange.h" #include "qpid/management/ManagementTopicExchange.h" #include "qpid/log/Logger.h" @@ -44,7 +54,9 @@ #include "qpid/log/Statement.h" #include "qpid/log/posix/SinkOptions.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/FieldTable.h" #include "qpid/framing/ProtocolInitiation.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/ProtocolFactory.h" #include "qpid/sys/Poller.h" @@ -76,7 +88,10 @@ using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; +using qpid::management::getManagementExecutionContext; +using qpid::types::Variant; using std::string; +using std::make_pair; namespace _qmf = qmf::org::apache::qpid::broker; @@ -443,6 +458,20 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, QPID_LOG (debug, "Broker::getLogLevel()"); status = Manageable::STATUS_OK; break; + case _qmf::Broker::METHOD_CREATE : + { + _qmf::ArgsBrokerCreate& a = dynamic_cast<_qmf::ArgsBrokerCreate&>(args); + createObject(a.i_type, a.i_name, a.i_properties, a.i_strict, getManagementExecutionContext()); + status = Manageable::STATUS_OK; + break; + } + case _qmf::Broker::METHOD_DELETE : + { + _qmf::ArgsBrokerDelete& a = dynamic_cast<_qmf::ArgsBrokerDelete&>(args); + deleteObject(a.i_type, a.i_name, a.i_options, getManagementExecutionContext()); + status = Manageable::STATUS_OK; + break; + } default: QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]"); status = Manageable::STATUS_NOT_IMPLEMENTED; @@ -452,6 +481,169 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, return status; } +namespace +{ +const std::string TYPE_QUEUE("queue"); +const std::string TYPE_EXCHANGE("exchange"); +const std::string TYPE_TOPIC("topic"); +const std::string TYPE_BINDING("binding"); +const std::string DURABLE("durable"); +const std::string AUTO_DELETE("auto-delete"); +const std::string ALTERNATE_EXCHANGE("alternate-exchange"); +const std::string EXCHANGE_TYPE("exchange-type"); +const std::string QUEUE_NAME("queue"); +const std::string EXCHANGE_NAME("exchange"); + +const std::string TRUE("true"); +const std::string FALSE("false"); +} + +struct InvalidBindingIdentifier : public qpid::Exception +{ + InvalidBindingIdentifier(const std::string& name) : qpid::Exception(name) {} + std::string getPrefix() const { return "invalid binding"; } +}; + +struct BindingIdentifier +{ + std::string exchange; + std::string queue; + std::string key; + + BindingIdentifier(const std::string& name) + { + std::vector<std::string> path; + split(path, name, "/"); + switch (path.size()) { + case 1: + queue = path[0]; + break; + case 2: + exchange = path[0]; + queue = path[1]; + break; + case 3: + exchange = path[0]; + queue = path[1]; + key = path[2]; + break; + default: + throw InvalidBindingIdentifier(name); + } + } +}; + +struct ObjectAlreadyExists : public qpid::Exception +{ + ObjectAlreadyExists(const std::string& name) : qpid::Exception(name) {} + std::string getPrefix() const { return "object already exists"; } +}; + +struct UnknownObjectType : public qpid::Exception +{ + UnknownObjectType(const std::string& type) : qpid::Exception(type) {} + std::string getPrefix() const { return "unknown object type"; } +}; + +void Broker::createObject(const std::string& type, const std::string& name, + const Variant::Map& properties, bool /*strict*/, const ConnectionState* context) +{ + std::string userId; + std::string connectionId; + if (context) { + userId = context->getUserId(); + connectionId = context->getUrl(); + } + //TODO: implement 'strict' option (check there are no unrecognised properties) + QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ")"); + if (type == TYPE_QUEUE) { + bool durable(false); + bool autodelete(false); + std::string alternateExchange; + Variant::Map extensions; + for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + // extract durable, auto-delete and alternate-exchange properties + if (i->first == DURABLE) durable = i->second; + else if (i->first == AUTO_DELETE) autodelete = i->second; + else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString(); + //treat everything else as extension properties + else extensions[i->first] = i->second; + } + framing::FieldTable arguments; + amqp_0_10::translate(extensions, arguments); + + std::pair<boost::shared_ptr<Queue>, bool> result = + createQueue(name, durable, autodelete, 0, alternateExchange, arguments, userId, connectionId); + if (!result.second) { + throw ObjectAlreadyExists(name); + } + } else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) { + bool durable(false); + std::string exchangeType; + std::string alternateExchange; + Variant::Map extensions; + for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + // extract durable, auto-delete and alternate-exchange properties + if (i->first == DURABLE) durable = i->second; + else if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString(); + else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString(); + //treat everything else as extension properties + else extensions[i->first] = i->second; + } + framing::FieldTable arguments; + amqp_0_10::translate(extensions, arguments); + + try { + std::pair<boost::shared_ptr<Exchange>, bool> result = + createExchange(name, exchangeType, durable, alternateExchange, arguments, userId, connectionId); + if (!result.second) { + throw ObjectAlreadyExists(name); + } + } catch (const UnknownExchangeTypeException&) { + throw Exception(QPID_MSG("Invalid exchange type: " << exchangeType)); + } + } else if (type == TYPE_BINDING) { + BindingIdentifier binding(name); + std::string exchangeType("topic"); + Variant::Map extensions; + for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + // extract durable, auto-delete and alternate-exchange properties + if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString(); + //treat everything else as extension properties + else extensions[i->first] = i->second; + } + framing::FieldTable arguments; + amqp_0_10::translate(extensions, arguments); + + bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId); + } else { + throw UnknownObjectType(type); + } +} + +void Broker::deleteObject(const std::string& type, const std::string& name, + const Variant::Map& options, const ConnectionState* context) +{ + std::string userId; + std::string connectionId; + if (context) { + userId = context->getUserId(); + connectionId = context->getUrl(); + } + QPID_LOG (debug, "Broker::delete(" << type << ", " << name << "," << options << ")"); + if (type == TYPE_QUEUE) { + deleteQueue(name, userId, connectionId); + } else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) { + deleteExchange(name, userId, connectionId); + } else if (type == TYPE_BINDING) { + BindingIdentifier binding(name); + unbind(binding.queue, binding.exchange, binding.key, userId, connectionId); + } else { + throw UnknownObjectType(type); + } + +} + void Broker::setLogLevel(const std::string& level) { QPID_LOG(notice, "Changing log level to " << level); @@ -466,7 +658,7 @@ std::string Broker::getLogLevel() const std::vector<std::string>& selectors = qpid::log::Logger::instance().getOptions().selectors; for (std::vector<std::string>::const_iterator i = selectors.begin(); i != selectors.end(); ++i) { if (i != selectors.begin()) level += std::string(","); - level += *i; + level += *i; } return level; } @@ -552,5 +744,221 @@ void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) { const std::string Broker::TCP_TRANSPORT("tcp"); + +std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue( + const std::string& name, + bool durable, + bool autodelete, + const OwnershipToken* owner, + const std::string& alternateExchange, + const qpid::framing::FieldTable& arguments, + const std::string& userId, + const std::string& connectionId) +{ + if (acl) { + std::map<acl::Property, std::string> params; + params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); + params.insert(make_pair(acl::PROP_PASSIVE, FALSE)); + params.insert(make_pair(acl::PROP_DURABLE, durable ? TRUE : FALSE)); + params.insert(make_pair(acl::PROP_EXCLUSIVE, owner ? TRUE : FALSE)); + params.insert(make_pair(acl::PROP_AUTODELETE, autodelete ? TRUE : FALSE)); + params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type"))); + params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count")))); + params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size")))); + + if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) ) + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId)); + } + + Exchange::shared_ptr alternate; + if (!alternateExchange.empty()) { + alternate = exchanges.get(alternateExchange); + if (!alternate) framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange)); + } + + std::pair<Queue::shared_ptr, bool> result = queues.declare(name, durable, autodelete, owner); + if (result.second) { + if (alternate) { + result.first->setAlternateExchange(alternate); + alternate->incAlternateUsers(); + } + + //apply settings & create persistent record if required + result.first->create(arguments); + //add default binding: + result.first->bind(exchanges.getDefault(), name); + + if (managementAgent.get()) { + //TODO: debatable whether we should raise an event here for + //create when this is a 'declare' event; ideally add a create + //event instead? + managementAgent->raiseEvent( + _qmf::EventQueueDeclare(connectionId, userId, name, + durable, owner, autodelete, + ManagementAgent::toMap(arguments), + "created")); + } + } + return result; +} + +void Broker::deleteQueue(const std::string& name, const std::string& userId, + const std::string& connectionId, QueueFunctor check) +{ + if (acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) { + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId)); + } + + Queue::shared_ptr queue = queues.find(name); + if (queue) { + if (check) check(queue); + queues.destroy(name); + queue->destroyed(); + } else { + throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name)); + } + + if (managementAgent.get()) + managementAgent->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, name)); + +} + +std::pair<Exchange::shared_ptr, bool> Broker::createExchange( + const std::string& name, + const std::string& type, + bool durable, + const std::string& alternateExchange, + const qpid::framing::FieldTable& arguments, + const std::string& userId, + const std::string& connectionId) +{ + if (acl) { + std::map<acl::Property, std::string> params; + params.insert(make_pair(acl::PROP_TYPE, type)); + params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); + params.insert(make_pair(acl::PROP_PASSIVE, FALSE)); + params.insert(make_pair(acl::PROP_DURABLE, durable ? TRUE : FALSE)); + if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_EXCHANGE,name,¶ms) ) + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << userId)); + } + + Exchange::shared_ptr alternate; + if (!alternateExchange.empty()) { + alternate = exchanges.get(alternateExchange); + if (!alternate) framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange)); + } + + std::pair<Exchange::shared_ptr, bool> result; + result = exchanges.declare(name, type, durable, arguments); + if (result.second) { + if (alternate) { + result.first->setAlternate(alternate); + alternate->incAlternateUsers(); + } + if (durable) { + store->create(*result.first, arguments); + } + if (managementAgent.get()) { + //TODO: debatable whether we should raise an event here for + //create when this is a 'declare' event; ideally add a create + //event instead? + managementAgent->raiseEvent(_qmf::EventExchangeDeclare(connectionId, + userId, + name, + type, + alternateExchange, + durable, + false, + ManagementAgent::toMap(arguments), + "created")); + } + } + return result; +} + +void Broker::deleteExchange(const std::string& name, const std::string& userId, + const std::string& connectionId) +{ + if (acl) { + if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) ) + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId)); + } + + Exchange::shared_ptr exchange(exchanges.get(name)); + if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name)); + if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange.")); + if (exchange->isDurable()) store->destroy(*exchange); + if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); + exchanges.destroy(name); + + if (managementAgent.get()) + managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name)); + +} + +void Broker::bind(const std::string& queueName, + const std::string& exchangeName, + const std::string& key, + const qpid::framing::FieldTable& arguments, + const std::string& userId, + const std::string& connectionId) +{ + if (acl) { + std::map<acl::Property, std::string> params; + params.insert(make_pair(acl::PROP_QUEUENAME, queueName)); + params.insert(make_pair(acl::PROP_ROUTINGKEY, key)); + + if (!acl->authorise(userId,acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,¶ms)) + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange bind request from " << userId)); + } + + Queue::shared_ptr queue = queues.find(queueName); + Exchange::shared_ptr exchange = exchanges.get(exchangeName); + if (!queue) { + throw framing::NotFoundException(QPID_MSG("Bind failed. No such queue: " << queueName)); + } else if (!exchange) { + throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName)); + } else { + if (queue->bind(exchange, key, arguments)) { + if (managementAgent.get()) { + managementAgent->raiseEvent(_qmf::EventBind(connectionId, userId, exchangeName, + queueName, key, ManagementAgent::toMap(arguments))); + } + } + } +} + +void Broker::unbind(const std::string& queueName, + const std::string& exchangeName, + const std::string& key, + const std::string& userId, + const std::string& connectionId) +{ + if (acl) { + std::map<acl::Property, std::string> params; + params.insert(make_pair(acl::PROP_QUEUENAME, queueName)); + params.insert(make_pair(acl::PROP_ROUTINGKEY, key)); + if (!acl->authorise(userId,acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,¶ms) ) + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << userId)); + } + + Queue::shared_ptr queue = queues.find(queueName); + Exchange::shared_ptr exchange = exchanges.get(exchangeName); + if (!queue) { + throw framing::NotFoundException(QPID_MSG("Bind failed. No such queue: " << queueName)); + } else if (!exchange) { + throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName)); + } else { + if (exchange->unbind(queue, key, 0)) { + if (exchange->isDurable() && queue->isDurable()) { + store->unbind(*exchange, *queue, key, qpid::framing::FieldTable()); + } + if (managementAgent.get()) { + managementAgent->raiseEvent(_qmf::EventUnbind(connectionId, userId, exchangeName, queueName, key)); + } + } + } +} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index cd6f81dc70..9af9020c8f 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -49,6 +49,7 @@ #include "qpid/framing/ProtocolInitiation.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Timer.h" +#include "qpid/types/Variant.h" #include "qpid/RefCounted.h" #include "qpid/broker/AclModule.h" #include "qpid/sys/Mutex.h" @@ -57,7 +58,7 @@ #include <string> #include <vector> -namespace qpid { +namespace qpid { namespace sys { class ProtocolFactory; @@ -68,6 +69,7 @@ struct Url; namespace broker { +class ConnectionState; class ExpiryPolicy; class Message; @@ -80,7 +82,7 @@ struct NoSuchTransportException : qpid::Exception }; /** - * A broker instance. + * A broker instance. */ class Broker : public sys::Runnable, public Plugin::Target, public management::Manageable, @@ -148,6 +150,10 @@ public: void setStore (); void setLogLevel(const std::string& level); std::string getLogLevel(); + void createObject(const std::string& type, const std::string& name, + const qpid::types::Variant::Map& properties, bool lenient, const ConnectionState* context); + void deleteObject(const std::string& type, const std::string& name, + const qpid::types::Variant::Map& options, const ConnectionState* context); boost::shared_ptr<sys::Poller> poller; sys::Timer timer; @@ -179,7 +185,7 @@ public: bool clusterUpdatee; boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; ConnectionCounter connectionCounter; - + public: virtual ~Broker(); @@ -277,7 +283,7 @@ public: bool isClusterUpdatee() const { return clusterUpdatee; } management::ManagementAgent* getManagementAgent() { return managementAgent.get(); } - + ConnectionCounter& getConnectionCounter() {return connectionCounter;} /** @@ -290,6 +296,42 @@ public: const boost::intrusive_ptr<Message>& msg)> deferDelivery; bool isAuthenticating ( ) { return config.auth; } + + typedef boost::function1<void, boost::shared_ptr<Queue> > QueueFunctor; + + std::pair<boost::shared_ptr<Queue>, bool> createQueue( + const std::string& name, + bool durable, + bool autodelete, + const OwnershipToken* owner, + const std::string& alternateExchange, + const qpid::framing::FieldTable& arguments, + const std::string& userId, + const std::string& connectionId); + void deleteQueue(const std::string& name, + const std::string& userId, + const std::string& connectionId, + QueueFunctor check = QueueFunctor()); + std::pair<Exchange::shared_ptr, bool> createExchange( + const std::string& name, + const std::string& type, + bool durable, + const std::string& alternateExchange, + const qpid::framing::FieldTable& args, + const std::string& userId, const std::string& connectionId); + void deleteExchange(const std::string& name, const std::string& userId, + const std::string& connectionId); + void bind(const std::string& queue, + const std::string& exchange, + const std::string& key, + const qpid::framing::FieldTable& arguments, + const std::string& userId, + const std::string& connectionId); + void unbind(const std::string& queue, + const std::string& exchange, + const std::string& key, + const std::string& userId, + const std::string& connectionId); }; }} diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 27c1cc4ad7..40cb80010c 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -444,7 +444,7 @@ void Queue::purgeExpired() Mutex::ScopedLock locker(messageLock); messages->removeIf(boost::bind(&collect_if_expired, expired, _1)); } - for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); + for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } } @@ -826,8 +826,9 @@ void Queue::configure(const FieldTable& _settings, bool recovering) store->create(*this, _settings); } -void Queue::destroy() +void Queue::destroyed() { + unbind(broker->getExchanges()); if (alternateExchange.get()) { Mutex::ScopedLock locker(messageLock); while(!messages->empty()){ @@ -846,6 +847,7 @@ void Queue::destroy() store = 0;//ensure we make no more calls to the store for this queue } if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>(); + notifyDeleted(); } void Queue::notifyDeleted() @@ -865,9 +867,9 @@ void Queue::bound(const string& exchange, const string& key, bindings.add(exchange, key, args); } -void Queue::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref) +void Queue::unbind(ExchangeRegistry& exchanges) { - bindings.unbind(exchanges, shared_ref); + bindings.unbind(exchanges, shared_from_this()); } void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy) @@ -955,8 +957,7 @@ void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue) if (broker.getQueues().destroyIf(queue->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { QPID_LOG(debug, "Auto-deleting " << queue->getName()); - queue->unbind(broker.getExchanges(), queue); - queue->destroy(); + queue->destroyed(); } } @@ -1175,6 +1176,20 @@ void Queue::flush() if (u.acquired && store) store->flush(*this); } +bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key, + const qpid::framing::FieldTable& arguments) +{ + if (exchange->bind(shared_from_this(), key, &arguments)) { + bound(exchange->getName(), key, arguments); + if (exchange->isDurable() && isDurable()) { + store->bind(*exchange, *this, key, arguments); + } + return true; + } else { + return false; + } +} + Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {} bool Queue::UsageBarrier::acquire() diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 12a3d273be..1a3b632845 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -172,8 +172,9 @@ class Queue : public boost::enable_shared_from_this<Queue>, } } } - + void checkNotDeleted(); + void notifyDeleted(); public: @@ -196,13 +197,17 @@ class Queue : public boost::enable_shared_from_this<Queue>, // "recovering" means we are doing a MessageStore recovery. QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings, bool recovering = false); - void destroy(); - void notifyDeleted(); + void destroyed(); QPID_BROKER_EXTERN void bound(const std::string& exchange, const std::string& key, const qpid::framing::FieldTable& args); - QPID_BROKER_EXTERN void unbind(ExchangeRegistry& exchanges, - Queue::shared_ptr shared_ref); + //TODO: get unbind out of the public interface; only there for purposes of one unit test + void unbind(ExchangeRegistry& exchanges); + /** + * Bind self to specified exchange, and record that binding for unbinding on delete. + */ + bool bind(boost::shared_ptr<Exchange> exchange, const std::string& key, + const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable()); QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg); QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message); diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index 3d62e73185..c2f90dce47 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -64,51 +64,54 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const const string& alternateExchange, bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){ - AclModule* acl = getBroker().getAcl(); - if (acl) { - std::map<acl::Property, std::string> params; - params.insert(make_pair(acl::PROP_TYPE, type)); - params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); - params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) )); - params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE))); - if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,¶ms) ) - throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange declare request from " << getConnection().getUserId())); - } - //TODO: implement autoDelete Exchange::shared_ptr alternate; if (!alternateExchange.empty()) { alternate = getBroker().getExchanges().get(alternateExchange); } if(passive){ + AclModule* acl = getBroker().getAcl(); + if (acl) { + //TODO: why does a passive declare require create + //permission? The purpose of the passive flag is to state + //that the exchange should *not* created. For + //authorisation a passive declare is similar to + //exchange-query. + std::map<acl::Property, std::string> params; + params.insert(make_pair(acl::PROP_TYPE, type)); + params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); + params.insert(make_pair(acl::PROP_PASSIVE, _TRUE)); + params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE)); + if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,¶ms) ) + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << getConnection().getUserId())); + } Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange)); checkType(actual, type); checkAlternate(actual, alternate); - }else{ + }else{ if(exchange.find("amq.") == 0 || exchange.find("qpid.") == 0) { throw framing::NotAllowedException(QPID_MSG("Exchange names beginning with \"amq.\" or \"qpid.\" are reserved. (exchange=\"" << exchange << "\")")); } try{ - std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args); - if (response.second) { - if (alternate) { - response.first->setAlternate(alternate); - alternate->incAlternateUsers(); - } - if (durable) { - getBroker().getStore().create(*response.first, args); - } - } else { + std::pair<Exchange::shared_ptr, bool> response = + getBroker().createExchange(exchange, type, durable, alternateExchange, args, + getConnection().getUserId(), getConnection().getUrl()); + if (!response.second) { + //exchange already there, not created checkType(response.first, type); checkAlternate(response.first, alternate); + ManagementAgent* agent = getBroker().getManagementAgent(); + if (agent) + agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), + getConnection().getUserId(), + exchange, + type, + alternateExchange, + durable, + false, + ManagementAgent::toMap(args), + "existing")); } - - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), getConnection().getUserId(), exchange, type, - alternateExchange, durable, false, ManagementAgent::toMap(args), - response.second ? "created" : "existing")); - }catch(UnknownExchangeTypeException& /*e*/){ throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type)); } @@ -134,22 +137,8 @@ void SessionAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr ex void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/) { - AclModule* acl = getBroker().getAcl(); - if (acl) { - if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) ) - throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << getConnection().getUserId())); - } - - //TODO: implement unused - Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); - if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange.")); - if (exchange->isDurable()) getBroker().getStore().destroy(*exchange); - if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); - getBroker().getExchanges().destroy(name); - - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventExchangeDelete(getConnection().getUrl(), getConnection().getUserId(), name)); + //TODO: implement if-unused + getBroker().deleteExchange(name, getConnection().getUserId(), getConnection().getUrl()); } ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name) @@ -169,67 +158,19 @@ ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& nam } void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, - const string& exchangeName, const string& routingKey, - const FieldTable& arguments) + const string& exchangeName, const string& routingKey, + const FieldTable& arguments) { - AclModule* acl = getBroker().getAcl(); - if (acl) { - std::map<acl::Property, std::string> params; - params.insert(make_pair(acl::PROP_QUEUENAME, queueName)); - params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey)); - - if (!acl->authorise(getConnection().getUserId(),acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,¶ms)) - throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange bind request from " << getConnection().getUserId())); - } - - Queue::shared_ptr queue = getQueue(queueName); - Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); - if(exchange){ - string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; - if (exchange->bind(queue, exchangeRoutingKey, &arguments)) { - queue->bound(exchangeName, routingKey, arguments); - if (exchange->isDurable() && queue->isDurable()) { - getBroker().getStore().bind(*exchange, *queue, routingKey, arguments); - } - - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, - queueName, exchangeRoutingKey, ManagementAgent::toMap(arguments))); - } - }else{ - throw NotFoundException("Bind failed. No such exchange: " + exchangeName); - } + getBroker().bind(queueName, exchangeName, routingKey, arguments, + getConnection().getUserId(), getConnection().getUrl()); } void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, const string& exchangeName, const string& routingKey) { - AclModule* acl = getBroker().getAcl(); - if (acl) { - std::map<acl::Property, std::string> params; - params.insert(make_pair(acl::PROP_QUEUENAME, queueName)); - params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey)); - if (!acl->authorise(getConnection().getUserId(),acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,¶ms) ) - throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << getConnection().getUserId())); - } - - Queue::shared_ptr queue = getQueue(queueName); - if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); - - Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); - if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); - - //TODO: revise unbind to rely solely on binding key (not args) - if (exchange->unbind(queue, routingKey, 0)) { - if (exchange->isDurable() && queue->isDurable()) - getBroker().getStore().unbind(*exchange, *queue, routingKey, FieldTable()); - - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventUnbind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, routingKey)); - } + getBroker().unbind(queueName, exchangeName, routingKey, + getConnection().getUserId(), getConnection().getUrl()); } ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName, @@ -332,52 +273,42 @@ QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange, bool passive, bool durable, bool exclusive, bool autoDelete, const qpid::framing::FieldTable& arguments) -{ - AclModule* acl = getBroker().getAcl(); - if (acl) { - std::map<acl::Property, std::string> params; - params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); - params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) )); - params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE))); - params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE))); - params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE))); - params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type"))); - params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count")))); - params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size")))); - - if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) ) - throw UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId())); - } - - Exchange::shared_ptr alternate; - if (!alternateExchange.empty()) { - alternate = getBroker().getExchanges().get(alternateExchange); - } +{ Queue::shared_ptr queue; if (passive && !name.empty()) { - queue = getQueue(name); + AclModule* acl = getBroker().getAcl(); + if (acl) { + //TODO: why does a passive declare require create + //permission? The purpose of the passive flag is to state + //that the queue should *not* created. For + //authorisation a passive declare is similar to + //queue-query (or indeed a qmf query). + std::map<acl::Property, std::string> params; + params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); + params.insert(make_pair(acl::PROP_PASSIVE, _TRUE)); + params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE))); + params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE))); + params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE))); + params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type"))); + params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count")))); + params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size")))); + if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) ) + throw UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId())); + } + queue = getQueue(name); //TODO: check alternate-exchange is as expected } else { - std::pair<Queue::shared_ptr, bool> queue_created = - getBroker().getQueues().declare(name, durable, - autoDelete, - exclusive ? &session : 0); + std::pair<Queue::shared_ptr, bool> queue_created = + getBroker().createQueue(name, durable, + autoDelete, + exclusive ? &session : 0, + alternateExchange, + arguments, + getConnection().getUserId(), + getConnection().getUrl()); queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue - if (alternate) { - queue->setAlternateExchange(alternate); - alternate->incAlternateUsers(); - } - - //apply settings & create persistent record if required - try { queue_created.first->create(arguments); } - catch (...) { getBroker().getQueues().destroy(name); throw; } - - //add default binding: - getBroker().getExchanges().getDefault()->bind(queue, name, 0); - queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments); - //handle automatic cleanup: if (exclusive) { exclusiveQueues.push_back(queue); @@ -386,21 +317,20 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& if (exclusive && queue->setExclusiveOwner(&session)) { exclusiveQueues.push_back(queue); } + ManagementAgent* agent = getBroker().getManagementAgent(); + if (agent) + agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(), + name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments), + "existing")); } - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(), - name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments), - queue_created.second ? "created" : "existing")); } - if (exclusive && !queue->isExclusiveOwner(&session)) + if (exclusive && !queue->isExclusiveOwner(&session)) throw ResourceLockedException(QPID_MSG("Cannot grant exclusive access to queue " << queue->getName())); -} - - +} + void SessionAdapter::QueueHandlerImpl::purge(const string& queue){ AclModule* acl = getBroker().getAcl(); if (acl) @@ -409,40 +339,32 @@ void SessionAdapter::QueueHandlerImpl::purge(const string& queue){ throw UnauthorizedAccessException(QPID_MSG("ACL denied queue purge request from " << getConnection().getUserId())); } getQueue(queue)->purge(); -} - -void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){ - - AclModule* acl = getBroker().getAcl(); - if (acl) - { - if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_QUEUE,queue,NULL) ) - throw UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << getConnection().getUserId())); - } +} - Queue::shared_ptr q = getQueue(queue); - if (q->hasExclusiveOwner() && !q->isExclusiveOwner(&session)) +void SessionAdapter::QueueHandlerImpl::checkDelete(Queue::shared_ptr queue, bool ifUnused, bool ifEmpty) +{ + if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session)) { throw ResourceLockedException(QPID_MSG("Cannot delete queue " - << queue << "; it is exclusive to another session")); - if(ifEmpty && q->getMessageCount() > 0){ - throw PreconditionFailedException("Queue not empty."); - }else if(ifUnused && q->getConsumerCount() > 0){ - throw PreconditionFailedException("Queue in use."); - }else{ + << queue->getName() << "; it is exclusive to another session")); + } else if(ifEmpty && queue->getMessageCount() > 0) { + throw PreconditionFailedException(QPID_MSG("Cannot delete queue " + << queue->getName() << "; queue not empty")); + } else if(ifUnused && queue->getConsumerCount() > 0) { + throw PreconditionFailedException(QPID_MSG("Cannot delete queue " + << queue->getName() << "; queue in use")); + } else if (queue->isExclusiveOwner(&getConnection())) { //remove the queue from the list of exclusive queues if necessary - if(q->isExclusiveOwner(&getConnection())){ - QueueVector::iterator i = std::find(getConnection().exclusiveQueues.begin(), getConnection().exclusiveQueues.end(), q); - if(i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i); - } - q->destroy(); - getBroker().getQueues().destroy(queue); - q->unbind(getBroker().getExchanges(), q); - - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(), getConnection().getUserId(), queue)); - q->notifyDeleted(); - } + QueueVector::iterator i = std::find(getConnection().exclusiveQueues.begin(), + getConnection().exclusiveQueues.end(), + queue); + if (i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i); + } +} + +void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty) +{ + getBroker().deleteQueue(queue, getConnection().getUserId(), getConnection().getUrl(), + boost::bind(&SessionAdapter::QueueHandlerImpl::checkDelete, this, _1, ifUnused, ifEmpty)); } SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.h b/qpid/cpp/src/qpid/broker/SessionAdapter.h index ca27fb6e1d..8987c4812f 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.h +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.h @@ -138,6 +138,7 @@ class Queue; bool isLocal(const ConnectionToken* t) const; void destroyExclusiveQueues(); + void checkDelete(boost::shared_ptr<Queue> queue, bool ifUnused, bool ifEmpty); template <class F> void eachExclusiveQueue(F f) { std::for_each(exclusiveQueues.begin(), exclusiveQueues.end(), f); diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 23c999a98a..8b4defaa73 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -31,6 +31,7 @@ #include <qpid/broker/Message.h> #include "qpid/framing/MessageTransferBody.h" #include "qpid/sys/Time.h" +#include "qpid/sys/Thread.h" #include "qpid/broker/ConnectionState.h" #include "qpid/broker/AclModule.h" #include "qpid/types/Variant.h" @@ -2237,6 +2238,7 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal) uint32_t bufferLen = inBuffer.getPosition(); inBuffer.reset(); + setManagementExecutionContext((const qpid::broker::ConnectionState*) msg.getPublisher()); const framing::FieldTable *headers = msg.getApplicationHeaders(); if (headers && msg.getAppId() == "qmf2") { @@ -3085,3 +3087,21 @@ bool ManagementAgent::moveDeletedObjectsLH() { } return !deleteList.empty(); } + +namespace qpid { +namespace management { + +namespace { +QPID_TSS const qpid::broker::ConnectionState* executionContext = 0; +} + +void setManagementExecutionContext(const qpid::broker::ConnectionState* ctxt) +{ + executionContext = ctxt; +} +const qpid::broker::ConnectionState* getManagementExecutionContext() +{ + return executionContext; +} + +}} diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index 0db19594a7..fb15dc6ed1 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -41,6 +41,9 @@ #include <map> namespace qpid { +namespace broker { +class ConnectionState; +} namespace management { class ManagementAgent @@ -422,6 +425,8 @@ private: void debugSnapshot(const char* title); }; +void setManagementExecutionContext(const qpid::broker::ConnectionState*); +const qpid::broker::ConnectionState* getManagementExecutionContext(); }} - + #endif /*!_ManagementAgent_*/ diff --git a/qpid/cpp/src/tests/MessagingFixture.h b/qpid/cpp/src/tests/MessagingFixture.h index 715de09bad..2312a87e9d 100644 --- a/qpid/cpp/src/tests/MessagingFixture.h +++ b/qpid/cpp/src/tests/MessagingFixture.h @@ -27,15 +27,19 @@ #include "qpid/client/Connection.h" #include "qpid/client/Session.h" #include "qpid/framing/Uuid.h" +#include "qpid/messaging/Address.h" #include "qpid/messaging/Connection.h" #include "qpid/messaging/Session.h" #include "qpid/messaging/Sender.h" #include "qpid/messaging/Receiver.h" #include "qpid/messaging/Message.h" +#include "qpid/types/Variant.h" namespace qpid { namespace tests { +using qpid::types::Variant; + struct BrokerAdmin { qpid::client::Connection connection; @@ -223,6 +227,119 @@ inline void receive(messaging::Receiver& receiver, uint count = 1, uint start = } } + +class MethodInvoker +{ + public: + MethodInvoker(messaging::Session& session) : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"), + sender(session.createSender("qmf.default.direct/broker")), + receiver(session.createReceiver(replyTo)) {} + + void createExchange(const std::string& name, const std::string& type, bool durable=false) + { + Variant::Map params; + params["name"]=name; + params["type"]="exchange"; + params["properties"] = Variant::Map(); + params["properties"].asMap()["exchange-type"] = type; + params["properties"].asMap()["durable"] = durable; + methodRequest("create", params); + } + + void deleteExchange(const std::string& name) + { + Variant::Map params; + params["name"]=name; + params["type"]="exchange"; + methodRequest("delete", params); + } + + void createQueue(const std::string& name, bool durable=false, bool autodelete=false, + const Variant::Map& options=Variant::Map()) + { + Variant::Map params; + params["name"]=name; + params["type"]="queue"; + params["properties"] = options; + params["properties"].asMap()["durable"] = durable; + params["properties"].asMap()["auto-delete"] = autodelete; + methodRequest("create", params); + } + + void deleteQueue(const std::string& name) + { + Variant::Map params; + params["name"]=name; + params["type"]="queue"; + methodRequest("delete", params); + } + + void bind(const std::string& exchange, const std::string& queue, const std::string& key, + const Variant::Map& options=Variant::Map()) + { + Variant::Map params; + params["name"]=(boost::format("%1%/%2%/%3%") % (exchange) % (queue) % (key)).str(); + params["type"]="binding"; + params["properties"] = options; + methodRequest("create", params); + } + + void unbind(const std::string& exchange, const std::string& queue, const std::string& key) + { + Variant::Map params; + params["name"]=(boost::format("%1%/%2%/%3%") % (exchange) % (queue) % (key)).str(); + params["type"]="binding"; + methodRequest("delete", params); + } + + void methodRequest(const std::string& method, const Variant::Map& inParams, Variant::Map* outParams = 0) + { + Variant::Map content; + Variant::Map objectId; + objectId["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker"; + content["_object_id"] = objectId; + content["_method_name"] = method; + content["_arguments"] = inParams; + + messaging::Message request; + request.setReplyTo(replyTo); + request.getProperties()["x-amqp-0-10.app-id"] = "qmf2"; + request.getProperties()["qmf.opcode"] = "_method_request"; + encode(content, request); + + sender.send(request); + + messaging::Message response; + if (receiver.fetch(response, messaging::Duration::SECOND*5)) { + if (response.getProperties()["x-amqp-0-10.app-id"] == "qmf2") { + std::string opcode = response.getProperties()["qmf.opcode"]; + if (opcode == "_method_response") { + if (outParams) { + Variant::Map m; + decode(response, m); + *outParams = m["_arguments"].asMap(); + } + } else if (opcode == "_exception") { + Variant::Map m; + decode(response, m); + throw Exception(QPID_MSG("Error: " << m["_values"])); + } else { + throw Exception(QPID_MSG("Invalid response received, unexpected opcode: " << opcode)); + } + } else { + throw Exception(QPID_MSG("Invalid response received, not a qmfv2 message: app-id=" + << response.getProperties()["x-amqp-0-10.app-id"])); + } + } else { + throw Exception(QPID_MSG("No response received")); + } + } + private: + messaging::Address replyTo; + messaging::Sender sender; + messaging::Receiver receiver; +}; + }} // namespace qpid::tests #endif /*!TESTS_MESSAGINGFIXTURE_H*/ diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index 991ec847bf..f9a8b0e4c1 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -890,6 +890,53 @@ QPID_AUTO_TEST_CASE(testAcknowledge) BOOST_CHECK(!fix.session.createReceiver(fix.queue).fetch(m, Duration::IMMEDIATE)); } +QPID_AUTO_TEST_CASE(testQmfCreateAndDelete) +{ + MessagingFixture fix(Broker::Options(), true/*enable management*/); + MethodInvoker control(fix.session); + control.createQueue("my-queue"); + control.createExchange("my-exchange", "topic"); + control.bind("my-exchange", "my-queue", "subject1"); + + Sender sender = fix.session.createSender("my-exchange"); + Receiver receiver = fix.session.createReceiver("my-queue"); + Message out; + out.setSubject("subject1"); + out.setContent("one"); + sender.send(out); + Message in; + BOOST_CHECK(receiver.fetch(in, Duration::SECOND*5)); + BOOST_CHECK_EQUAL(out.getContent(), in.getContent()); + control.unbind("my-exchange", "my-queue", "subject1"); + control.bind("my-exchange", "my-queue", "subject2"); + + out.setContent("two"); + sender.send(out);//should be dropped + + out.setSubject("subject2"); + out.setContent("three"); + sender.send(out);//should not be dropped + + BOOST_CHECK(receiver.fetch(in, Duration::SECOND*5)); + BOOST_CHECK_EQUAL(out.getContent(), in.getContent()); + BOOST_CHECK(!receiver.fetch(in, Duration::IMMEDIATE)); + sender.close(); + receiver.close(); + + control.deleteExchange("my-exchange"); + messaging::Session other = fix.connection.createSession(); + { + ScopedSuppressLogging sl; + BOOST_CHECK_THROW(other.createSender("my-exchange"), qpid::messaging::NotFound); + } + control.deleteQueue("my-queue"); + other = fix.connection.createSession(); + { + ScopedSuppressLogging sl; + BOOST_CHECK_THROW(other.createReceiver("my-queue"), qpid::messaging::NotFound); + } +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index 80c69ac386..57b344498e 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -244,7 +244,7 @@ QPID_AUTO_TEST_CASE(testBound){ exchange2.reset(); //unbind the queue from all exchanges it knows it has been bound to: - queue->unbind(exchanges, queue); + queue->unbind(exchanges); //ensure the remaining exchanges don't still have the queue bound to them: FailOnDeliver deliverable; diff --git a/qpid/cpp/src/tests/qpid-ctrl b/qpid/cpp/src/tests/qpid-ctrl index 7b46c190fb..4246c57898 100755 --- a/qpid/cpp/src/tests/qpid-ctrl +++ b/qpid/cpp/src/tests/qpid-ctrl @@ -92,7 +92,10 @@ try: arguments = {} for a in args: name, val = nameval(a) - arguments[name] = val + if val[0] == '{' or val[0] == '[': + arguments[name] = eval(val) + else: + arguments[name] = val content = { "_object_id": {"_object_name": object_name}, "_method_name": method_name, diff --git a/qpid/cpp/src/tests/sender.cpp b/qpid/cpp/src/tests/sender.cpp index 9850e851da..063b5e87dc 100644 --- a/qpid/cpp/src/tests/sender.cpp +++ b/qpid/cpp/src/tests/sender.cpp @@ -120,7 +120,7 @@ void Sender::execute(AsyncSession& session, bool isRetry) string data; while (getline(std::cin, data)) { message.setData(data); - message.getHeaders().setInt("SN", ++sent); + //message.getHeaders().setInt("SN", ++sent); string matchKey; if (lvqMatchValues && getline(lvqMatchValues, matchKey)) { message.getHeaders().setString(QueueOptions::strLVQMatchProperty, matchKey); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java index c0afae0773..2c8fd737c3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -712,6 +712,25 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); } + public BrokerSchema.BrokerClass.CreateMethodResponseCommand create(final BrokerSchema.BrokerClass.CreateMethodResponseCommandFactory factory, + final String type, + final String name, + final Map properties, + final java.lang.Boolean lenient) + { + //TODO: + return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); + } + + public BrokerSchema.BrokerClass.DeleteMethodResponseCommand delete(final BrokerSchema.BrokerClass.DeleteMethodResponseCommandFactory factory, + final String type, + final String name, + final Map options) + { + //TODO: + return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); + } + public UUID getId() { return _obj.getId(); diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml index 65d8d4f11d..277534250e 100644 --- a/qpid/specs/management-schema.xml +++ b/qpid/specs/management-schema.xml @@ -102,6 +102,19 @@ <arg name="level" dir="O" type="sstr"/> </method> + <method name="create" desc="Create an object of the specified type"> + <arg name="type" dir="I" type="sstr" desc="The type of object to create"/> + <arg name="name" dir="I" type="sstr" desc="The name of the object to create"/> + <arg name="properties" dir="I" type="map" desc="Type specific object properties"/> + <arg name="strict" dir="I" type="bool" desc="If specified, treat unrecognised object properties as an error"/> + </method> + + <method name="delete" desc="Delete an object of the specified type"> + <arg name="type" dir="I" type="sstr" desc="The type of object to delete"/> + <arg name="name" dir="I" type="sstr" desc="The name of the object to delete"/> + <arg name="options" dir="I" type="map" desc="Type specific object options for deletion"/> + </method> + </class> <!-- |