summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Broker.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Broker.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp345
1 files changed, 345 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
new file mode 100644
index 0000000000..b9268db9e5
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -0,0 +1,345 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "config.h"
+#include "Broker.h"
+#include "Connection.h"
+#include "DirectExchange.h"
+#include "FanOutExchange.h"
+#include "HeadersExchange.h"
+#include "MessageStoreModule.h"
+#include "NullMessageStore.h"
+#include "RecoveryManagerImpl.h"
+#include "TopicExchange.h"
+#include "qpid/management/PackageQpid.h"
+#include "qpid/management/ManagementExchange.h"
+#include "qpid/management/ArgsBrokerEcho.h"
+
+#include "qpid/log/Statement.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/sys/Acceptor.h"
+#include "qpid/sys/ConnectionInputHandler.h"
+#include "qpid/sys/ConnectionInputHandlerFactory.h"
+#include "qpid/sys/TimeoutHandler.h"
+#include "qpid/sys/SystemInfo.h"
+#include "qpid/Url.h"
+
+#include <boost/bind.hpp>
+
+#include <iostream>
+#include <memory>
+
+#if HAVE_SASL
+#include <sasl/sasl.h>
+#endif
+
+using qpid::sys::Acceptor;
+using qpid::framing::FrameHandler;
+using qpid::framing::ChannelId;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+using qpid::management::ArgsBrokerEcho;
+
+namespace qpid {
+namespace broker {
+
+Broker::Options::Options(const std::string& name) :
+ qpid::Options(name),
+ noDataDir(0),
+ dataDir("/var/lib/qpidd"),
+ port(DEFAULT_PORT),
+ workerThreads(5),
+ maxConnections(500),
+ connectionBacklog(10),
+ stagingThreshold(5000000),
+ enableMgmt(1),
+ mgmtPubInterval(10),
+#if HAVE_SASL
+ //Authentication disabled by default for now to allow any
+ //scripts etc that might fail authentication to be updated.
+ //Note that this is a temporary measure (GS 14-APR-2008).
+ auth(false),
+ //auth(true),
+#else
+ auth(false),
+#endif
+ ack(0)
+{
+ int c = sys::SystemInfo::concurrency();
+ workerThreads=c+1;
+ addOptions()
+ ("data-dir", optValue(dataDir,"DIR"),
+ "Directory to contain persistent data generated by the broker")
+ ("no-data-dir", optValue(noDataDir),
+ "Don't use a data directory. No persistent configuration will be loaded or stored")
+ ("port,p", optValue(port,"PORT"),
+ "Tells the broker to listen on PORT")
+ ("worker-threads", optValue(workerThreads, "N"),
+ "Sets the broker thread pool size")
+ ("max-connections", optValue(maxConnections, "N"),
+ "Sets the maximum allowed connections")
+ ("connection-backlog", optValue(connectionBacklog, "N"),
+ "Sets the connection backlog limit for the server socket")
+ ("staging-threshold", optValue(stagingThreshold, "N"),
+ "Stages messages over N bytes to disk")
+ ("mgmt-enable,m", optValue(enableMgmt,"yes|no"),
+ "Enable Management")
+ ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"),
+ "Management Publish Interval")
+ ("auth", optValue(auth, "yes|no"),
+ "Enable authentication, if disabled all incoming connections will be trusted")
+ ("ack", optValue(ack, "N"),
+ "Send session.ack/solicit-ack at least every N frames. 0 disables voluntary ack/solitict-ack");
+}
+
+const std::string empty;
+const std::string amq_direct("amq.direct");
+const std::string amq_topic("amq.topic");
+const std::string amq_fanout("amq.fanout");
+const std::string amq_match("amq.match");
+const std::string qpid_management("qpid.management");
+
+Broker::Broker(const Broker::Options& conf) :
+ config(conf),
+ store(0),
+ dataDir(conf.noDataDir ? std::string () : conf.dataDir),
+ factory(*this),
+ sessionManager(conf.ack),
+ previewSessionManager(conf.ack)
+{
+ if(conf.enableMgmt){
+ QPID_LOG(info, "Management enabled");
+ ManagementAgent::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (),
+ conf.mgmtPubInterval);
+ managementAgent = ManagementAgent::getAgent ();
+ managementAgent->setInterval (conf.mgmtPubInterval);
+ qpid::management::PackageQpid packageInitializer (managementAgent);
+
+ System* system = new System ();
+ systemObject = System::shared_ptr (system);
+
+ mgmtObject = management::Broker::shared_ptr (new management::Broker (this, system, conf.port));
+ mgmtObject->set_workerThreads (conf.workerThreads);
+ mgmtObject->set_maxConns (conf.maxConnections);
+ mgmtObject->set_connBacklog (conf.connectionBacklog);
+ mgmtObject->set_stagingThreshold (conf.stagingThreshold);
+ mgmtObject->set_mgmtPubInterval (conf.mgmtPubInterval);
+ mgmtObject->set_version (PACKAGE_VERSION);
+ mgmtObject->set_dataDirEnabled (dataDir.isEnabled ());
+ mgmtObject->set_dataDir (dataDir.getPath ());
+
+ managementAgent->addObject (mgmtObject, 1, 0);
+
+ // Since there is currently no support for virtual hosts, a placeholder object
+ // representing the implied single virtual host is added here to keep the
+ // management schema correct.
+ Vhost* vhost = new Vhost (this);
+ vhostObject = Vhost::shared_ptr (vhost);
+
+ queues.setParent (vhost);
+ exchanges.setParent (vhost);
+ }
+
+ // Early-Initialize plugins
+ const Plugin::Plugins& plugins=Plugin::getPlugins();
+ for (Plugin::Plugins::const_iterator i = plugins.begin();
+ i != plugins.end();
+ i++)
+ (*i)->earlyInitialize(*this);
+
+ // If no plugin store module registered itself, set up the null store.
+ if (store == 0)
+ setStore (new NullMessageStore (false));
+
+ queues.setStore (store);
+ dtxManager.setStore (store);
+
+ exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
+
+ if (store != 0) {
+ RecoveryManagerImpl recoverer(queues, exchanges, dtxManager,
+ conf.stagingThreshold);
+ store->recover(recoverer);
+ }
+
+ //ensure standard exchanges exist (done after recovery from store)
+ declareStandardExchange(amq_direct, DirectExchange::typeName);
+ declareStandardExchange(amq_topic, TopicExchange::typeName);
+ declareStandardExchange(amq_fanout, FanOutExchange::typeName);
+ declareStandardExchange(amq_match, HeadersExchange::typeName);
+
+ if(conf.enableMgmt) {
+ exchanges.declare(qpid_management, ManagementExchange::typeName);
+ Exchange::shared_ptr mExchange = exchanges.get (qpid_management);
+ Exchange::shared_ptr dExchange = exchanges.get (amq_direct);
+ managementAgent->setExchange (mExchange, dExchange);
+ dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent);
+ }
+ else
+ QPID_LOG(info, "Management not enabled");
+
+ /**
+ * SASL setup, can fail and terminate startup
+ */
+ if (conf.auth) {
+#if HAVE_SASL
+ int code = sasl_server_init(NULL, BROKER_SASL_NAME);
+ if (code != SASL_OK) {
+ // TODO: Figure out who owns the char* returned by
+ // sasl_errstring, though it probably does not matter much
+ throw Exception(sasl_errstring(code, NULL, NULL));
+ }
+ QPID_LOG(info, "SASL enabled");
+#else
+ throw Exception("Requested authentication but SASL unavailable");
+#endif
+ }
+
+ // Initialize plugins
+ for (Plugin::Plugins::const_iterator i = plugins.begin();
+ i != plugins.end();
+ i++)
+ (*i)->initialize(*this);
+}
+
+void Broker::declareStandardExchange(const std::string& name, const std::string& type)
+{
+ bool storeEnabled = store != NULL;
+ std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, type, storeEnabled);
+ if (status.second && storeEnabled) {
+ store->create(*status.first, framing::FieldTable ());
+ }
+}
+
+
+shared_ptr<Broker> Broker::create(int16_t port)
+{
+ Options config;
+ config.port=port;
+ return create(config);
+}
+
+shared_ptr<Broker> Broker::create(const Options& opts)
+{
+ return shared_ptr<Broker>(new Broker(opts));
+}
+
+void Broker::setStore (MessageStore* _store)
+{
+ assert (store == 0 && _store != 0);
+ if (store == 0 && _store != 0)
+ store = new MessageStoreModule (_store);
+}
+
+void Broker::run() {
+ getAcceptor().run(&factory);
+}
+
+void Broker::shutdown() {
+ // NB: this function must be async-signal safe, it must not
+ // call any function that is not async-signal safe.
+ // Any unsafe shutdown actions should be done in the destructor.
+ if (acceptor)
+ acceptor->shutdown();
+}
+
+Broker::~Broker() {
+ shutdown();
+ ManagementAgent::shutdown ();
+ delete store;
+ if (config.auth) {
+#if HAVE_SASL
+ sasl_done();
+#endif
+ }
+}
+
+uint16_t Broker::getPort() const { return getAcceptor().getPort(); }
+
+Acceptor& Broker::getAcceptor() const {
+ if (!acceptor) {
+ const_cast<Acceptor::shared_ptr&>(acceptor) =
+ Acceptor::create(config.port,
+ config.connectionBacklog,
+ config.workerThreads);
+ QPID_LOG(info, "Listening on port " << getPort());
+ }
+ return *acceptor;
+}
+
+ManagementObject::shared_ptr Broker::GetManagementObject(void) const
+{
+ return dynamic_pointer_cast<ManagementObject> (mgmtObject);
+}
+
+Manageable* Broker::GetVhostObject(void) const
+{
+ return vhostObject.get();
+}
+
+Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
+ Args& args)
+{
+ Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+
+ QPID_LOG (debug, "Broker::ManagementMethod [id=" << methodId << "]");
+
+ switch (methodId)
+ {
+ case management::Broker::METHOD_ECHO :
+ status = Manageable::STATUS_OK;
+ break;
+ case management::Broker::METHOD_CONNECT : {
+ management::ArgsBrokerConnect& hp=
+ dynamic_cast<management::ArgsBrokerConnect&>(args);
+ connect(hp.i_host, hp.i_port);
+ status = Manageable::STATUS_OK;
+ break;
+ }
+ case management::Broker::METHOD_JOINCLUSTER :
+ case management::Broker::METHOD_LEAVECLUSTER :
+ status = Manageable::STATUS_NOT_IMPLEMENTED;
+ break;
+ }
+
+ return status;
+}
+
+void Broker::connect(
+ const std::string& host, uint16_t port,
+ sys::ConnectionCodec::Factory* f)
+{
+ getAcceptor().connect(host, port, f ? f : &factory);
+}
+
+void Broker::connect(
+ const Url& url, sys::ConnectionCodec::Factory* f)
+{
+ url.throwIfEmpty();
+ TcpAddress addr=boost::get<TcpAddress>(url[0]);
+ connect(addr.host, addr.port, f);
+}
+
+}} // namespace qpid::broker
+