summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp14
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h2
-rw-r--r--qpid/cpp/src/qpid/broker/MessageStore.h3
-rw-r--r--qpid/cpp/src/qpid/broker/MessageStoreModule.cpp49
-rw-r--r--qpid/cpp/src/qpid/broker/MessageStoreModule.h2
-rw-r--r--qpid/cpp/src/qpid/broker/NullMessageStore.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/NullMessageStore.h2
7 files changed, 44 insertions, 30 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 95769d224d..9ac73c0219 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -72,6 +72,8 @@ Broker::Options::Options(const std::string& name) :
storeDir("/var"),
storeAsync(false),
storeForce(false),
+ numJrnlFiles(8),
+ jrnlFsizePgs(24),
enableMgmt(0),
mgmtPubInterval(10),
ack(0)
@@ -89,14 +91,20 @@ Broker::Options::Options(const std::string& name) :
"Sets the connection backlog limit for the server socket")
("staging-threshold", optValue(stagingThreshold, "N"),
"Stages messages over N bytes to disk")
+// TODO: These options need to come from within the store module
("store,s", optValue(store,"LIBNAME"),
"Tells the broker to use the message store shared library LIBNAME for persistence")
("store-directory", optValue(storeDir,"DIR"),
"Store directory location for persistence.")
("store-async", optValue(storeAsync,"yes|no"),
- "Use async persistence storage - if store supports it, enable AIO 0-DIRECT.")
+ "Use async persistence storage - if store supports it, enables AIO O_DIRECT.")
("store-force", optValue(storeForce,"yes|no"),
- "Force changing modes of store, will delete all existing data if mode is changed. Be SURE you want to do this")
+ "Force changing modes of store, will delete all existing data if mode is changed. Be SURE you want to do this!")
+ ("num-jfiles", qpid::optValue(numJrnlFiles, "N"),
+ "Number of files in persistence journal")
+ ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"),
+ "Size of each journal file in multiples of read pages (1 read page = 64kiB)")
+// End of store module options
("mgmt,m", optValue(enableMgmt,"yes|no"),
"Enable Management")
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"),
@@ -152,7 +160,7 @@ Broker::Broker(const Broker::Options& conf) :
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
if(store.get()) {
- if (!store->init(conf.storeDir, conf.storeAsync, conf.storeForce)){
+ if (!store->init(&conf)){
throw Exception( "Existing Journal in different mode, backup/move existing data \
before changing modes. Or use --store-force yes to blow existing data away.");
}else{
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 0980c970d2..c1051856a4 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -71,6 +71,8 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M
string storeDir;
bool storeAsync;
bool storeForce;
+ u_int16_t numJrnlFiles;
+ u_int32_t jrnlFsizePgs;
bool enableMgmt;
uint16_t mgmtPubInterval;
uint32_t ack;
diff --git a/qpid/cpp/src/qpid/broker/MessageStore.h b/qpid/cpp/src/qpid/broker/MessageStore.h
index 432fe30bb3..73ece93c72 100644
--- a/qpid/cpp/src/qpid/broker/MessageStore.h
+++ b/qpid/cpp/src/qpid/broker/MessageStore.h
@@ -22,6 +22,7 @@
#define _MessageStore_
#include <boost/shared_ptr.hpp>
+#include <qpid/Options.h>
#include "PersistableExchange.h"
#include "PersistableMessage.h"
#include "PersistableQueue.h"
@@ -47,7 +48,7 @@ public:
* @param async true, enable async, false, enable sync
* @param force true, delete data on mode change, false, error on mode change
*/
- virtual bool init(const std::string& dir, const bool async, const bool force) = 0;
+ virtual bool init(const Options* options) = 0;
/**
* Record the existence of a durable queue
diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
index d769030172..4850cfb921 100644
--- a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
@@ -22,121 +22,124 @@
#include "MessageStoreModule.h"
#include <iostream>
+// This transfer protects against the unloading of the store lib prior to the handling of the exception
+#define TRANSFER_EXCEPTION(fn) try { fn; } catch (std::exception& e) { throw Exception(e.what()); }
+
using namespace qpid::broker;
MessageStoreModule::MessageStoreModule(const std::string& name) : store(name)
{
}
-bool MessageStoreModule::init(const std::string& dir, const bool async, const bool force)
+bool MessageStoreModule::init(const Options* options)
{
- return store->init(dir, async, force);
+ TRANSFER_EXCEPTION(return store->init(options));
}
void MessageStoreModule::create(PersistableQueue& queue)
{
- store->create(queue);
+ TRANSFER_EXCEPTION(store->create(queue));
}
void MessageStoreModule::destroy(PersistableQueue& queue)
{
- store->destroy(queue);
+ TRANSFER_EXCEPTION(store->destroy(queue));
}
void MessageStoreModule::create(const PersistableExchange& exchange)
{
- store->create(exchange);
+ TRANSFER_EXCEPTION(store->create(exchange));
}
void MessageStoreModule::destroy(const PersistableExchange& exchange)
{
- store->destroy(exchange);
+ TRANSFER_EXCEPTION(store->destroy(exchange));
}
void MessageStoreModule::bind(const PersistableExchange& e, const PersistableQueue& q,
const std::string& k, const framing::FieldTable& a)
{
- store->bind(e, q, k, a);
+ TRANSFER_EXCEPTION(store->bind(e, q, k, a));
}
void MessageStoreModule::unbind(const PersistableExchange& e, const PersistableQueue& q,
const std::string& k, const framing::FieldTable& a)
{
- store->unbind(e, q, k, a);
+ TRANSFER_EXCEPTION(store->unbind(e, q, k, a));
}
void MessageStoreModule::recover(RecoveryManager& registry)
{
- store->recover(registry);
+ TRANSFER_EXCEPTION(store->recover(registry));
}
void MessageStoreModule::stage( intrusive_ptr<PersistableMessage>& msg)
{
- store->stage(msg);
+ TRANSFER_EXCEPTION(store->stage(msg));
}
void MessageStoreModule::destroy(intrusive_ptr<PersistableMessage>& msg)
{
- store->destroy(msg);
+ TRANSFER_EXCEPTION(store->destroy(msg));
}
void MessageStoreModule::appendContent(intrusive_ptr<const PersistableMessage>& msg, const std::string& data)
{
- store->appendContent(msg, data);
+ TRANSFER_EXCEPTION(store->appendContent(msg, data));
}
void MessageStoreModule::loadContent(const qpid::broker::PersistableQueue& queue,
intrusive_ptr<const PersistableMessage>& msg, string& data, uint64_t offset, uint32_t length)
{
- store->loadContent(queue, msg, data, offset, length);
+ TRANSFER_EXCEPTION(store->loadContent(queue, msg, data, offset, length));
}
void MessageStoreModule::enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue)
{
- store->enqueue(ctxt, msg, queue);
+ TRANSFER_EXCEPTION(store->enqueue(ctxt, msg, queue));
}
void MessageStoreModule::dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue)
{
- store->dequeue(ctxt, msg, queue);
+ TRANSFER_EXCEPTION(store->dequeue(ctxt, msg, queue));
}
void MessageStoreModule::flush(const qpid::broker::PersistableQueue& queue)
{
- store->flush(queue);
+ TRANSFER_EXCEPTION(store->flush(queue));
}
u_int32_t MessageStoreModule::outstandingQueueAIO(const PersistableQueue& queue)
{
- return store->outstandingQueueAIO(queue);
+ TRANSFER_EXCEPTION(return store->outstandingQueueAIO(queue));
}
std::auto_ptr<TransactionContext> MessageStoreModule::begin()
{
- return store->begin();
+ TRANSFER_EXCEPTION(return store->begin());
}
std::auto_ptr<TPCTransactionContext> MessageStoreModule::begin(const std::string& xid)
{
- return store->begin(xid);
+ TRANSFER_EXCEPTION(return store->begin(xid));
}
void MessageStoreModule::prepare(TPCTransactionContext& txn)
{
- store->prepare(txn);
+ TRANSFER_EXCEPTION(store->prepare(txn));
}
void MessageStoreModule::commit(TransactionContext& ctxt)
{
- store->commit(ctxt);
+ TRANSFER_EXCEPTION(store->commit(ctxt));
}
void MessageStoreModule::abort(TransactionContext& ctxt)
{
- store->abort(ctxt);
+ TRANSFER_EXCEPTION(store->abort(ctxt));
}
void MessageStoreModule::collectPreparedXids(std::set<std::string>& xids)
{
- store->collectPreparedXids(xids);
+ TRANSFER_EXCEPTION(store->collectPreparedXids(xids));
}
diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.h b/qpid/cpp/src/qpid/broker/MessageStoreModule.h
index e7404487b0..ce8e746193 100644
--- a/qpid/cpp/src/qpid/broker/MessageStoreModule.h
+++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.h
@@ -38,7 +38,7 @@ class MessageStoreModule : public MessageStore
public:
MessageStoreModule(const std::string& name);
- bool init(const std::string& dir, const bool async, const bool force);
+ bool init(const Options* options);
std::auto_ptr<TransactionContext> begin();
std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
void prepare(TPCTransactionContext& txn);
diff --git a/qpid/cpp/src/qpid/broker/NullMessageStore.cpp b/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
index c0dbd9a315..5890be8d1a 100644
--- a/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
+++ b/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
@@ -49,7 +49,7 @@ using namespace qpid::broker;
NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){}
-bool NullMessageStore::init(const std::string& /*dir*/, const bool /*async*/, const bool /*force*/) {return true;}
+bool NullMessageStore::init(const Options* /*options*/) {return true;}
void NullMessageStore::create(PersistableQueue& queue)
{
diff --git a/qpid/cpp/src/qpid/broker/NullMessageStore.h b/qpid/cpp/src/qpid/broker/NullMessageStore.h
index 6a2e960b0f..b83e4c44c7 100644
--- a/qpid/cpp/src/qpid/broker/NullMessageStore.h
+++ b/qpid/cpp/src/qpid/broker/NullMessageStore.h
@@ -38,7 +38,7 @@ class NullMessageStore : public MessageStore
public:
NullMessageStore(bool warn = false);
- virtual bool init(const std::string& dir, const bool async, const bool force);
+ virtual bool init(const Options* options);
virtual std::auto_ptr<TransactionContext> begin();
virtual std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
virtual void prepare(TPCTransactionContext& txn);