summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/lib/broker/BrokerChannel.cpp6
-rw-r--r--cpp/lib/broker/BrokerChannel.h7
-rw-r--r--cpp/lib/broker/Configuration.cpp29
-rw-r--r--cpp/lib/broker/Configuration.h17
-rw-r--r--cpp/lib/broker/QueueRegistry.cpp4
-rw-r--r--cpp/lib/broker/QueueRegistry.h6
-rw-r--r--cpp/lib/broker/SessionHandlerFactoryImpl.cpp9
-rw-r--r--cpp/lib/broker/SessionHandlerFactoryImpl.h5
-rw-r--r--cpp/lib/broker/SessionHandlerImpl.cpp9
-rw-r--r--cpp/lib/broker/SessionHandlerImpl.h12
-rw-r--r--cpp/tests/ConfigurationTest.cpp10
11 files changed, 96 insertions, 18 deletions
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp
index 42e45dd291..9bbdcabfc9 100644
--- a/cpp/lib/broker/BrokerChannel.cpp
+++ b/cpp/lib/broker/BrokerChannel.cpp
@@ -31,7 +31,7 @@ using namespace qpid::framing;
using namespace qpid::sys;
-Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) :
+Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold) :
id(_id),
out(_out),
currentDeliveryTag(1),
@@ -40,8 +40,8 @@ Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) :
prefetchCount(0),
framesize(_framesize),
tagGenerator("sgen"),
- store(0),
- messageBuilder(this){
+ store(_store),
+ messageBuilder(this, _store, _stagingThreshold){
outstanding.reset();
}
diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h
index 804d6866b1..50c8358e96 100644
--- a/cpp/lib/broker/BrokerChannel.h
+++ b/cpp/lib/broker/BrokerChannel.h
@@ -36,7 +36,7 @@
#include <NameGenerator.h>
#include <Prefetch.h>
#include <BrokerQueue.h>
-#include <TransactionalStore.h>
+#include <MessageStore.h>
#include <TxAck.h>
#include <TxBuffer.h>
#include <TxPublish.h>
@@ -85,7 +85,7 @@ namespace qpid {
qpid::sys::Mutex deliveryLock;
TxBuffer txBuffer;
AccumulatedAck accumulatedAck;
- TransactionalStore* store;
+ MessageStore* const store;
MessageBuilder messageBuilder;//builder for in-progress message
Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to
@@ -95,7 +95,8 @@ namespace qpid {
bool checkPrefetch(Message::shared_ptr& msg);
public:
- Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize);
+ Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize,
+ MessageStore* const _store = 0, u_int64_t stagingThreshold = 0);
~Channel();
inline void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; }
diff --git a/cpp/lib/broker/Configuration.cpp b/cpp/lib/broker/Configuration.cpp
index ccc5de7fa9..65d60ae1ca 100644
--- a/cpp/lib/broker/Configuration.cpp
+++ b/cpp/lib/broker/Configuration.cpp
@@ -32,6 +32,7 @@ Configuration::Configuration() :
maxConnections("max-connections", "Set the maximum number of connections the broker can accept (default=500).", 500),
connectionBacklog("connection-backlog", "Set the connection backlog for the servers socket (default=10)", 10),
store('s', "store", "Set the message store module to use (default='' which implies no store)", ""),
+ stagingThreshold("staging-threshold", "Set the message size threshold above which messages will be written to disk as they arrive (default=5,000,000)", 5000000),
help("help", "Print usage information", false),
version("version", "Print version information", false)
{
@@ -41,6 +42,7 @@ Configuration::Configuration() :
options.push_back(&maxConnections);
options.push_back(&connectionBacklog);
options.push_back(&store);
+ options.push_back(&stagingThreshold);
options.push_back(&help);
options.push_back(&version);
}
@@ -106,6 +108,11 @@ const std::string& Configuration::getStore() const {
return store.getValue();
}
+long Configuration::getStagingThreshold() const {
+ return stagingThreshold.getValue();
+}
+
+
Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) :
flag(string("-") + _flag), name("--" +_name), desc(_desc) {}
@@ -192,6 +199,28 @@ void Configuration::IntOption::setValue(const std::string& _value){
value = atoi(_value.c_str());
}
+// Long Option:
+
+Configuration::LongOption::LongOption(const char _flag, const string& _name, const string& _desc, const long _value) :
+ Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::LongOption::LongOption(const string& _name, const string& _desc, const long _value) :
+ Option(_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::LongOption::~LongOption(){}
+
+long Configuration::LongOption::getValue() const {
+ return value;
+}
+
+bool Configuration::LongOption::needsValue() const {
+ return true;
+}
+
+void Configuration::LongOption::setValue(const std::string& _value){
+ value = atol(_value.c_str());
+}
+
// Bool Option:
Configuration::BoolOption::BoolOption(const char _flag, const string& _name, const string& _desc, const bool _value) :
diff --git a/cpp/lib/broker/Configuration.h b/cpp/lib/broker/Configuration.h
index 3f2ffd1662..0351601807 100644
--- a/cpp/lib/broker/Configuration.h
+++ b/cpp/lib/broker/Configuration.h
@@ -63,6 +63,20 @@ namespace qpid {
virtual void setValue(int _value) { value = _value; }
};
+ class LongOption : public Option{
+ const long defaultValue;
+ int value;
+ public:
+ LongOption(char flag, const std::string& name, const std::string& desc, const long value = 0);
+ LongOption(const std::string& name, const std::string& desc, const long value = 0);
+ virtual ~LongOption();
+
+ long getValue() const;
+ virtual bool needsValue() const;
+ virtual void setValue(const std::string& value);
+ virtual void setValue(int _value) { value = _value; }
+ };
+
class StringOption : public Option{
const std::string defaultValue;
std::string value;
@@ -96,6 +110,7 @@ namespace qpid {
IntOption maxConnections;
IntOption connectionBacklog;
StringOption store;
+ LongOption stagingThreshold;
BoolOption help;
BoolOption version;
char const *programName;
@@ -123,6 +138,7 @@ namespace qpid {
int getMaxConnections() const;
int getConnectionBacklog() const;
const std::string& getStore() const;
+ long getStagingThreshold() const;
void setHelp(bool b) { help.setValue(b); }
void setVersion(bool b) { version.setValue(b); }
@@ -132,6 +148,7 @@ namespace qpid {
void setMaxConnections(int i) { maxConnections.setValue(i); }
void setConnectionBacklog(int i) { connectionBacklog.setValue(i); }
void setStore(const std::string& s) { store.setValue(s); }
+ void setStagingThreshold(long l) { stagingThreshold.setValue(l); }
void usage();
};
diff --git a/cpp/lib/broker/QueueRegistry.cpp b/cpp/lib/broker/QueueRegistry.cpp
index 304f696a7f..2d1382ef09 100644
--- a/cpp/lib/broker/QueueRegistry.cpp
+++ b/cpp/lib/broker/QueueRegistry.cpp
@@ -73,3 +73,7 @@ string QueueRegistry::generateName(){
} while(queues.find(name) != queues.end());
return name;
}
+
+MessageStore* const QueueRegistry::getStore() const {
+ return store;
+}
diff --git a/cpp/lib/broker/QueueRegistry.h b/cpp/lib/broker/QueueRegistry.h
index bb9f2f4f26..7232024675 100644
--- a/cpp/lib/broker/QueueRegistry.h
+++ b/cpp/lib/broker/QueueRegistry.h
@@ -74,6 +74,12 @@ class QueueRegistry{
*/
string generateName();
+ /**
+ * Return the message store used.
+ */
+ MessageStore* const getStore() const;
+
+
private:
typedef std::map<string, Queue::shared_ptr> QueueMap;
QueueMap queues;
diff --git a/cpp/lib/broker/SessionHandlerFactoryImpl.cpp b/cpp/lib/broker/SessionHandlerFactoryImpl.cpp
index 2cc09a67e0..1b5441e3cf 100644
--- a/cpp/lib/broker/SessionHandlerFactoryImpl.cpp
+++ b/cpp/lib/broker/SessionHandlerFactoryImpl.cpp
@@ -39,9 +39,9 @@ const std::string amq_fanout("amq.fanout");
const std::string amq_match("amq.match");
}
-SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, u_int32_t _timeout) :
+SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, u_int64_t _stagingThreshold, u_int32_t _timeout) :
store(_store.empty() ? (MessageStore*) new NullMessageStore() : (MessageStore*) new MessageStoreModule(_store)),
- queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10)
+ queues(store.get()), settings(_timeout, _stagingThreshold), cleaner(&queues, _timeout/10)
{
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
exchanges.declare(amq_direct, DirectExchange::typeName);
@@ -51,7 +51,8 @@ SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store,
if(store.get()) {
RecoveryManager recoverer(queues, exchanges);
- store->recover(recoverer);
+ MessageStoreSettings storeSettings = { settings.stagingThreshold };
+ store->recover(recoverer, &storeSettings);
}
cleaner.start();
@@ -59,7 +60,7 @@ SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store,
SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt)
{
- return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, timeout);
+ return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, settings);
}
SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl()
diff --git a/cpp/lib/broker/SessionHandlerFactoryImpl.h b/cpp/lib/broker/SessionHandlerFactoryImpl.h
index 73ae879a58..a69b67b08d 100644
--- a/cpp/lib/broker/SessionHandlerFactoryImpl.h
+++ b/cpp/lib/broker/SessionHandlerFactoryImpl.h
@@ -31,6 +31,7 @@
#include <sys/SessionHandler.h>
#include <sys/SessionHandlerFactory.h>
#include <sys/TimeoutHandler.h>
+#include <SessionHandlerImpl.h>
#include <memory>
namespace qpid {
@@ -41,10 +42,10 @@ namespace qpid {
std::auto_ptr<MessageStore> store;
QueueRegistry queues;
ExchangeRegistry exchanges;
- const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
+ const Settings settings;
AutoDelete cleaner;
public:
- SessionHandlerFactoryImpl(const std::string& store = "", u_int32_t timeout = 30000);
+ SessionHandlerFactoryImpl(const std::string& store = "", u_int64_t stagingThreshold = 0, u_int32_t timeout = 30000);
virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt);
virtual ~SessionHandlerFactoryImpl();
};
diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp
index 6d7f5048ea..8757cc2fc3 100644
--- a/cpp/lib/broker/SessionHandlerImpl.cpp
+++ b/cpp/lib/broker/SessionHandlerImpl.cpp
@@ -35,7 +35,7 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context,
QueueRegistry* _queues,
ExchangeRegistry* _exchanges,
AutoDelete* _cleaner,
- const u_int32_t _timeout) :
+ const Settings& _settings) :
context(_context),
// AMQP version management change - kpvdr 2006-11-17
// TODO: Make this class version-aware and link these hard-wired numbers to that version
@@ -43,7 +43,7 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context,
queues(_queues),
exchanges(_exchanges),
cleaner(_cleaner),
- timeout(_timeout),
+ settings(_settings),
basicHandler(new BasicHandlerImpl(this)),
channelHandler(new ChannelHandlerImpl(this)),
connectionHandler(new ConnectionHandlerImpl(this)),
@@ -200,7 +200,8 @@ void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){
void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){
- parent->channels[channel] = new Channel(parent->context, channel, parent->framemax);
+ parent->channels[channel] = new Channel(parent->context, channel, parent->framemax,
+ parent->queues->getStore(), parent->settings.stagingThreshold);
parent->client.getChannel().openOk(channel);
}
@@ -262,7 +263,7 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t
queue = parent->getQueue(name, channel);
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
- parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0);
+ parent->queues->declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0);
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
diff --git a/cpp/lib/broker/SessionHandlerImpl.h b/cpp/lib/broker/SessionHandlerImpl.h
index 4b89dbeaa1..98c87a7806 100644
--- a/cpp/lib/broker/SessionHandlerImpl.h
+++ b/cpp/lib/broker/SessionHandlerImpl.h
@@ -60,6 +60,14 @@ struct ConnectionException : public std::exception {
const char* what() const throw() { return text.c_str(); }
};
+class Settings {
+public:
+ const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
+ const u_int64_t stagingThreshold;
+
+ Settings(u_int32_t _timeout, u_int64_t _stagingThreshold) : timeout(_timeout), stagingThreshold(_stagingThreshold) {}
+};
+
class SessionHandlerImpl : public virtual qpid::sys::SessionHandler,
public virtual qpid::framing::AMQP_ServerOperations,
public virtual ConnectionToken
@@ -72,7 +80,7 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler,
QueueRegistry* queues;
ExchangeRegistry* const exchanges;
AutoDelete* const cleaner;
- const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
+ const Settings settings;
std::auto_ptr<BasicHandler> basicHandler;
std::auto_ptr<ChannelHandler> channelHandler;
@@ -104,7 +112,7 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler,
public:
SessionHandlerImpl(qpid::sys::SessionContext* context, QueueRegistry* queues,
- ExchangeRegistry* exchanges, AutoDelete* cleaner, const u_int32_t timeout);
+ ExchangeRegistry* exchanges, AutoDelete* cleaner, const Settings& settings);
virtual void received(qpid::framing::AMQFrame* frame);
virtual void initiated(qpid::framing::ProtocolInitiation* header);
virtual void idleOut();
diff --git a/cpp/tests/ConfigurationTest.cpp b/cpp/tests/ConfigurationTest.cpp
index c2c8eb6f32..3a1d5ba85d 100644
--- a/cpp/tests/ConfigurationTest.cpp
+++ b/cpp/tests/ConfigurationTest.cpp
@@ -32,6 +32,7 @@ class ConfigurationTest : public CppUnit::TestCase
CPPUNIT_TEST(testPortLongForm);
CPPUNIT_TEST(testPortShortForm);
CPPUNIT_TEST(testStore);
+ CPPUNIT_TEST(testStagingThreshold);
CPPUNIT_TEST(testVarious);
CPPUNIT_TEST_SUITE_END();
@@ -70,6 +71,15 @@ class ConfigurationTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(expected, conf.getStore());
}
+ void testStagingThreshold()
+ {
+ Configuration conf;
+ char* argv[] = {"ignore", "--staging-threshold", "123456789"};
+ conf.parse("ignore", 3, argv);
+ long expected = 123456789;
+ CPPUNIT_ASSERT_EQUAL(expected, conf.getStagingThreshold());
+ }
+
void testVarious()
{
Configuration conf;