diff options
Diffstat (limited to 'qpid/cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageStore.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageStoreModule.cpp | 49 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageStoreModule.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/NullMessageStore.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/NullMessageStore.h | 2 |
7 files changed, 44 insertions, 30 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 95769d224d..9ac73c0219 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -72,6 +72,8 @@ Broker::Options::Options(const std::string& name) : storeDir("/var"), storeAsync(false), storeForce(false), + numJrnlFiles(8), + jrnlFsizePgs(24), enableMgmt(0), mgmtPubInterval(10), ack(0) @@ -89,14 +91,20 @@ Broker::Options::Options(const std::string& name) : "Sets the connection backlog limit for the server socket") ("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk") +// TODO: These options need to come from within the store module ("store,s", optValue(store,"LIBNAME"), "Tells the broker to use the message store shared library LIBNAME for persistence") ("store-directory", optValue(storeDir,"DIR"), "Store directory location for persistence.") ("store-async", optValue(storeAsync,"yes|no"), - "Use async persistence storage - if store supports it, enable AIO 0-DIRECT.") + "Use async persistence storage - if store supports it, enables AIO O_DIRECT.") ("store-force", optValue(storeForce,"yes|no"), - "Force changing modes of store, will delete all existing data if mode is changed. Be SURE you want to do this") + "Force changing modes of store, will delete all existing data if mode is changed. Be SURE you want to do this!") + ("num-jfiles", qpid::optValue(numJrnlFiles, "N"), + "Number of files in persistence journal") + ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"), + "Size of each journal file in multiples of read pages (1 read page = 64kiB)") +// End of store module options ("mgmt,m", optValue(enableMgmt,"yes|no"), "Enable Management") ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), @@ -152,7 +160,7 @@ Broker::Broker(const Broker::Options& conf) : exchanges.declare(empty, DirectExchange::typeName); // Default exchange. if(store.get()) { - if (!store->init(conf.storeDir, conf.storeAsync, conf.storeForce)){ + if (!store->init(&conf)){ throw Exception( "Existing Journal in different mode, backup/move existing data \ before changing modes. Or use --store-force yes to blow existing data away."); }else{ diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 0980c970d2..c1051856a4 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -71,6 +71,8 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M string storeDir; bool storeAsync; bool storeForce; + u_int16_t numJrnlFiles; + u_int32_t jrnlFsizePgs; bool enableMgmt; uint16_t mgmtPubInterval; uint32_t ack; diff --git a/qpid/cpp/src/qpid/broker/MessageStore.h b/qpid/cpp/src/qpid/broker/MessageStore.h index 432fe30bb3..73ece93c72 100644 --- a/qpid/cpp/src/qpid/broker/MessageStore.h +++ b/qpid/cpp/src/qpid/broker/MessageStore.h @@ -22,6 +22,7 @@ #define _MessageStore_ #include <boost/shared_ptr.hpp> +#include <qpid/Options.h> #include "PersistableExchange.h" #include "PersistableMessage.h" #include "PersistableQueue.h" @@ -47,7 +48,7 @@ public: * @param async true, enable async, false, enable sync * @param force true, delete data on mode change, false, error on mode change */ - virtual bool init(const std::string& dir, const bool async, const bool force) = 0; + virtual bool init(const Options* options) = 0; /** * Record the existence of a durable queue diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp index d769030172..4850cfb921 100644 --- a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp +++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -22,121 +22,124 @@ #include "MessageStoreModule.h" #include <iostream> +// This transfer protects against the unloading of the store lib prior to the handling of the exception +#define TRANSFER_EXCEPTION(fn) try { fn; } catch (std::exception& e) { throw Exception(e.what()); } + using namespace qpid::broker; MessageStoreModule::MessageStoreModule(const std::string& name) : store(name) { } -bool MessageStoreModule::init(const std::string& dir, const bool async, const bool force) +bool MessageStoreModule::init(const Options* options) { - return store->init(dir, async, force); + TRANSFER_EXCEPTION(return store->init(options)); } void MessageStoreModule::create(PersistableQueue& queue) { - store->create(queue); + TRANSFER_EXCEPTION(store->create(queue)); } void MessageStoreModule::destroy(PersistableQueue& queue) { - store->destroy(queue); + TRANSFER_EXCEPTION(store->destroy(queue)); } void MessageStoreModule::create(const PersistableExchange& exchange) { - store->create(exchange); + TRANSFER_EXCEPTION(store->create(exchange)); } void MessageStoreModule::destroy(const PersistableExchange& exchange) { - store->destroy(exchange); + TRANSFER_EXCEPTION(store->destroy(exchange)); } void MessageStoreModule::bind(const PersistableExchange& e, const PersistableQueue& q, const std::string& k, const framing::FieldTable& a) { - store->bind(e, q, k, a); + TRANSFER_EXCEPTION(store->bind(e, q, k, a)); } void MessageStoreModule::unbind(const PersistableExchange& e, const PersistableQueue& q, const std::string& k, const framing::FieldTable& a) { - store->unbind(e, q, k, a); + TRANSFER_EXCEPTION(store->unbind(e, q, k, a)); } void MessageStoreModule::recover(RecoveryManager& registry) { - store->recover(registry); + TRANSFER_EXCEPTION(store->recover(registry)); } void MessageStoreModule::stage( intrusive_ptr<PersistableMessage>& msg) { - store->stage(msg); + TRANSFER_EXCEPTION(store->stage(msg)); } void MessageStoreModule::destroy(intrusive_ptr<PersistableMessage>& msg) { - store->destroy(msg); + TRANSFER_EXCEPTION(store->destroy(msg)); } void MessageStoreModule::appendContent(intrusive_ptr<const PersistableMessage>& msg, const std::string& data) { - store->appendContent(msg, data); + TRANSFER_EXCEPTION(store->appendContent(msg, data)); } void MessageStoreModule::loadContent(const qpid::broker::PersistableQueue& queue, intrusive_ptr<const PersistableMessage>& msg, string& data, uint64_t offset, uint32_t length) { - store->loadContent(queue, msg, data, offset, length); + TRANSFER_EXCEPTION(store->loadContent(queue, msg, data, offset, length)); } void MessageStoreModule::enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue) { - store->enqueue(ctxt, msg, queue); + TRANSFER_EXCEPTION(store->enqueue(ctxt, msg, queue)); } void MessageStoreModule::dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue) { - store->dequeue(ctxt, msg, queue); + TRANSFER_EXCEPTION(store->dequeue(ctxt, msg, queue)); } void MessageStoreModule::flush(const qpid::broker::PersistableQueue& queue) { - store->flush(queue); + TRANSFER_EXCEPTION(store->flush(queue)); } u_int32_t MessageStoreModule::outstandingQueueAIO(const PersistableQueue& queue) { - return store->outstandingQueueAIO(queue); + TRANSFER_EXCEPTION(return store->outstandingQueueAIO(queue)); } std::auto_ptr<TransactionContext> MessageStoreModule::begin() { - return store->begin(); + TRANSFER_EXCEPTION(return store->begin()); } std::auto_ptr<TPCTransactionContext> MessageStoreModule::begin(const std::string& xid) { - return store->begin(xid); + TRANSFER_EXCEPTION(return store->begin(xid)); } void MessageStoreModule::prepare(TPCTransactionContext& txn) { - store->prepare(txn); + TRANSFER_EXCEPTION(store->prepare(txn)); } void MessageStoreModule::commit(TransactionContext& ctxt) { - store->commit(ctxt); + TRANSFER_EXCEPTION(store->commit(ctxt)); } void MessageStoreModule::abort(TransactionContext& ctxt) { - store->abort(ctxt); + TRANSFER_EXCEPTION(store->abort(ctxt)); } void MessageStoreModule::collectPreparedXids(std::set<std::string>& xids) { - store->collectPreparedXids(xids); + TRANSFER_EXCEPTION(store->collectPreparedXids(xids)); } diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.h b/qpid/cpp/src/qpid/broker/MessageStoreModule.h index e7404487b0..ce8e746193 100644 --- a/qpid/cpp/src/qpid/broker/MessageStoreModule.h +++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.h @@ -38,7 +38,7 @@ class MessageStoreModule : public MessageStore public: MessageStoreModule(const std::string& name); - bool init(const std::string& dir, const bool async, const bool force); + bool init(const Options* options); std::auto_ptr<TransactionContext> begin(); std::auto_ptr<TPCTransactionContext> begin(const std::string& xid); void prepare(TPCTransactionContext& txn); diff --git a/qpid/cpp/src/qpid/broker/NullMessageStore.cpp b/qpid/cpp/src/qpid/broker/NullMessageStore.cpp index c0dbd9a315..5890be8d1a 100644 --- a/qpid/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/qpid/cpp/src/qpid/broker/NullMessageStore.cpp @@ -49,7 +49,7 @@ using namespace qpid::broker; NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){} -bool NullMessageStore::init(const std::string& /*dir*/, const bool /*async*/, const bool /*force*/) {return true;} +bool NullMessageStore::init(const Options* /*options*/) {return true;} void NullMessageStore::create(PersistableQueue& queue) { diff --git a/qpid/cpp/src/qpid/broker/NullMessageStore.h b/qpid/cpp/src/qpid/broker/NullMessageStore.h index 6a2e960b0f..b83e4c44c7 100644 --- a/qpid/cpp/src/qpid/broker/NullMessageStore.h +++ b/qpid/cpp/src/qpid/broker/NullMessageStore.h @@ -38,7 +38,7 @@ class NullMessageStore : public MessageStore public: NullMessageStore(bool warn = false); - virtual bool init(const std::string& dir, const bool async, const bool force); + virtual bool init(const Options* options); virtual std::auto_ptr<TransactionContext> begin(); virtual std::auto_ptr<TPCTransactionContext> begin(const std::string& xid); virtual void prepare(TPCTransactionContext& txn); |