diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Broker.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 967 |
1 files changed, 967 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp new file mode 100644 index 0000000000..ca3be5b567 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -0,0 +1,967 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/Broker.h" +#include "qpid/broker/ConnectionState.h" +#include "qpid/broker/DirectExchange.h" +#include "qpid/broker/FanOutExchange.h" +#include "qpid/broker/HeadersExchange.h" +#include "qpid/broker/MessageStoreModule.h" +#include "qpid/broker/NullMessageStore.h" +#include "qpid/broker/RecoveryManagerImpl.h" +#include "qpid/broker/SaslAuthenticator.h" +#include "qpid/broker/SecureConnectionFactory.h" +#include "qpid/broker/TopicExchange.h" +#include "qpid/broker/Link.h" +#include "qpid/broker/ExpiryPolicy.h" +#include "qpid/broker/QueueFlowLimit.h" + +#include "qmf/org/apache/qpid/broker/Package.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerDelete.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h" +#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" +#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" +#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" +#include "qmf/org/apache/qpid/broker/EventQueueDelete.h" +#include "qmf/org/apache/qpid/broker/EventBind.h" +#include "qmf/org/apache/qpid/broker/EventUnbind.h" +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/management/ManagementDirectExchange.h" +#include "qpid/management/ManagementTopicExchange.h" +#include "qpid/log/Logger.h" +#include "qpid/log/Options.h" +#include "qpid/log/Statement.h" +#include "qpid/log/posix/SinkOptions.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/Uuid.h" +#include "qpid/sys/ProtocolFactory.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/Dispatcher.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Time.h" +#include "qpid/sys/ConnectionInputHandler.h" +#include "qpid/sys/ConnectionInputHandlerFactory.h" +#include "qpid/sys/TimeoutHandler.h" +#include "qpid/sys/SystemInfo.h" +#include "qpid/Address.h" +#include "qpid/StringUtils.h" +#include "qpid/Url.h" +#include "qpid/Version.h" + +#include <boost/bind.hpp> +#include <boost/format.hpp> + +#include <iostream> +#include <memory> + +using qpid::sys::ProtocolFactory; +using qpid::sys::Poller; +using qpid::sys::Dispatcher; +using qpid::sys::Thread; +using qpid::framing::FrameHandler; +using qpid::framing::ChannelId; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; +using qpid::management::getManagementExecutionContext; +using qpid::types::Variant; +using std::string; +using std::make_pair; + +namespace _qmf = qmf::org::apache::qpid::broker; + +namespace qpid { +namespace broker { + +Broker::Options::Options(const std::string& name) : + qpid::Options(name), + noDataDir(0), + port(DEFAULT_PORT), + workerThreads(5), + maxConnections(500), + connectionBacklog(10), + enableMgmt(1), + mgmtPubInterval(10), + queueCleanInterval(60*10),//10 minutes + auth(SaslAuthenticator::available()), + realm("QPID"), + replayFlushLimit(0), + replayHardLimit(0), + queueLimit(100*1048576/*100M default limit*/), + tcpNoDelay(false), + requireEncrypted(false), + maxSessionRate(0), + asyncQueueEvents(false), // Must be false in a cluster. + qmf2Support(true), + qmf1Support(true), + queueFlowStopRatio(80), + queueFlowResumeRatio(70), + queueThresholdEventRatio(80) +{ + int c = sys::SystemInfo::concurrency(); + workerThreads=c+1; + std::string home = getHome(); + + if (home.length() == 0) + dataDir += DEFAULT_DATA_DIR_LOCATION; + else + dataDir += home; + dataDir += DEFAULT_DATA_DIR_NAME; + + addOptions() + ("data-dir", optValue(dataDir,"DIR"), "Directory to contain persistent data generated by the broker") + ("no-data-dir", optValue(noDataDir), "Don't use a data directory. No persistent configuration will be loaded or stored") + ("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT") + ("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size") + ("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections") + ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket") + ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management") + ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Enable broadcast of management information over QMF v2") + ("mgmt-qmf1", optValue(qmf1Support,"yes|no"), "Enable broadcast of management information over QMF v1") + ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval") + ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"), + "Interval between attempts to purge any expired messages from queues") + ("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled all incoming connections will be trusted") + ("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication") + ("default-queue-limit", optValue(queueLimit, "BYTES"), "Default maximum size for queues (in bytes)") + ("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP connections") + ("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted") + ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)") + ("sasl-config", optValue(saslConfigPath, "DIR"), "gets sasl config info from nonstandard location") + ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)") + ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication") + ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.") + ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.") + ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised"); +} + +const std::string empty; +const std::string amq_direct("amq.direct"); +const std::string amq_topic("amq.topic"); +const std::string amq_fanout("amq.fanout"); +const std::string amq_match("amq.match"); +const std::string qpid_management("qpid.management"); +const std::string knownHostsNone("none"); + +Broker::Broker(const Broker::Options& conf) : + poller(new Poller), + config(conf), + managementAgent(conf.enableMgmt ? new ManagementAgent(conf.qmf1Support, + conf.qmf2Support) + : 0), + store(new NullMessageStore), + acl(0), + dataDir(conf.noDataDir ? std::string() : conf.dataDir), + queues(this), + exchanges(this), + links(this), + factory(new SecureConnectionFactory(*this)), + dtxManager(timer), + sessionManager( + qpid::SessionState::Configuration( + conf.replayFlushLimit*1024, // convert kb to bytes. + conf.replayHardLimit*1024), + *this), + queueCleaner(queues, timer), + queueEvents(poller,!conf.asyncQueueEvents), + recovery(true), + inCluster(false), + clusterUpdatee(false), + expiryPolicy(new ExpiryPolicy), + connectionCounter(conf.maxConnections), + getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)), + deferDelivery(boost::bind(&Broker::deferDeliveryImpl, this, _1, _2)) +{ + try { + if (conf.enableMgmt) { + QPID_LOG(info, "Management enabled"); + managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(), + conf.mgmtPubInterval, this, conf.workerThreads + 3); + managementAgent->setName("apache.org", "qpidd"); + _qmf::Package packageInitializer(managementAgent.get()); + + System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this); + systemObject = System::shared_ptr(system); + + mgmtObject = new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker"); + mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId()); + mgmtObject->set_port(conf.port); + mgmtObject->set_workerThreads(conf.workerThreads); + mgmtObject->set_maxConns(conf.maxConnections); + mgmtObject->set_connBacklog(conf.connectionBacklog); + mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval); + mgmtObject->set_version(qpid::version); + if (dataDir.isEnabled()) + mgmtObject->set_dataDir(dataDir.getPath()); + else + mgmtObject->clr_dataDir(); + + managementAgent->addObject(mgmtObject, 0, true); + + // Since there is currently no support for virtual hosts, a placeholder object + // representing the implied single virtual host is added here to keep the + // management schema correct. + Vhost* vhost = new Vhost(this, this); + vhostObject = Vhost::shared_ptr(vhost); + framing::Uuid uuid(managementAgent->getUuid()); + federationTag = uuid.str(); + vhostObject->setFederationTag(federationTag); + + queues.setParent(vhost); + exchanges.setParent(vhost); + links.setParent(vhost); + } else { + // Management is disabled so there is no broker management ID. + // Create a unique uuid to use as the federation tag. + framing::Uuid uuid(true); + federationTag = uuid.str(); + } + + QueuePolicy::setDefaultMaxSize(conf.queueLimit); + + // Early-Initialize plugins + Plugin::earlyInitAll(*this); + + QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio); + + // If no plugin store module registered itself, set up the null store. + if (NullMessageStore::isNullStore(store.get())) + setStore(); + + exchanges.declare(empty, DirectExchange::typeName); // Default exchange. + + if (store.get() != 0) { + // The cluster plug-in will setRecovery(false) on all but the first + // broker to join a cluster. + if (getRecovery()) { + RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager); + store->recover(recoverer); + } + else { + QPID_LOG(notice, "Cluster recovery: recovered journal data discarded and journal files pushed down"); + store->truncateInit(true); // save old files in subdir + } + } + + //ensure standard exchanges exist (done after recovery from store) + declareStandardExchange(amq_direct, DirectExchange::typeName); + declareStandardExchange(amq_topic, TopicExchange::typeName); + declareStandardExchange(amq_fanout, FanOutExchange::typeName); + declareStandardExchange(amq_match, HeadersExchange::typeName); + + if(conf.enableMgmt) { + exchanges.declare(qpid_management, ManagementTopicExchange::typeName); + Exchange::shared_ptr mExchange = exchanges.get(qpid_management); + Exchange::shared_ptr dExchange = exchanges.get(amq_direct); + managementAgent->setExchange(mExchange, dExchange); + boost::dynamic_pointer_cast<ManagementTopicExchange>(mExchange)->setManagmentAgent(managementAgent.get(), 1); + + std::string qmfTopic("qmf.default.topic"); + std::string qmfDirect("qmf.default.direct"); + + std::pair<Exchange::shared_ptr, bool> topicPair(exchanges.declare(qmfTopic, ManagementTopicExchange::typeName)); + std::pair<Exchange::shared_ptr, bool> directPair(exchanges.declare(qmfDirect, ManagementDirectExchange::typeName)); + + boost::dynamic_pointer_cast<ManagementDirectExchange>(directPair.first)->setManagmentAgent(managementAgent.get(), 2); + boost::dynamic_pointer_cast<ManagementTopicExchange>(topicPair.first)->setManagmentAgent(managementAgent.get(), 2); + + managementAgent->setExchangeV2(topicPair.first, directPair.first); + } + else + QPID_LOG(info, "Management not enabled"); + + /** + * SASL setup, can fail and terminate startup + */ + if (conf.auth) { + SaslAuthenticator::init(qpid::saslName, conf.saslConfigPath); + QPID_LOG(info, "SASL enabled"); + } else { + QPID_LOG(notice, "SASL disabled: No Authentication Performed"); + } + + // Initialize plugins + Plugin::initializeAll(*this); + + if (managementAgent.get()) managementAgent->pluginsInitialized(); + + if (conf.queueCleanInterval) { + queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC); + } + + //initialize known broker urls (TODO: add support for urls for other transports (SSL, RDMA)): + if (conf.knownHosts.empty()) { + boost::shared_ptr<ProtocolFactory> factory = getProtocolFactory(TCP_TRANSPORT); + if (factory) { + knownBrokers.push_back ( qpid::Url::getIpAddressesUrl ( factory->getPort() ) ); + } + } else if (conf.knownHosts != knownHostsNone) { + knownBrokers.push_back(Url(conf.knownHosts)); + } + } catch (const std::exception& /*e*/) { + finalize(); + throw; + } +} + +void Broker::declareStandardExchange(const std::string& name, const std::string& type) +{ + bool storeEnabled = store.get() != NULL; + std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, type, storeEnabled); + if (status.second && storeEnabled) { + store->create(*status.first, framing::FieldTable ()); + } +} + + +boost::intrusive_ptr<Broker> Broker::create(int16_t port) +{ + Options config; + config.port=port; + return create(config); +} + +boost::intrusive_ptr<Broker> Broker::create(const Options& opts) +{ + return boost::intrusive_ptr<Broker>(new Broker(opts)); +} + +void Broker::setStore (boost::shared_ptr<MessageStore>& _store) +{ + store.reset(new MessageStoreModule (_store)); + setStore(); +} + +void Broker::setStore () { + queues.setStore (store.get()); + dtxManager.setStore (store.get()); + links.setStore (store.get()); +} + +void Broker::run() { + if (config.workerThreads > 0) { + QPID_LOG(notice, "Broker running"); + Dispatcher d(poller); + int numIOThreads = config.workerThreads; + std::vector<Thread> t(numIOThreads-1); + + // Run n-1 io threads + for (int i=0; i<numIOThreads-1; ++i) + t[i] = Thread(d); + + // Run final thread + d.run(); + + // Now wait for n-1 io threads to exit + for (int i=0; i<numIOThreads-1; ++i) { + t[i].join(); + } + } else { + throw Exception((boost::format("Invalid value for worker-threads: %1%") % config.workerThreads).str()); + } +} + +void Broker::shutdown() { + // NB: this function must be async-signal safe, it must not + // call any function that is not async-signal safe. + // Any unsafe shutdown actions should be done in the destructor. + poller->shutdown(); +} + +Broker::~Broker() { + shutdown(); + queueEvents.shutdown(); + finalize(); // Finalize any plugins. + if (config.auth) + SaslAuthenticator::fini(); + timer.stop(); + QPID_LOG(notice, "Shut down"); +} + +ManagementObject* Broker::GetManagementObject(void) const +{ + return (ManagementObject*) mgmtObject; +} + +Manageable* Broker::GetVhostObject(void) const +{ + return vhostObject.get(); +} + +Manageable::status_t Broker::ManagementMethod (uint32_t methodId, + Args& args, + string&) +{ + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + + switch (methodId) + { + case _qmf::Broker::METHOD_ECHO : + QPID_LOG (debug, "Broker::echo(" + << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence + << ", " + << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body + << ")"); + status = Manageable::STATUS_OK; + break; + case _qmf::Broker::METHOD_CONNECT : { + _qmf::ArgsBrokerConnect& hp= + dynamic_cast<_qmf::ArgsBrokerConnect&>(args); + + QPID_LOG (debug, "Broker::connect()"); + string transport = hp.i_transport.empty() ? TCP_TRANSPORT : hp.i_transport; + if (!getProtocolFactory(transport)) { + QPID_LOG(error, "Transport '" << transport << "' not supported"); + return Manageable::STATUS_NOT_IMPLEMENTED; + } + std::pair<Link::shared_ptr, bool> response = + links.declare (hp.i_host, hp.i_port, transport, hp.i_durable, + hp.i_authMechanism, hp.i_username, hp.i_password); + if (hp.i_durable && response.second) + store->create(*response.first); + status = Manageable::STATUS_OK; + break; + } + case _qmf::Broker::METHOD_QUEUEMOVEMESSAGES : { + _qmf::ArgsBrokerQueueMoveMessages& moveArgs= + dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args); + QPID_LOG (debug, "Broker::queueMoveMessages()"); + if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty)) + status = Manageable::STATUS_OK; + else + return Manageable::STATUS_PARAMETER_INVALID; + break; + } + case _qmf::Broker::METHOD_SETLOGLEVEL : + setLogLevel(dynamic_cast<_qmf::ArgsBrokerSetLogLevel&>(args).i_level); + QPID_LOG (debug, "Broker::setLogLevel()"); + status = Manageable::STATUS_OK; + break; + case _qmf::Broker::METHOD_GETLOGLEVEL : + dynamic_cast<_qmf::ArgsBrokerGetLogLevel&>(args).o_level = getLogLevel(); + QPID_LOG (debug, "Broker::getLogLevel()"); + status = Manageable::STATUS_OK; + break; + case _qmf::Broker::METHOD_CREATE : + { + _qmf::ArgsBrokerCreate& a = dynamic_cast<_qmf::ArgsBrokerCreate&>(args); + createObject(a.i_type, a.i_name, a.i_properties, a.i_strict, getManagementExecutionContext()); + status = Manageable::STATUS_OK; + break; + } + case _qmf::Broker::METHOD_DELETE : + { + _qmf::ArgsBrokerDelete& a = dynamic_cast<_qmf::ArgsBrokerDelete&>(args); + deleteObject(a.i_type, a.i_name, a.i_options, getManagementExecutionContext()); + status = Manageable::STATUS_OK; + break; + } + default: + QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]"); + status = Manageable::STATUS_NOT_IMPLEMENTED; + break; + } + + return status; +} + +namespace +{ +const std::string TYPE_QUEUE("queue"); +const std::string TYPE_EXCHANGE("exchange"); +const std::string TYPE_TOPIC("topic"); +const std::string TYPE_BINDING("binding"); +const std::string DURABLE("durable"); +const std::string AUTO_DELETE("auto-delete"); +const std::string ALTERNATE_EXCHANGE("alternate-exchange"); +const std::string EXCHANGE_TYPE("exchange-type"); +const std::string QUEUE_NAME("queue"); +const std::string EXCHANGE_NAME("exchange"); + +const std::string _TRUE("true"); +const std::string _FALSE("false"); +} + +struct InvalidBindingIdentifier : public qpid::Exception +{ + InvalidBindingIdentifier(const std::string& name) : qpid::Exception(name) {} + std::string getPrefix() const { return "invalid binding"; } +}; + +struct BindingIdentifier +{ + std::string exchange; + std::string queue; + std::string key; + + BindingIdentifier(const std::string& name) + { + std::vector<std::string> path; + split(path, name, "/"); + switch (path.size()) { + case 1: + queue = path[0]; + break; + case 2: + exchange = path[0]; + queue = path[1]; + break; + case 3: + exchange = path[0]; + queue = path[1]; + key = path[2]; + break; + default: + throw InvalidBindingIdentifier(name); + } + } +}; + +struct ObjectAlreadyExists : public qpid::Exception +{ + ObjectAlreadyExists(const std::string& name) : qpid::Exception(name) {} + std::string getPrefix() const { return "object already exists"; } +}; + +struct UnknownObjectType : public qpid::Exception +{ + UnknownObjectType(const std::string& type) : qpid::Exception(type) {} + std::string getPrefix() const { return "unknown object type"; } +}; + +void Broker::createObject(const std::string& type, const std::string& name, + const Variant::Map& properties, bool /*strict*/, const ConnectionState* context) +{ + std::string userId; + std::string connectionId; + if (context) { + userId = context->getUserId(); + connectionId = context->getUrl(); + } + //TODO: implement 'strict' option (check there are no unrecognised properties) + QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ")"); + if (type == TYPE_QUEUE) { + bool durable(false); + bool autodelete(false); + std::string alternateExchange; + Variant::Map extensions; + for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + // extract durable, auto-delete and alternate-exchange properties + if (i->first == DURABLE) durable = i->second; + else if (i->first == AUTO_DELETE) autodelete = i->second; + else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString(); + //treat everything else as extension properties + else extensions[i->first] = i->second; + } + framing::FieldTable arguments; + amqp_0_10::translate(extensions, arguments); + + std::pair<boost::shared_ptr<Queue>, bool> result = + createQueue(name, durable, autodelete, 0, alternateExchange, arguments, userId, connectionId); + if (!result.second) { + throw ObjectAlreadyExists(name); + } + } else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) { + bool durable(false); + std::string exchangeType("topic"); + std::string alternateExchange; + Variant::Map extensions; + for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + // extract durable, auto-delete and alternate-exchange properties + if (i->first == DURABLE) durable = i->second; + else if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString(); + else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString(); + //treat everything else as extension properties + else extensions[i->first] = i->second; + } + framing::FieldTable arguments; + amqp_0_10::translate(extensions, arguments); + + try { + std::pair<boost::shared_ptr<Exchange>, bool> result = + createExchange(name, exchangeType, durable, alternateExchange, arguments, userId, connectionId); + if (!result.second) { + throw ObjectAlreadyExists(name); + } + } catch (const UnknownExchangeTypeException&) { + throw Exception(QPID_MSG("Invalid exchange type: " << exchangeType)); + } + } else if (type == TYPE_BINDING) { + BindingIdentifier binding(name); + std::string exchangeType("topic"); + Variant::Map extensions; + for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + // extract durable, auto-delete and alternate-exchange properties + if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString(); + //treat everything else as extension properties + else extensions[i->first] = i->second; + } + framing::FieldTable arguments; + amqp_0_10::translate(extensions, arguments); + + bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId); + } else { + throw UnknownObjectType(type); + } +} + +void Broker::deleteObject(const std::string& type, const std::string& name, + const Variant::Map& options, const ConnectionState* context) +{ + std::string userId; + std::string connectionId; + if (context) { + userId = context->getUserId(); + connectionId = context->getUrl(); + } + QPID_LOG (debug, "Broker::delete(" << type << ", " << name << "," << options << ")"); + if (type == TYPE_QUEUE) { + deleteQueue(name, userId, connectionId); + } else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) { + deleteExchange(name, userId, connectionId); + } else if (type == TYPE_BINDING) { + BindingIdentifier binding(name); + unbind(binding.queue, binding.exchange, binding.key, userId, connectionId); + } else { + throw UnknownObjectType(type); + } + +} + +void Broker::setLogLevel(const std::string& level) +{ + QPID_LOG(notice, "Changing log level to " << level); + std::vector<std::string> selectors; + split(selectors, level, ", "); + qpid::log::Logger::instance().reconfigure(selectors); +} + +std::string Broker::getLogLevel() +{ + std::string level; + const std::vector<std::string>& selectors = qpid::log::Logger::instance().getOptions().selectors; + for (std::vector<std::string>::const_iterator i = selectors.begin(); i != selectors.end(); ++i) { + if (i != selectors.begin()) level += std::string(","); + level += *i; + } + return level; +} + +boost::shared_ptr<ProtocolFactory> Broker::getProtocolFactory(const std::string& name) const { + ProtocolFactoryMap::const_iterator i + = name.empty() ? protocolFactories.begin() : protocolFactories.find(name); + if (i == protocolFactories.end()) return boost::shared_ptr<ProtocolFactory>(); + else return i->second; +} + +uint16_t Broker::getPort(const std::string& name) const { + boost::shared_ptr<ProtocolFactory> factory = getProtocolFactory(name); + if (factory) { + return factory->getPort(); + } else { + throw NoSuchTransportException(QPID_MSG("No such transport: '" << name << "'")); + } +} + +void Broker::registerProtocolFactory(const std::string& name, ProtocolFactory::shared_ptr protocolFactory) { + protocolFactories[name] = protocolFactory; + Url::addProtocol(name); +} + +void Broker::accept() { + for (ProtocolFactoryMap::const_iterator i = protocolFactories.begin(); i != protocolFactories.end(); i++) { + i->second->accept(poller, factory.get()); + } +} + +void Broker::connect( + const std::string& host, const std::string& port, const std::string& transport, + boost::function2<void, int, std::string> failed, + sys::ConnectionCodec::Factory* f) +{ + boost::shared_ptr<ProtocolFactory> pf = getProtocolFactory(transport); + if (pf) pf->connect(poller, host, port, f ? f : factory.get(), failed); + else throw NoSuchTransportException(QPID_MSG("Unsupported transport type: " << transport)); +} + +void Broker::connect( + const Url& url, + boost::function2<void, int, std::string> failed, + sys::ConnectionCodec::Factory* f) +{ + url.throwIfEmpty(); + const Address& addr=url[0]; + connect(addr.host, boost::lexical_cast<std::string>(addr.port), addr.protocol, failed, f); +} + +uint32_t Broker::queueMoveMessages( + const std::string& srcQueue, + const std::string& destQueue, + uint32_t qty) +{ + Queue::shared_ptr src_queue = queues.find(srcQueue); + if (!src_queue) + return 0; + Queue::shared_ptr dest_queue = queues.find(destQueue); + if (!dest_queue) + return 0; + + return src_queue->move(dest_queue, qty); +} + + +boost::shared_ptr<sys::Poller> Broker::getPoller() { return poller; } + +std::vector<Url> +Broker::getKnownBrokersImpl() +{ + return knownBrokers; +} + +bool Broker::deferDeliveryImpl(const std::string& , + const boost::intrusive_ptr<Message>& ) +{ return false; } + +void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) { + clusterTimer = t; +} + +const std::string Broker::TCP_TRANSPORT("tcp"); + + +std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue( + const std::string& name, + bool durable, + bool autodelete, + const OwnershipToken* owner, + const std::string& alternateExchange, + const qpid::framing::FieldTable& arguments, + const std::string& userId, + const std::string& connectionId) +{ + if (acl) { + std::map<acl::Property, std::string> params; + params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); + params.insert(make_pair(acl::PROP_PASSIVE, _FALSE)); + params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE)); + params.insert(make_pair(acl::PROP_EXCLUSIVE, owner ? _TRUE : _FALSE)); + params.insert(make_pair(acl::PROP_AUTODELETE, autodelete ? _TRUE : _FALSE)); + params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type"))); + params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count")))); + params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size")))); + + if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) ) + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId)); + } + + Exchange::shared_ptr alternate; + if (!alternateExchange.empty()) { + alternate = exchanges.get(alternateExchange); + if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange)); + } + + std::pair<Queue::shared_ptr, bool> result = queues.declare(name, durable, autodelete, owner, alternate, arguments); + if (result.second) { + //add default binding: + result.first->bind(exchanges.getDefault(), name); + + if (managementAgent.get()) { + //TODO: debatable whether we should raise an event here for + //create when this is a 'declare' event; ideally add a create + //event instead? + managementAgent->raiseEvent( + _qmf::EventQueueDeclare(connectionId, userId, name, + durable, owner, autodelete, + ManagementAgent::toMap(arguments), + "created")); + } + } + return result; +} + +void Broker::deleteQueue(const std::string& name, const std::string& userId, + const std::string& connectionId, QueueFunctor check) +{ + if (acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) { + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId)); + } + + Queue::shared_ptr queue = queues.find(name); + if (queue) { + if (check) check(queue); + queues.destroy(name); + queue->destroyed(); + } else { + throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name)); + } + + if (managementAgent.get()) + managementAgent->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, name)); + +} + +std::pair<Exchange::shared_ptr, bool> Broker::createExchange( + const std::string& name, + const std::string& type, + bool durable, + const std::string& alternateExchange, + const qpid::framing::FieldTable& arguments, + const std::string& userId, + const std::string& connectionId) +{ + if (acl) { + std::map<acl::Property, std::string> params; + params.insert(make_pair(acl::PROP_TYPE, type)); + params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); + params.insert(make_pair(acl::PROP_PASSIVE, _FALSE)); + params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE)); + if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_EXCHANGE,name,¶ms) ) + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << userId)); + } + + Exchange::shared_ptr alternate; + if (!alternateExchange.empty()) { + alternate = exchanges.get(alternateExchange); + if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange)); + } + + std::pair<Exchange::shared_ptr, bool> result; + result = exchanges.declare(name, type, durable, arguments); + if (result.second) { + if (alternate) { + result.first->setAlternate(alternate); + alternate->incAlternateUsers(); + } + if (durable) { + store->create(*result.first, arguments); + } + if (managementAgent.get()) { + //TODO: debatable whether we should raise an event here for + //create when this is a 'declare' event; ideally add a create + //event instead? + managementAgent->raiseEvent(_qmf::EventExchangeDeclare(connectionId, + userId, + name, + type, + alternateExchange, + durable, + false, + ManagementAgent::toMap(arguments), + "created")); + } + } + return result; +} + +void Broker::deleteExchange(const std::string& name, const std::string& userId, + const std::string& connectionId) +{ + if (acl) { + if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) ) + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId)); + } + + Exchange::shared_ptr exchange(exchanges.get(name)); + if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name)); + if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange.")); + if (exchange->isDurable()) store->destroy(*exchange); + if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); + exchanges.destroy(name); + + if (managementAgent.get()) + managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name)); + +} + +void Broker::bind(const std::string& queueName, + const std::string& exchangeName, + const std::string& key, + const qpid::framing::FieldTable& arguments, + const std::string& userId, + const std::string& connectionId) +{ + if (acl) { + std::map<acl::Property, std::string> params; + params.insert(make_pair(acl::PROP_QUEUENAME, queueName)); + params.insert(make_pair(acl::PROP_ROUTINGKEY, key)); + + if (!acl->authorise(userId,acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,¶ms)) + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange bind request from " << userId)); + } + + Queue::shared_ptr queue = queues.find(queueName); + Exchange::shared_ptr exchange = exchanges.get(exchangeName); + if (!queue) { + throw framing::NotFoundException(QPID_MSG("Bind failed. No such queue: " << queueName)); + } else if (!exchange) { + throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName)); + } else { + if (queue->bind(exchange, key, arguments)) { + if (managementAgent.get()) { + managementAgent->raiseEvent(_qmf::EventBind(connectionId, userId, exchangeName, + queueName, key, ManagementAgent::toMap(arguments))); + } + } + } +} + +void Broker::unbind(const std::string& queueName, + const std::string& exchangeName, + const std::string& key, + const std::string& userId, + const std::string& connectionId) +{ + if (acl) { + std::map<acl::Property, std::string> params; + params.insert(make_pair(acl::PROP_QUEUENAME, queueName)); + params.insert(make_pair(acl::PROP_ROUTINGKEY, key)); + if (!acl->authorise(userId,acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,¶ms) ) + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << userId)); + } + + Queue::shared_ptr queue = queues.find(queueName); + Exchange::shared_ptr exchange = exchanges.get(exchangeName); + if (!queue) { + throw framing::NotFoundException(QPID_MSG("Bind failed. No such queue: " << queueName)); + } else if (!exchange) { + throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName)); + } else { + if (exchange->unbind(queue, key, 0)) { + if (exchange->isDurable() && queue->isDurable()) { + store->unbind(*exchange, *queue, key, qpid::framing::FieldTable()); + } + if (managementAgent.get()) { + managementAgent->raiseEvent(_qmf::EventUnbind(connectionId, userId, exchangeName, queueName, key)); + } + } + } +} + +}} // namespace qpid::broker + |