summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/Makefile.am1
-rw-r--r--qpid/cpp/src/qpid/Options.cpp57
-rw-r--r--qpid/cpp/src/qpid/Options.h3
-rw-r--r--qpid/cpp/src/qpid/Plugin.h7
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp83
-rw-r--r--qpid/cpp/src/qpid/broker/DtxManager.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/DtxManager.h5
-rw-r--r--qpid/cpp/src/qpid/broker/MessageStoreModule.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/MessageStoreModule.h6
-rw-r--r--qpid/cpp/src/qpid/broker/QueueRegistry.cpp12
-rw-r--r--qpid/cpp/src/qpid/broker/QueueRegistry.h11
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp2
-rw-r--r--qpid/cpp/src/qpidd.cpp85
13 files changed, 210 insertions, 80 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index d663bbf9f0..99022e4e6c 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -92,6 +92,7 @@ libLogger_la_CXXFLAGS=$(AM_CXXFLAGS) -Wno-unused-parameter
libqpidcommon_la_LIBADD = \
-lboost_program_options \
+ -lboost_filesystem \
-luuid \
libLogger.la \
$(LIB_DLOPEN) \
diff --git a/qpid/cpp/src/qpid/Options.cpp b/qpid/cpp/src/qpid/Options.cpp
index e44d053dae..212171b8c5 100644
--- a/qpid/cpp/src/qpid/Options.cpp
+++ b/qpid/cpp/src/qpid/Options.cpp
@@ -40,6 +40,10 @@ struct EnvOptMapper {
return std::equal(env.begin(), env.end(), desc->long_name().begin(), &matchChar);
}
+ static bool matchCase(const string& env, boost::shared_ptr<po::option_description> desc) {
+ return env == desc->long_name();
+ }
+
EnvOptMapper(const Options& o) : opts(o) {}
string operator()(const string& envVar) {
@@ -54,6 +58,20 @@ struct EnvOptMapper {
}
return string();
}
+
+ string configFileLine (string& line) {
+ size_t pos = line.find ('=');
+ if (pos == string::npos)
+ return string();
+ string key = line.substr (0, pos);
+ typedef const std::vector< boost::shared_ptr<po::option_description> > OptDescs;
+ OptDescs::const_iterator i =
+ find_if(opts.options().begin(), opts.options().end(), boost::bind(matchCase, key, _1));
+ if (i != opts.options().end())
+ return string (line) + "\n";
+ return string ();
+ }
+
const Options& opts;
};
@@ -64,23 +82,52 @@ std::string prettyArg(const std::string& name, const std::string& value) {
Options::Options(const string& name) : po::options_description(name) {}
-void Options::parse(int argc, char** argv, const std::string& configFile)
+void Options::parse(int argc, char** argv, const std::string& configFile, bool allowUnknown)
{
string defaultConfigFile = configFile; // May be changed by env/cmdline
string parsing;
try {
po::variables_map vm;
parsing="command line options";
- if (argc > 0 && argv != 0)
- po::store(po::parse_command_line(argc, argv, *this), vm);
+ if (argc > 0 && argv != 0) {
+ if (allowUnknown) {
+ // This hideous workaround is required because boost 1.33 has a bug
+ // that causes 'allow_unregistered' to not work.
+ po::command_line_parser clp = po::command_line_parser(argc, argv).
+ options(*this).allow_unregistered();
+ po::parsed_options opts = clp.run();
+ po::parsed_options filtopts = clp.run();
+ filtopts.options.clear ();
+ for (std::vector< po::basic_option<char> >::iterator i = opts.options.begin();
+ i != opts.options.end(); i++)
+ if (!i->unregistered)
+ filtopts.options.push_back (*i);
+ po::store(filtopts, vm);
+ }
+ else
+ po::store(po::parse_command_line(argc, argv, *this), vm);
+ }
parsing="environment variables";
po::store(po::parse_environment(*this, EnvOptMapper(*this)), vm);
po::notify(vm); // configFile may be updated from arg/env options.
if (!configFile.empty()) {
parsing="configuration file "+configFile;
ifstream conf(configFile.c_str());
- if (conf.good())
- po::store(po::parse_config_file(conf, *this), vm);
+ if (conf.good()) {
+ // Remove this hack when we get a stable version of boost that
+ // can allow unregistered options in config files.
+ EnvOptMapper mapper(*this);
+ stringstream filtered;
+
+ while (!conf.eof()) {
+ string line;
+ getline (conf, line);
+ filtered << mapper.configFileLine (line);
+ }
+
+ po::store(po::parse_config_file(filtered, *this), vm);
+ // End of hack
+ }
else {
// No error if default configfile is missing/unreadable
// but complain for non-default config file.
diff --git a/qpid/cpp/src/qpid/Options.h b/qpid/cpp/src/qpid/Options.h
index b421e12b27..475d8e91d5 100644
--- a/qpid/cpp/src/qpid/Options.h
+++ b/qpid/cpp/src/qpid/Options.h
@@ -130,7 +130,8 @@ struct Options : public po::options_description {
* is updated by argc/argv or environment variable parsing.
*/
void parse(int argc, char** argv,
- const std::string& configfile=std::string());
+ const std::string& configfile=std::string(),
+ bool allowUnknown = false);
};
/**
diff --git a/qpid/cpp/src/qpid/Plugin.h b/qpid/cpp/src/qpid/Plugin.h
index e684d238a3..5aed844b43 100644
--- a/qpid/cpp/src/qpid/Plugin.h
+++ b/qpid/cpp/src/qpid/Plugin.h
@@ -73,6 +73,13 @@ class Plugin : boost::noncopyable
*
* Plugins should ignore targets they don't recognize.
*/
+ virtual void earlyInitialize(Target&) = 0;
+
+ /**
+ * Initialize Plugin functionality on a Target.
+ *
+ * Plugins should ignore targets they don't recognize.
+ */
virtual void initialize(Target&) = 0;
/** List of registered Plugin objects.
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index c4a83b05e7..8edbb25cd5 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -67,13 +67,7 @@ Broker::Options::Options(const std::string& name) :
workerThreads(5),
maxConnections(500),
connectionBacklog(10),
- store(),
stagingThreshold(5000000),
- storeDir("/var"),
- storeAsync(false),
- storeForce(false),
- numJrnlFiles(8),
- jrnlFsizePgs(24),
enableMgmt(0),
mgmtPubInterval(10),
ack(0)
@@ -91,20 +85,6 @@ Broker::Options::Options(const std::string& name) :
"Sets the connection backlog limit for the server socket")
("staging-threshold", optValue(stagingThreshold, "N"),
"Stages messages over N bytes to disk")
-// TODO: These options need to come from within the store module
- ("store-lib,s", optValue(store,"LIBNAME"),
- "Tells the broker to use the message store shared library LIBNAME for persistence")
- ("store-directory", optValue(storeDir,"DIR"),
- "Store directory location for persistence.")
- ("store-async", optValue(storeAsync,"yes|no"),
- "Use async persistence storage - if store supports it, enables AIO O_DIRECT.")
- ("store-force", optValue(storeForce,"yes|no"),
- "Force changing modes of store, will delete all existing data if mode is changed. Be SURE you want to do this!")
- ("num-jfiles", qpid::optValue(numJrnlFiles, "N"),
- "Number of files in persistence journal")
- ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"),
- "Size of each journal file in multiples of read pages (1 read page = 64kiB)")
-// End of store module options
("mgmt,m", optValue(enableMgmt,"yes|no"),
"Enable Management")
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"),
@@ -122,29 +102,36 @@ const std::string qpid_management("qpid.management");
Broker::Broker(const Broker::Options& conf) :
config(conf),
- store(createStore(conf)),
- queues(store.get()),
+ store(0),
factory(*this),
- dtxManager(store.get()),
sessionManager(conf.ack)
{
+ // Early-Initialize plugins
+ const Plugin::Plugins& plugins=Plugin::getPlugins();
+ for (Plugin::Plugins::const_iterator i = plugins.begin();
+ i != plugins.end();
+ i++)
+ (*i)->earlyInitialize(*this);
+
+ // If no plugin store module registered itself, set up the null store.
+ if (store == 0)
+ setStore (new NullMessageStore (false));
+
+ queues.setStore (store);
+ dtxManager.setStore (store);
+
if(conf.enableMgmt){
ManagementAgent::enableManagement ();
managementAgent = ManagementAgent::getAgent ();
managementAgent->setInterval (conf.mgmtPubInterval);
mgmtObject = management::Broker::shared_ptr (new management::Broker (this, 0, 0, conf.port));
- mgmtObject->set_workerThreads (conf.workerThreads);
- mgmtObject->set_maxConns (conf.maxConnections);
- mgmtObject->set_connBacklog (conf.connectionBacklog);
- mgmtObject->set_stagingThreshold (conf.stagingThreshold);
- mgmtObject->set_storeLib (conf.store);
- mgmtObject->set_asyncStore (conf.storeAsync);
- mgmtObject->set_mgmtPubInterval (conf.mgmtPubInterval);
- mgmtObject->set_initialDiskPageSize (0);
- mgmtObject->set_initialPagesPerQueue (0);
- mgmtObject->set_clusterName ("");
- mgmtObject->set_version (PACKAGE_VERSION);
+ mgmtObject->set_workerThreads (conf.workerThreads);
+ mgmtObject->set_maxConns (conf.maxConnections);
+ mgmtObject->set_connBacklog (conf.connectionBacklog);
+ mgmtObject->set_stagingThreshold (conf.stagingThreshold);
+ mgmtObject->set_mgmtPubInterval (conf.mgmtPubInterval);
+ mgmtObject->set_version (PACKAGE_VERSION);
managementAgent->addObject (mgmtObject, 1, 0);
@@ -160,16 +147,12 @@ Broker::Broker(const Broker::Options& conf) :
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
- if(store.get()) {
- if (!store->init(&conf)){
- throw Exception( "Existing Journal in different mode, backup/move existing data \
- before changing modes. Or use --store-force yes to blow existing data away.");
- }else{
- RecoveryManagerImpl recoverer(queues, exchanges, dtxManager,
+ if (store != 0) {
+ RecoveryManagerImpl recoverer(queues, exchanges, dtxManager,
conf.stagingThreshold);
- store->recover(recoverer);
- }
+ store->recover(recoverer);
}
+
//ensure standard exchanges exist (done after recovery from store)
declareStandardExchange(amq_direct, DirectExchange::typeName);
declareStandardExchange(amq_topic, TopicExchange::typeName);
@@ -188,7 +171,6 @@ Broker::Broker(const Broker::Options& conf) :
QPID_LOG(info, "Management not enabled");
// Initialize plugins
- const Plugin::Plugins& plugins=Plugin::getPlugins();
for (Plugin::Plugins::const_iterator i = plugins.begin();
i != plugins.end();
i++)
@@ -197,7 +179,7 @@ Broker::Broker(const Broker::Options& conf) :
void Broker::declareStandardExchange(const std::string& name, const std::string& type)
{
- bool storeEnabled = store.get();
+ bool storeEnabled = store != NULL;
std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, type, storeEnabled);
if (status.second && storeEnabled) {
store->create(*status.first);
@@ -217,13 +199,13 @@ shared_ptr<Broker> Broker::create(const Options& opts)
return shared_ptr<Broker>(new Broker(opts));
}
-MessageStore* Broker::createStore(const Options& config) {
- if (config.store.empty())
- return new NullMessageStore(false);
- else
- return new MessageStoreModule(config.store);
+void Broker::setStore (MessageStore* _store)
+{
+ assert (store == 0 && _store != 0);
+ if (store == 0 && _store != 0)
+ store = new MessageStoreModule (_store);
}
-
+
void Broker::run() {
getAcceptor().run(&factory);
}
@@ -236,6 +218,7 @@ void Broker::shutdown() {
Broker::~Broker() {
shutdown();
+ delete store;
}
uint16_t Broker::getPort() const { return getAcceptor().getPort(); }
diff --git a/qpid/cpp/src/qpid/broker/DtxManager.cpp b/qpid/cpp/src/qpid/broker/DtxManager.cpp
index 039a52cf2c..6070b17b24 100644
--- a/qpid/cpp/src/qpid/broker/DtxManager.cpp
+++ b/qpid/cpp/src/qpid/broker/DtxManager.cpp
@@ -32,7 +32,7 @@ using namespace qpid::ptr_map;
using namespace qpid::broker;
using namespace qpid::framing;
-DtxManager::DtxManager(TransactionalStore* const _store) : store(_store) {}
+DtxManager::DtxManager() : store(0) {}
DtxManager::~DtxManager() {}
@@ -160,3 +160,9 @@ void DtxManager::DtxCleanup::fire()
//assume it was explicitly cleaned up after a call to prepare, commit or rollback
}
}
+
+void DtxManager::setStore (TransactionalStore* _store)
+{
+ assert (store == 0 && _store != 0);
+ store = _store;
+}
diff --git a/qpid/cpp/src/qpid/broker/DtxManager.h b/qpid/cpp/src/qpid/broker/DtxManager.h
index 64eae1488d..fa5c62c233 100644
--- a/qpid/cpp/src/qpid/broker/DtxManager.h
+++ b/qpid/cpp/src/qpid/broker/DtxManager.h
@@ -45,7 +45,7 @@ class DtxManager{
};
WorkMap work;
- TransactionalStore* const store;
+ TransactionalStore* store;
qpid::sys::Mutex lock;
Timer timer;
@@ -54,7 +54,7 @@ class DtxManager{
DtxWorkRecord* createWork(std::string xid);
public:
- DtxManager(TransactionalStore* const store);
+ DtxManager();
~DtxManager();
void start(const std::string& xid, DtxBuffer::shared_ptr work);
void join(const std::string& xid, DtxBuffer::shared_ptr work);
@@ -65,6 +65,7 @@ public:
void setTimeout(const std::string& xid, uint32_t secs);
uint32_t getTimeout(const std::string& xid);
void timedout(const std::string& xid);
+ void setStore(TransactionalStore* store);
};
}
diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
index 175055215c..094983e3fb 100644
--- a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
@@ -27,15 +27,15 @@
using namespace qpid::broker;
-MessageStoreModule::MessageStoreModule(const std::string& name) : store(name)
-{
-}
+MessageStoreModule::MessageStoreModule(MessageStore* _store) : store(_store) {}
-bool MessageStoreModule::init(const Options* options)
+MessageStoreModule::~MessageStoreModule()
{
- TRANSFER_EXCEPTION(return store->init(options));
+ delete store;
}
+bool MessageStoreModule::init(const Options*) { return true; }
+
void MessageStoreModule::create(PersistableQueue& queue)
{
TRANSFER_EXCEPTION(store->create(queue));
diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.h b/qpid/cpp/src/qpid/broker/MessageStoreModule.h
index d8f6ab7299..160d681fab 100644
--- a/qpid/cpp/src/qpid/broker/MessageStoreModule.h
+++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.h
@@ -34,9 +34,9 @@ namespace broker {
*/
class MessageStoreModule : public MessageStore
{
- qpid::sys::Module<MessageStore> store;
+ MessageStore* store;
public:
- MessageStoreModule(const std::string& name);
+ MessageStoreModule(MessageStore* store);
bool init(const Options* options);
std::auto_ptr<TransactionContext> begin();
@@ -69,7 +69,7 @@ public:
u_int32_t outstandingQueueAIO(const PersistableQueue& queue);
void flush(const qpid::broker::PersistableQueue& queue);
- ~MessageStoreModule(){}
+ ~MessageStoreModule();
};
}
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
index f246c653ea..14d5f362e3 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -26,8 +26,8 @@
using namespace qpid::broker;
using namespace qpid::sys;
-QueueRegistry::QueueRegistry(MessageStore* const _store) :
- counter(1), store(_store), parent(0) {}
+QueueRegistry::QueueRegistry() :
+ counter(1), store(0), parent(0) {}
QueueRegistry::~QueueRegistry(){}
@@ -82,6 +82,12 @@ string QueueRegistry::generateName(){
return name;
}
-MessageStore* const QueueRegistry::getStore() const {
+void QueueRegistry::setStore (MessageStore* _store)
+{
+ assert (store == 0 && _store != 0);
+ store = _store;
+}
+
+MessageStore* QueueRegistry::getStore() const {
return store;
}
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h
index 653a27b1d8..ccccef31b5 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.h
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h
@@ -38,7 +38,7 @@ namespace broker {
*/
class QueueRegistry{
public:
- QueueRegistry(MessageStore* const store = 0);
+ QueueRegistry();
~QueueRegistry();
/**
@@ -86,9 +86,14 @@ class QueueRegistry{
string generateName();
/**
+ * Set the store to use. May only be called once.
+ */
+ void setStore (MessageStore*);
+
+ /**
* Return the message store used.
*/
- MessageStore* const getStore() const;
+ MessageStore* getStore() const;
/**
* Register the manageable parent for declared queues
@@ -100,7 +105,7 @@ private:
QueueMap queues;
qpid::sys::RWlock lock;
int counter;
- MessageStore* const store;
+ MessageStore* store;
management::Manageable* parent;
};
diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 309fd6afd0..380bd7f632 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -47,6 +47,8 @@ struct ClusterPlugin : public Plugin {
Options* getOptions() { return &options; }
+ void earlyInitialize(Plugin::Target&) {}
+
void initialize(Plugin::Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
// Only provide to a Broker, and only if the --cluster config is set.
diff --git a/qpid/cpp/src/qpidd.cpp b/qpid/cpp/src/qpidd.cpp
index 00fa8d8d21..1081805c03 100644
--- a/qpid/cpp/src/qpidd.cpp
+++ b/qpid/cpp/src/qpidd.cpp
@@ -27,6 +27,7 @@
#include "qpid/Plugin.h"
#include "qpid/sys/Shlib.h"
#include "config.h"
+#include <boost/filesystem/operations.hpp>
#include <boost/filesystem/path.hpp>
#include <iostream>
#include <fstream>
@@ -36,7 +37,23 @@
using namespace qpid;
using namespace qpid::broker;
using namespace qpid::sys;
+using namespace qpid::log;
using namespace std;
+namespace fs=boost::filesystem;
+
+struct ModuleOptions : public qpid::Options {
+ string loadDir;
+ vector<string> load;
+ bool noLoad;
+ ModuleOptions() : qpid::Options("Module options"), loadDir("/usr/lib/qpidd"), noLoad(false)
+ {
+ addOptions()
+ ("load-dir", optValue(loadDir, "DIR"), "Load all modules from this directory")
+ ("load", optValue(load, "FILE"), "Specifies additional module(s) to be loaded")
+ ("no-modules", optValue(noLoad), "Don't load any modules");
+ }
+};
+
struct DaemonOptions : public qpid::Options {
bool daemon;
@@ -57,12 +74,14 @@ struct DaemonOptions : public qpid::Options {
struct QpiddOptions : public qpid::Options {
CommonOptions common;
+ ModuleOptions module;
Broker::Options broker;
DaemonOptions daemon;
qpid::log::Options log;
QpiddOptions() : qpid::Options("Options"), common("", "/etc/qpidd.conf") {
add(common);
+ add(module);
add(broker);
add(daemon);
add(log);
@@ -79,6 +98,23 @@ struct QpiddOptions : public qpid::Options {
};
};
+// BootstrapOptions is a minimal subset of options used for a pre-parse
+// of the command line to discover which plugin modules need to be loaded.
+// The pre-parse is necessary because plugin modules may supply their own
+// set of options. CommonOptions is needed to properly support loading
+// from a configuration file.
+struct BootstrapOptions : public qpid::Options {
+ CommonOptions common;
+ ModuleOptions module;
+ qpid::log::Options log;
+
+ BootstrapOptions() : qpid::Options("Options"), common("", "/etc/qpidd.conf") {
+ add(common);
+ add(module);
+ add(log);
+ }
+};
+
// Globals
shared_ptr<Broker> brokerPtr;
auto_ptr<QpiddOptions> options;
@@ -108,24 +144,59 @@ struct QpiddDaemon : public Daemon {
void tryShlib(const char* libname) {
try {
Shlib shlib(libname);
+ QPID_LOG (info, "Loaded Module: " << libname);
+ }
+ catch (const exception& e) {}
+}
+
+void loadModuleDir (string dirname, bool isDefault)
+{
+ fs::path dirPath (dirname);
+
+ if (!fs::exists (dirPath))
+ {
+ if (isDefault)
+ return;
+ throw Exception ("Directory not found: " + dirname);
}
- catch (const exception& e) {
- // TODO aconway 2007-07-09: Should log failures as INFO
- // at least, but we try shlibs before logging is configured.
+
+ fs::directory_iterator endItr;
+ for (fs::directory_iterator itr (dirPath); itr != endItr; ++itr)
+ {
+ if (!fs::is_directory(*itr) &&
+ itr->string().find (".so") == itr->string().length() - 3)
+ tryShlib (itr->string().data());
}
}
int main(int argc, char* argv[])
{
- try {
- // Load optional modules
- tryShlib("libqpidcluster.so.0");
+ try
+ {
+ {
+ BootstrapOptions bootOptions;
+ string defaultPath (bootOptions.module.loadDir);
+
+ // Parse only the common, load, and log options to see which modules need
+ // to be loaded. Once the modules are loaded, the command line will
+ // be re-parsed with all of the module-supplied options.
+ bootOptions.parse (argc, argv, bootOptions.common.config, true);
+ qpid::log::Logger::instance().configure(bootOptions.log, argv[0]);
+ if (!bootOptions.module.noLoad) {
+ for (vector<string>::iterator iter = bootOptions.module.load.begin();
+ iter != bootOptions.module.load.end();
+ iter++)
+ tryShlib (iter->data());
+
+ bool isDefault = defaultPath == bootOptions.module.loadDir;
+ loadModuleDir (bootOptions.module.loadDir, isDefault);
+ }
+ }
// Parse options
options.reset(new QpiddOptions());
options->parse(argc, argv, options->common.config);
- qpid::log::Logger::instance().configure(options->log, argv[0]);
// Options that just print information.
if(options->common.help || options->common.version) {