diff options
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/Makefile.am | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/Options.cpp | 57 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/Options.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/Plugin.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 83 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxManager.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DtxManager.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageStoreModule.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageStoreModule.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueRegistry.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueRegistry.h | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpidd.cpp | 85 |
13 files changed, 210 insertions, 80 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index d663bbf9f0..99022e4e6c 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -92,6 +92,7 @@ libLogger_la_CXXFLAGS=$(AM_CXXFLAGS) -Wno-unused-parameter libqpidcommon_la_LIBADD = \ -lboost_program_options \ + -lboost_filesystem \ -luuid \ libLogger.la \ $(LIB_DLOPEN) \ diff --git a/qpid/cpp/src/qpid/Options.cpp b/qpid/cpp/src/qpid/Options.cpp index e44d053dae..212171b8c5 100644 --- a/qpid/cpp/src/qpid/Options.cpp +++ b/qpid/cpp/src/qpid/Options.cpp @@ -40,6 +40,10 @@ struct EnvOptMapper { return std::equal(env.begin(), env.end(), desc->long_name().begin(), &matchChar); } + static bool matchCase(const string& env, boost::shared_ptr<po::option_description> desc) { + return env == desc->long_name(); + } + EnvOptMapper(const Options& o) : opts(o) {} string operator()(const string& envVar) { @@ -54,6 +58,20 @@ struct EnvOptMapper { } return string(); } + + string configFileLine (string& line) { + size_t pos = line.find ('='); + if (pos == string::npos) + return string(); + string key = line.substr (0, pos); + typedef const std::vector< boost::shared_ptr<po::option_description> > OptDescs; + OptDescs::const_iterator i = + find_if(opts.options().begin(), opts.options().end(), boost::bind(matchCase, key, _1)); + if (i != opts.options().end()) + return string (line) + "\n"; + return string (); + } + const Options& opts; }; @@ -64,23 +82,52 @@ std::string prettyArg(const std::string& name, const std::string& value) { Options::Options(const string& name) : po::options_description(name) {} -void Options::parse(int argc, char** argv, const std::string& configFile) +void Options::parse(int argc, char** argv, const std::string& configFile, bool allowUnknown) { string defaultConfigFile = configFile; // May be changed by env/cmdline string parsing; try { po::variables_map vm; parsing="command line options"; - if (argc > 0 && argv != 0) - po::store(po::parse_command_line(argc, argv, *this), vm); + if (argc > 0 && argv != 0) { + if (allowUnknown) { + // This hideous workaround is required because boost 1.33 has a bug + // that causes 'allow_unregistered' to not work. + po::command_line_parser clp = po::command_line_parser(argc, argv). + options(*this).allow_unregistered(); + po::parsed_options opts = clp.run(); + po::parsed_options filtopts = clp.run(); + filtopts.options.clear (); + for (std::vector< po::basic_option<char> >::iterator i = opts.options.begin(); + i != opts.options.end(); i++) + if (!i->unregistered) + filtopts.options.push_back (*i); + po::store(filtopts, vm); + } + else + po::store(po::parse_command_line(argc, argv, *this), vm); + } parsing="environment variables"; po::store(po::parse_environment(*this, EnvOptMapper(*this)), vm); po::notify(vm); // configFile may be updated from arg/env options. if (!configFile.empty()) { parsing="configuration file "+configFile; ifstream conf(configFile.c_str()); - if (conf.good()) - po::store(po::parse_config_file(conf, *this), vm); + if (conf.good()) { + // Remove this hack when we get a stable version of boost that + // can allow unregistered options in config files. + EnvOptMapper mapper(*this); + stringstream filtered; + + while (!conf.eof()) { + string line; + getline (conf, line); + filtered << mapper.configFileLine (line); + } + + po::store(po::parse_config_file(filtered, *this), vm); + // End of hack + } else { // No error if default configfile is missing/unreadable // but complain for non-default config file. diff --git a/qpid/cpp/src/qpid/Options.h b/qpid/cpp/src/qpid/Options.h index b421e12b27..475d8e91d5 100644 --- a/qpid/cpp/src/qpid/Options.h +++ b/qpid/cpp/src/qpid/Options.h @@ -130,7 +130,8 @@ struct Options : public po::options_description { * is updated by argc/argv or environment variable parsing. */ void parse(int argc, char** argv, - const std::string& configfile=std::string()); + const std::string& configfile=std::string(), + bool allowUnknown = false); }; /** diff --git a/qpid/cpp/src/qpid/Plugin.h b/qpid/cpp/src/qpid/Plugin.h index e684d238a3..5aed844b43 100644 --- a/qpid/cpp/src/qpid/Plugin.h +++ b/qpid/cpp/src/qpid/Plugin.h @@ -73,6 +73,13 @@ class Plugin : boost::noncopyable * * Plugins should ignore targets they don't recognize. */ + virtual void earlyInitialize(Target&) = 0; + + /** + * Initialize Plugin functionality on a Target. + * + * Plugins should ignore targets they don't recognize. + */ virtual void initialize(Target&) = 0; /** List of registered Plugin objects. diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index c4a83b05e7..8edbb25cd5 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -67,13 +67,7 @@ Broker::Options::Options(const std::string& name) : workerThreads(5), maxConnections(500), connectionBacklog(10), - store(), stagingThreshold(5000000), - storeDir("/var"), - storeAsync(false), - storeForce(false), - numJrnlFiles(8), - jrnlFsizePgs(24), enableMgmt(0), mgmtPubInterval(10), ack(0) @@ -91,20 +85,6 @@ Broker::Options::Options(const std::string& name) : "Sets the connection backlog limit for the server socket") ("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk") -// TODO: These options need to come from within the store module - ("store-lib,s", optValue(store,"LIBNAME"), - "Tells the broker to use the message store shared library LIBNAME for persistence") - ("store-directory", optValue(storeDir,"DIR"), - "Store directory location for persistence.") - ("store-async", optValue(storeAsync,"yes|no"), - "Use async persistence storage - if store supports it, enables AIO O_DIRECT.") - ("store-force", optValue(storeForce,"yes|no"), - "Force changing modes of store, will delete all existing data if mode is changed. Be SURE you want to do this!") - ("num-jfiles", qpid::optValue(numJrnlFiles, "N"), - "Number of files in persistence journal") - ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"), - "Size of each journal file in multiples of read pages (1 read page = 64kiB)") -// End of store module options ("mgmt,m", optValue(enableMgmt,"yes|no"), "Enable Management") ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), @@ -122,29 +102,36 @@ const std::string qpid_management("qpid.management"); Broker::Broker(const Broker::Options& conf) : config(conf), - store(createStore(conf)), - queues(store.get()), + store(0), factory(*this), - dtxManager(store.get()), sessionManager(conf.ack) { + // Early-Initialize plugins + const Plugin::Plugins& plugins=Plugin::getPlugins(); + for (Plugin::Plugins::const_iterator i = plugins.begin(); + i != plugins.end(); + i++) + (*i)->earlyInitialize(*this); + + // If no plugin store module registered itself, set up the null store. + if (store == 0) + setStore (new NullMessageStore (false)); + + queues.setStore (store); + dtxManager.setStore (store); + if(conf.enableMgmt){ ManagementAgent::enableManagement (); managementAgent = ManagementAgent::getAgent (); managementAgent->setInterval (conf.mgmtPubInterval); mgmtObject = management::Broker::shared_ptr (new management::Broker (this, 0, 0, conf.port)); - mgmtObject->set_workerThreads (conf.workerThreads); - mgmtObject->set_maxConns (conf.maxConnections); - mgmtObject->set_connBacklog (conf.connectionBacklog); - mgmtObject->set_stagingThreshold (conf.stagingThreshold); - mgmtObject->set_storeLib (conf.store); - mgmtObject->set_asyncStore (conf.storeAsync); - mgmtObject->set_mgmtPubInterval (conf.mgmtPubInterval); - mgmtObject->set_initialDiskPageSize (0); - mgmtObject->set_initialPagesPerQueue (0); - mgmtObject->set_clusterName (""); - mgmtObject->set_version (PACKAGE_VERSION); + mgmtObject->set_workerThreads (conf.workerThreads); + mgmtObject->set_maxConns (conf.maxConnections); + mgmtObject->set_connBacklog (conf.connectionBacklog); + mgmtObject->set_stagingThreshold (conf.stagingThreshold); + mgmtObject->set_mgmtPubInterval (conf.mgmtPubInterval); + mgmtObject->set_version (PACKAGE_VERSION); managementAgent->addObject (mgmtObject, 1, 0); @@ -160,16 +147,12 @@ Broker::Broker(const Broker::Options& conf) : exchanges.declare(empty, DirectExchange::typeName); // Default exchange. - if(store.get()) { - if (!store->init(&conf)){ - throw Exception( "Existing Journal in different mode, backup/move existing data \ - before changing modes. Or use --store-force yes to blow existing data away."); - }else{ - RecoveryManagerImpl recoverer(queues, exchanges, dtxManager, + if (store != 0) { + RecoveryManagerImpl recoverer(queues, exchanges, dtxManager, conf.stagingThreshold); - store->recover(recoverer); - } + store->recover(recoverer); } + //ensure standard exchanges exist (done after recovery from store) declareStandardExchange(amq_direct, DirectExchange::typeName); declareStandardExchange(amq_topic, TopicExchange::typeName); @@ -188,7 +171,6 @@ Broker::Broker(const Broker::Options& conf) : QPID_LOG(info, "Management not enabled"); // Initialize plugins - const Plugin::Plugins& plugins=Plugin::getPlugins(); for (Plugin::Plugins::const_iterator i = plugins.begin(); i != plugins.end(); i++) @@ -197,7 +179,7 @@ Broker::Broker(const Broker::Options& conf) : void Broker::declareStandardExchange(const std::string& name, const std::string& type) { - bool storeEnabled = store.get(); + bool storeEnabled = store != NULL; std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, type, storeEnabled); if (status.second && storeEnabled) { store->create(*status.first); @@ -217,13 +199,13 @@ shared_ptr<Broker> Broker::create(const Options& opts) return shared_ptr<Broker>(new Broker(opts)); } -MessageStore* Broker::createStore(const Options& config) { - if (config.store.empty()) - return new NullMessageStore(false); - else - return new MessageStoreModule(config.store); +void Broker::setStore (MessageStore* _store) +{ + assert (store == 0 && _store != 0); + if (store == 0 && _store != 0) + store = new MessageStoreModule (_store); } - + void Broker::run() { getAcceptor().run(&factory); } @@ -236,6 +218,7 @@ void Broker::shutdown() { Broker::~Broker() { shutdown(); + delete store; } uint16_t Broker::getPort() const { return getAcceptor().getPort(); } diff --git a/qpid/cpp/src/qpid/broker/DtxManager.cpp b/qpid/cpp/src/qpid/broker/DtxManager.cpp index 039a52cf2c..6070b17b24 100644 --- a/qpid/cpp/src/qpid/broker/DtxManager.cpp +++ b/qpid/cpp/src/qpid/broker/DtxManager.cpp @@ -32,7 +32,7 @@ using namespace qpid::ptr_map; using namespace qpid::broker; using namespace qpid::framing; -DtxManager::DtxManager(TransactionalStore* const _store) : store(_store) {} +DtxManager::DtxManager() : store(0) {} DtxManager::~DtxManager() {} @@ -160,3 +160,9 @@ void DtxManager::DtxCleanup::fire() //assume it was explicitly cleaned up after a call to prepare, commit or rollback } } + +void DtxManager::setStore (TransactionalStore* _store) +{ + assert (store == 0 && _store != 0); + store = _store; +} diff --git a/qpid/cpp/src/qpid/broker/DtxManager.h b/qpid/cpp/src/qpid/broker/DtxManager.h index 64eae1488d..fa5c62c233 100644 --- a/qpid/cpp/src/qpid/broker/DtxManager.h +++ b/qpid/cpp/src/qpid/broker/DtxManager.h @@ -45,7 +45,7 @@ class DtxManager{ }; WorkMap work; - TransactionalStore* const store; + TransactionalStore* store; qpid::sys::Mutex lock; Timer timer; @@ -54,7 +54,7 @@ class DtxManager{ DtxWorkRecord* createWork(std::string xid); public: - DtxManager(TransactionalStore* const store); + DtxManager(); ~DtxManager(); void start(const std::string& xid, DtxBuffer::shared_ptr work); void join(const std::string& xid, DtxBuffer::shared_ptr work); @@ -65,6 +65,7 @@ public: void setTimeout(const std::string& xid, uint32_t secs); uint32_t getTimeout(const std::string& xid); void timedout(const std::string& xid); + void setStore(TransactionalStore* store); }; } diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp index 175055215c..094983e3fb 100644 --- a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp +++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -27,15 +27,15 @@ using namespace qpid::broker; -MessageStoreModule::MessageStoreModule(const std::string& name) : store(name) -{ -} +MessageStoreModule::MessageStoreModule(MessageStore* _store) : store(_store) {} -bool MessageStoreModule::init(const Options* options) +MessageStoreModule::~MessageStoreModule() { - TRANSFER_EXCEPTION(return store->init(options)); + delete store; } +bool MessageStoreModule::init(const Options*) { return true; } + void MessageStoreModule::create(PersistableQueue& queue) { TRANSFER_EXCEPTION(store->create(queue)); diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.h b/qpid/cpp/src/qpid/broker/MessageStoreModule.h index d8f6ab7299..160d681fab 100644 --- a/qpid/cpp/src/qpid/broker/MessageStoreModule.h +++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.h @@ -34,9 +34,9 @@ namespace broker { */ class MessageStoreModule : public MessageStore { - qpid::sys::Module<MessageStore> store; + MessageStore* store; public: - MessageStoreModule(const std::string& name); + MessageStoreModule(MessageStore* store); bool init(const Options* options); std::auto_ptr<TransactionContext> begin(); @@ -69,7 +69,7 @@ public: u_int32_t outstandingQueueAIO(const PersistableQueue& queue); void flush(const qpid::broker::PersistableQueue& queue); - ~MessageStoreModule(){} + ~MessageStoreModule(); }; } diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp index f246c653ea..14d5f362e3 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp @@ -26,8 +26,8 @@ using namespace qpid::broker; using namespace qpid::sys; -QueueRegistry::QueueRegistry(MessageStore* const _store) : - counter(1), store(_store), parent(0) {} +QueueRegistry::QueueRegistry() : + counter(1), store(0), parent(0) {} QueueRegistry::~QueueRegistry(){} @@ -82,6 +82,12 @@ string QueueRegistry::generateName(){ return name; } -MessageStore* const QueueRegistry::getStore() const { +void QueueRegistry::setStore (MessageStore* _store) +{ + assert (store == 0 && _store != 0); + store = _store; +} + +MessageStore* QueueRegistry::getStore() const { return store; } diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h index 653a27b1d8..ccccef31b5 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.h +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h @@ -38,7 +38,7 @@ namespace broker { */ class QueueRegistry{ public: - QueueRegistry(MessageStore* const store = 0); + QueueRegistry(); ~QueueRegistry(); /** @@ -86,9 +86,14 @@ class QueueRegistry{ string generateName(); /** + * Set the store to use. May only be called once. + */ + void setStore (MessageStore*); + + /** * Return the message store used. */ - MessageStore* const getStore() const; + MessageStore* getStore() const; /** * Register the manageable parent for declared queues @@ -100,7 +105,7 @@ private: QueueMap queues; qpid::sys::RWlock lock; int counter; - MessageStore* const store; + MessageStore* store; management::Manageable* parent; }; diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp index 309fd6afd0..380bd7f632 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -47,6 +47,8 @@ struct ClusterPlugin : public Plugin { Options* getOptions() { return &options; } + void earlyInitialize(Plugin::Target&) {} + void initialize(Plugin::Target& target) { broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); // Only provide to a Broker, and only if the --cluster config is set. diff --git a/qpid/cpp/src/qpidd.cpp b/qpid/cpp/src/qpidd.cpp index 00fa8d8d21..1081805c03 100644 --- a/qpid/cpp/src/qpidd.cpp +++ b/qpid/cpp/src/qpidd.cpp @@ -27,6 +27,7 @@ #include "qpid/Plugin.h" #include "qpid/sys/Shlib.h" #include "config.h" +#include <boost/filesystem/operations.hpp> #include <boost/filesystem/path.hpp> #include <iostream> #include <fstream> @@ -36,7 +37,23 @@ using namespace qpid; using namespace qpid::broker; using namespace qpid::sys; +using namespace qpid::log; using namespace std; +namespace fs=boost::filesystem; + +struct ModuleOptions : public qpid::Options { + string loadDir; + vector<string> load; + bool noLoad; + ModuleOptions() : qpid::Options("Module options"), loadDir("/usr/lib/qpidd"), noLoad(false) + { + addOptions() + ("load-dir", optValue(loadDir, "DIR"), "Load all modules from this directory") + ("load", optValue(load, "FILE"), "Specifies additional module(s) to be loaded") + ("no-modules", optValue(noLoad), "Don't load any modules"); + } +}; + struct DaemonOptions : public qpid::Options { bool daemon; @@ -57,12 +74,14 @@ struct DaemonOptions : public qpid::Options { struct QpiddOptions : public qpid::Options { CommonOptions common; + ModuleOptions module; Broker::Options broker; DaemonOptions daemon; qpid::log::Options log; QpiddOptions() : qpid::Options("Options"), common("", "/etc/qpidd.conf") { add(common); + add(module); add(broker); add(daemon); add(log); @@ -79,6 +98,23 @@ struct QpiddOptions : public qpid::Options { }; }; +// BootstrapOptions is a minimal subset of options used for a pre-parse +// of the command line to discover which plugin modules need to be loaded. +// The pre-parse is necessary because plugin modules may supply their own +// set of options. CommonOptions is needed to properly support loading +// from a configuration file. +struct BootstrapOptions : public qpid::Options { + CommonOptions common; + ModuleOptions module; + qpid::log::Options log; + + BootstrapOptions() : qpid::Options("Options"), common("", "/etc/qpidd.conf") { + add(common); + add(module); + add(log); + } +}; + // Globals shared_ptr<Broker> brokerPtr; auto_ptr<QpiddOptions> options; @@ -108,24 +144,59 @@ struct QpiddDaemon : public Daemon { void tryShlib(const char* libname) { try { Shlib shlib(libname); + QPID_LOG (info, "Loaded Module: " << libname); + } + catch (const exception& e) {} +} + +void loadModuleDir (string dirname, bool isDefault) +{ + fs::path dirPath (dirname); + + if (!fs::exists (dirPath)) + { + if (isDefault) + return; + throw Exception ("Directory not found: " + dirname); } - catch (const exception& e) { - // TODO aconway 2007-07-09: Should log failures as INFO - // at least, but we try shlibs before logging is configured. + + fs::directory_iterator endItr; + for (fs::directory_iterator itr (dirPath); itr != endItr; ++itr) + { + if (!fs::is_directory(*itr) && + itr->string().find (".so") == itr->string().length() - 3) + tryShlib (itr->string().data()); } } int main(int argc, char* argv[]) { - try { - // Load optional modules - tryShlib("libqpidcluster.so.0"); + try + { + { + BootstrapOptions bootOptions; + string defaultPath (bootOptions.module.loadDir); + + // Parse only the common, load, and log options to see which modules need + // to be loaded. Once the modules are loaded, the command line will + // be re-parsed with all of the module-supplied options. + bootOptions.parse (argc, argv, bootOptions.common.config, true); + qpid::log::Logger::instance().configure(bootOptions.log, argv[0]); + if (!bootOptions.module.noLoad) { + for (vector<string>::iterator iter = bootOptions.module.load.begin(); + iter != bootOptions.module.load.end(); + iter++) + tryShlib (iter->data()); + + bool isDefault = defaultPath == bootOptions.module.loadDir; + loadModuleDir (bootOptions.module.loadDir, isDefault); + } + } // Parse options options.reset(new QpiddOptions()); options->parse(argc, argv, options->common.config); - qpid::log::Logger::instance().configure(options->log, argv[0]); // Options that just print information. if(options->common.help || options->common.version) { |