diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Broker.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 345 |
1 files changed, 345 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp new file mode 100644 index 0000000000..b9268db9e5 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -0,0 +1,345 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "config.h" +#include "Broker.h" +#include "Connection.h" +#include "DirectExchange.h" +#include "FanOutExchange.h" +#include "HeadersExchange.h" +#include "MessageStoreModule.h" +#include "NullMessageStore.h" +#include "RecoveryManagerImpl.h" +#include "TopicExchange.h" +#include "qpid/management/PackageQpid.h" +#include "qpid/management/ManagementExchange.h" +#include "qpid/management/ArgsBrokerEcho.h" + +#include "qpid/log/Statement.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/sys/Acceptor.h" +#include "qpid/sys/ConnectionInputHandler.h" +#include "qpid/sys/ConnectionInputHandlerFactory.h" +#include "qpid/sys/TimeoutHandler.h" +#include "qpid/sys/SystemInfo.h" +#include "qpid/Url.h" + +#include <boost/bind.hpp> + +#include <iostream> +#include <memory> + +#if HAVE_SASL +#include <sasl/sasl.h> +#endif + +using qpid::sys::Acceptor; +using qpid::framing::FrameHandler; +using qpid::framing::ChannelId; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; +using qpid::management::ArgsBrokerEcho; + +namespace qpid { +namespace broker { + +Broker::Options::Options(const std::string& name) : + qpid::Options(name), + noDataDir(0), + dataDir("/var/lib/qpidd"), + port(DEFAULT_PORT), + workerThreads(5), + maxConnections(500), + connectionBacklog(10), + stagingThreshold(5000000), + enableMgmt(1), + mgmtPubInterval(10), +#if HAVE_SASL + //Authentication disabled by default for now to allow any + //scripts etc that might fail authentication to be updated. + //Note that this is a temporary measure (GS 14-APR-2008). + auth(false), + //auth(true), +#else + auth(false), +#endif + ack(0) +{ + int c = sys::SystemInfo::concurrency(); + workerThreads=c+1; + addOptions() + ("data-dir", optValue(dataDir,"DIR"), + "Directory to contain persistent data generated by the broker") + ("no-data-dir", optValue(noDataDir), + "Don't use a data directory. No persistent configuration will be loaded or stored") + ("port,p", optValue(port,"PORT"), + "Tells the broker to listen on PORT") + ("worker-threads", optValue(workerThreads, "N"), + "Sets the broker thread pool size") + ("max-connections", optValue(maxConnections, "N"), + "Sets the maximum allowed connections") + ("connection-backlog", optValue(connectionBacklog, "N"), + "Sets the connection backlog limit for the server socket") + ("staging-threshold", optValue(stagingThreshold, "N"), + "Stages messages over N bytes to disk") + ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), + "Enable Management") + ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), + "Management Publish Interval") + ("auth", optValue(auth, "yes|no"), + "Enable authentication, if disabled all incoming connections will be trusted") + ("ack", optValue(ack, "N"), + "Send session.ack/solicit-ack at least every N frames. 0 disables voluntary ack/solitict-ack"); +} + +const std::string empty; +const std::string amq_direct("amq.direct"); +const std::string amq_topic("amq.topic"); +const std::string amq_fanout("amq.fanout"); +const std::string amq_match("amq.match"); +const std::string qpid_management("qpid.management"); + +Broker::Broker(const Broker::Options& conf) : + config(conf), + store(0), + dataDir(conf.noDataDir ? std::string () : conf.dataDir), + factory(*this), + sessionManager(conf.ack), + previewSessionManager(conf.ack) +{ + if(conf.enableMgmt){ + QPID_LOG(info, "Management enabled"); + ManagementAgent::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (), + conf.mgmtPubInterval); + managementAgent = ManagementAgent::getAgent (); + managementAgent->setInterval (conf.mgmtPubInterval); + qpid::management::PackageQpid packageInitializer (managementAgent); + + System* system = new System (); + systemObject = System::shared_ptr (system); + + mgmtObject = management::Broker::shared_ptr (new management::Broker (this, system, conf.port)); + mgmtObject->set_workerThreads (conf.workerThreads); + mgmtObject->set_maxConns (conf.maxConnections); + mgmtObject->set_connBacklog (conf.connectionBacklog); + mgmtObject->set_stagingThreshold (conf.stagingThreshold); + mgmtObject->set_mgmtPubInterval (conf.mgmtPubInterval); + mgmtObject->set_version (PACKAGE_VERSION); + mgmtObject->set_dataDirEnabled (dataDir.isEnabled ()); + mgmtObject->set_dataDir (dataDir.getPath ()); + + managementAgent->addObject (mgmtObject, 1, 0); + + // Since there is currently no support for virtual hosts, a placeholder object + // representing the implied single virtual host is added here to keep the + // management schema correct. + Vhost* vhost = new Vhost (this); + vhostObject = Vhost::shared_ptr (vhost); + + queues.setParent (vhost); + exchanges.setParent (vhost); + } + + // Early-Initialize plugins + const Plugin::Plugins& plugins=Plugin::getPlugins(); + for (Plugin::Plugins::const_iterator i = plugins.begin(); + i != plugins.end(); + i++) + (*i)->earlyInitialize(*this); + + // If no plugin store module registered itself, set up the null store. + if (store == 0) + setStore (new NullMessageStore (false)); + + queues.setStore (store); + dtxManager.setStore (store); + + exchanges.declare(empty, DirectExchange::typeName); // Default exchange. + + if (store != 0) { + RecoveryManagerImpl recoverer(queues, exchanges, dtxManager, + conf.stagingThreshold); + store->recover(recoverer); + } + + //ensure standard exchanges exist (done after recovery from store) + declareStandardExchange(amq_direct, DirectExchange::typeName); + declareStandardExchange(amq_topic, TopicExchange::typeName); + declareStandardExchange(amq_fanout, FanOutExchange::typeName); + declareStandardExchange(amq_match, HeadersExchange::typeName); + + if(conf.enableMgmt) { + exchanges.declare(qpid_management, ManagementExchange::typeName); + Exchange::shared_ptr mExchange = exchanges.get (qpid_management); + Exchange::shared_ptr dExchange = exchanges.get (amq_direct); + managementAgent->setExchange (mExchange, dExchange); + dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent); + } + else + QPID_LOG(info, "Management not enabled"); + + /** + * SASL setup, can fail and terminate startup + */ + if (conf.auth) { +#if HAVE_SASL + int code = sasl_server_init(NULL, BROKER_SASL_NAME); + if (code != SASL_OK) { + // TODO: Figure out who owns the char* returned by + // sasl_errstring, though it probably does not matter much + throw Exception(sasl_errstring(code, NULL, NULL)); + } + QPID_LOG(info, "SASL enabled"); +#else + throw Exception("Requested authentication but SASL unavailable"); +#endif + } + + // Initialize plugins + for (Plugin::Plugins::const_iterator i = plugins.begin(); + i != plugins.end(); + i++) + (*i)->initialize(*this); +} + +void Broker::declareStandardExchange(const std::string& name, const std::string& type) +{ + bool storeEnabled = store != NULL; + std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, type, storeEnabled); + if (status.second && storeEnabled) { + store->create(*status.first, framing::FieldTable ()); + } +} + + +shared_ptr<Broker> Broker::create(int16_t port) +{ + Options config; + config.port=port; + return create(config); +} + +shared_ptr<Broker> Broker::create(const Options& opts) +{ + return shared_ptr<Broker>(new Broker(opts)); +} + +void Broker::setStore (MessageStore* _store) +{ + assert (store == 0 && _store != 0); + if (store == 0 && _store != 0) + store = new MessageStoreModule (_store); +} + +void Broker::run() { + getAcceptor().run(&factory); +} + +void Broker::shutdown() { + // NB: this function must be async-signal safe, it must not + // call any function that is not async-signal safe. + // Any unsafe shutdown actions should be done in the destructor. + if (acceptor) + acceptor->shutdown(); +} + +Broker::~Broker() { + shutdown(); + ManagementAgent::shutdown (); + delete store; + if (config.auth) { +#if HAVE_SASL + sasl_done(); +#endif + } +} + +uint16_t Broker::getPort() const { return getAcceptor().getPort(); } + +Acceptor& Broker::getAcceptor() const { + if (!acceptor) { + const_cast<Acceptor::shared_ptr&>(acceptor) = + Acceptor::create(config.port, + config.connectionBacklog, + config.workerThreads); + QPID_LOG(info, "Listening on port " << getPort()); + } + return *acceptor; +} + +ManagementObject::shared_ptr Broker::GetManagementObject(void) const +{ + return dynamic_pointer_cast<ManagementObject> (mgmtObject); +} + +Manageable* Broker::GetVhostObject(void) const +{ + return vhostObject.get(); +} + +Manageable::status_t Broker::ManagementMethod (uint32_t methodId, + Args& args) +{ + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + + QPID_LOG (debug, "Broker::ManagementMethod [id=" << methodId << "]"); + + switch (methodId) + { + case management::Broker::METHOD_ECHO : + status = Manageable::STATUS_OK; + break; + case management::Broker::METHOD_CONNECT : { + management::ArgsBrokerConnect& hp= + dynamic_cast<management::ArgsBrokerConnect&>(args); + connect(hp.i_host, hp.i_port); + status = Manageable::STATUS_OK; + break; + } + case management::Broker::METHOD_JOINCLUSTER : + case management::Broker::METHOD_LEAVECLUSTER : + status = Manageable::STATUS_NOT_IMPLEMENTED; + break; + } + + return status; +} + +void Broker::connect( + const std::string& host, uint16_t port, + sys::ConnectionCodec::Factory* f) +{ + getAcceptor().connect(host, port, f ? f : &factory); +} + +void Broker::connect( + const Url& url, sys::ConnectionCodec::Factory* f) +{ + url.throwIfEmpty(); + TcpAddress addr=boost::get<TcpAddress>(url[0]); + connect(addr.host, addr.port, f); +} + +}} // namespace qpid::broker + |