diff options
author | Gordon Sim <gsim@apache.org> | 2006-12-06 12:01:40 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-12-06 12:01:40 +0000 |
commit | c017c1cd768a88c7e74076b660be36902059528a (patch) | |
tree | 027de49a21a56ef7ac3952b7230028ec9c883b90 /cpp | |
parent | 905c59a988010c9db7f64ee90f9d0b6e1011f0d0 (diff) | |
download | qpid-python-c017c1cd768a88c7e74076b660be36902059528a.tar.gz |
Added new configuration option for staging threshold (size above which messages
will be written to disk as content arrives rather than accumulating that content
in memory). Pass this through to all channels and to the store on recovery.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@483046 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/lib/broker/BrokerChannel.cpp | 6 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerChannel.h | 7 | ||||
-rw-r--r-- | cpp/lib/broker/Configuration.cpp | 29 | ||||
-rw-r--r-- | cpp/lib/broker/Configuration.h | 17 | ||||
-rw-r--r-- | cpp/lib/broker/QueueRegistry.cpp | 4 | ||||
-rw-r--r-- | cpp/lib/broker/QueueRegistry.h | 6 | ||||
-rw-r--r-- | cpp/lib/broker/SessionHandlerFactoryImpl.cpp | 9 | ||||
-rw-r--r-- | cpp/lib/broker/SessionHandlerFactoryImpl.h | 5 | ||||
-rw-r--r-- | cpp/lib/broker/SessionHandlerImpl.cpp | 9 | ||||
-rw-r--r-- | cpp/lib/broker/SessionHandlerImpl.h | 12 | ||||
-rw-r--r-- | cpp/tests/ConfigurationTest.cpp | 10 |
11 files changed, 96 insertions, 18 deletions
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index 42e45dd291..9bbdcabfc9 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -31,7 +31,7 @@ using namespace qpid::framing; using namespace qpid::sys; -Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : +Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold) : id(_id), out(_out), currentDeliveryTag(1), @@ -40,8 +40,8 @@ Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : prefetchCount(0), framesize(_framesize), tagGenerator("sgen"), - store(0), - messageBuilder(this){ + store(_store), + messageBuilder(this, _store, _stagingThreshold){ outstanding.reset(); } diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index 804d6866b1..50c8358e96 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -36,7 +36,7 @@ #include <NameGenerator.h> #include <Prefetch.h> #include <BrokerQueue.h> -#include <TransactionalStore.h> +#include <MessageStore.h> #include <TxAck.h> #include <TxBuffer.h> #include <TxPublish.h> @@ -85,7 +85,7 @@ namespace qpid { qpid::sys::Mutex deliveryLock; TxBuffer txBuffer; AccumulatedAck accumulatedAck; - TransactionalStore* store; + MessageStore* const store; MessageBuilder messageBuilder;//builder for in-progress message Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to @@ -95,7 +95,8 @@ namespace qpid { bool checkPrefetch(Message::shared_ptr& msg); public: - Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize); + Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize, + MessageStore* const _store = 0, u_int64_t stagingThreshold = 0); ~Channel(); inline void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; } diff --git a/cpp/lib/broker/Configuration.cpp b/cpp/lib/broker/Configuration.cpp index ccc5de7fa9..65d60ae1ca 100644 --- a/cpp/lib/broker/Configuration.cpp +++ b/cpp/lib/broker/Configuration.cpp @@ -32,6 +32,7 @@ Configuration::Configuration() : maxConnections("max-connections", "Set the maximum number of connections the broker can accept (default=500).", 500), connectionBacklog("connection-backlog", "Set the connection backlog for the servers socket (default=10)", 10), store('s', "store", "Set the message store module to use (default='' which implies no store)", ""), + stagingThreshold("staging-threshold", "Set the message size threshold above which messages will be written to disk as they arrive (default=5,000,000)", 5000000), help("help", "Print usage information", false), version("version", "Print version information", false) { @@ -41,6 +42,7 @@ Configuration::Configuration() : options.push_back(&maxConnections); options.push_back(&connectionBacklog); options.push_back(&store); + options.push_back(&stagingThreshold); options.push_back(&help); options.push_back(&version); } @@ -106,6 +108,11 @@ const std::string& Configuration::getStore() const { return store.getValue(); } +long Configuration::getStagingThreshold() const { + return stagingThreshold.getValue(); +} + + Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) : flag(string("-") + _flag), name("--" +_name), desc(_desc) {} @@ -192,6 +199,28 @@ void Configuration::IntOption::setValue(const std::string& _value){ value = atoi(_value.c_str()); } +// Long Option: + +Configuration::LongOption::LongOption(const char _flag, const string& _name, const string& _desc, const long _value) : + Option(_flag,_name,_desc), defaultValue(_value), value(_value) {} + +Configuration::LongOption::LongOption(const string& _name, const string& _desc, const long _value) : + Option(_name,_desc), defaultValue(_value), value(_value) {} + +Configuration::LongOption::~LongOption(){} + +long Configuration::LongOption::getValue() const { + return value; +} + +bool Configuration::LongOption::needsValue() const { + return true; +} + +void Configuration::LongOption::setValue(const std::string& _value){ + value = atol(_value.c_str()); +} + // Bool Option: Configuration::BoolOption::BoolOption(const char _flag, const string& _name, const string& _desc, const bool _value) : diff --git a/cpp/lib/broker/Configuration.h b/cpp/lib/broker/Configuration.h index 3f2ffd1662..0351601807 100644 --- a/cpp/lib/broker/Configuration.h +++ b/cpp/lib/broker/Configuration.h @@ -63,6 +63,20 @@ namespace qpid { virtual void setValue(int _value) { value = _value; } }; + class LongOption : public Option{ + const long defaultValue; + int value; + public: + LongOption(char flag, const std::string& name, const std::string& desc, const long value = 0); + LongOption(const std::string& name, const std::string& desc, const long value = 0); + virtual ~LongOption(); + + long getValue() const; + virtual bool needsValue() const; + virtual void setValue(const std::string& value); + virtual void setValue(int _value) { value = _value; } + }; + class StringOption : public Option{ const std::string defaultValue; std::string value; @@ -96,6 +110,7 @@ namespace qpid { IntOption maxConnections; IntOption connectionBacklog; StringOption store; + LongOption stagingThreshold; BoolOption help; BoolOption version; char const *programName; @@ -123,6 +138,7 @@ namespace qpid { int getMaxConnections() const; int getConnectionBacklog() const; const std::string& getStore() const; + long getStagingThreshold() const; void setHelp(bool b) { help.setValue(b); } void setVersion(bool b) { version.setValue(b); } @@ -132,6 +148,7 @@ namespace qpid { void setMaxConnections(int i) { maxConnections.setValue(i); } void setConnectionBacklog(int i) { connectionBacklog.setValue(i); } void setStore(const std::string& s) { store.setValue(s); } + void setStagingThreshold(long l) { stagingThreshold.setValue(l); } void usage(); }; diff --git a/cpp/lib/broker/QueueRegistry.cpp b/cpp/lib/broker/QueueRegistry.cpp index 304f696a7f..2d1382ef09 100644 --- a/cpp/lib/broker/QueueRegistry.cpp +++ b/cpp/lib/broker/QueueRegistry.cpp @@ -73,3 +73,7 @@ string QueueRegistry::generateName(){ } while(queues.find(name) != queues.end()); return name; } + +MessageStore* const QueueRegistry::getStore() const { + return store; +} diff --git a/cpp/lib/broker/QueueRegistry.h b/cpp/lib/broker/QueueRegistry.h index bb9f2f4f26..7232024675 100644 --- a/cpp/lib/broker/QueueRegistry.h +++ b/cpp/lib/broker/QueueRegistry.h @@ -74,6 +74,12 @@ class QueueRegistry{ */ string generateName(); + /** + * Return the message store used. + */ + MessageStore* const getStore() const; + + private: typedef std::map<string, Queue::shared_ptr> QueueMap; QueueMap queues; diff --git a/cpp/lib/broker/SessionHandlerFactoryImpl.cpp b/cpp/lib/broker/SessionHandlerFactoryImpl.cpp index 2cc09a67e0..1b5441e3cf 100644 --- a/cpp/lib/broker/SessionHandlerFactoryImpl.cpp +++ b/cpp/lib/broker/SessionHandlerFactoryImpl.cpp @@ -39,9 +39,9 @@ const std::string amq_fanout("amq.fanout"); const std::string amq_match("amq.match"); } -SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, u_int32_t _timeout) : +SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, u_int64_t _stagingThreshold, u_int32_t _timeout) : store(_store.empty() ? (MessageStore*) new NullMessageStore() : (MessageStore*) new MessageStoreModule(_store)), - queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10) + queues(store.get()), settings(_timeout, _stagingThreshold), cleaner(&queues, _timeout/10) { exchanges.declare(empty, DirectExchange::typeName); // Default exchange. exchanges.declare(amq_direct, DirectExchange::typeName); @@ -51,7 +51,8 @@ SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, if(store.get()) { RecoveryManager recoverer(queues, exchanges); - store->recover(recoverer); + MessageStoreSettings storeSettings = { settings.stagingThreshold }; + store->recover(recoverer, &storeSettings); } cleaner.start(); @@ -59,7 +60,7 @@ SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt) { - return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, timeout); + return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, settings); } SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl() diff --git a/cpp/lib/broker/SessionHandlerFactoryImpl.h b/cpp/lib/broker/SessionHandlerFactoryImpl.h index 73ae879a58..a69b67b08d 100644 --- a/cpp/lib/broker/SessionHandlerFactoryImpl.h +++ b/cpp/lib/broker/SessionHandlerFactoryImpl.h @@ -31,6 +31,7 @@ #include <sys/SessionHandler.h> #include <sys/SessionHandlerFactory.h> #include <sys/TimeoutHandler.h> +#include <SessionHandlerImpl.h> #include <memory> namespace qpid { @@ -41,10 +42,10 @@ namespace qpid { std::auto_ptr<MessageStore> store; QueueRegistry queues; ExchangeRegistry exchanges; - const u_int32_t timeout;//timeout for auto-deleted queues (in ms) + const Settings settings; AutoDelete cleaner; public: - SessionHandlerFactoryImpl(const std::string& store = "", u_int32_t timeout = 30000); + SessionHandlerFactoryImpl(const std::string& store = "", u_int64_t stagingThreshold = 0, u_int32_t timeout = 30000); virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt); virtual ~SessionHandlerFactoryImpl(); }; diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp index 6d7f5048ea..8757cc2fc3 100644 --- a/cpp/lib/broker/SessionHandlerImpl.cpp +++ b/cpp/lib/broker/SessionHandlerImpl.cpp @@ -35,7 +35,7 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, QueueRegistry* _queues, ExchangeRegistry* _exchanges, AutoDelete* _cleaner, - const u_int32_t _timeout) : + const Settings& _settings) : context(_context), // AMQP version management change - kpvdr 2006-11-17 // TODO: Make this class version-aware and link these hard-wired numbers to that version @@ -43,7 +43,7 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, queues(_queues), exchanges(_exchanges), cleaner(_cleaner), - timeout(_timeout), + settings(_settings), basicHandler(new BasicHandlerImpl(this)), channelHandler(new ChannelHandlerImpl(this)), connectionHandler(new ConnectionHandlerImpl(this)), @@ -200,7 +200,8 @@ void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){ - parent->channels[channel] = new Channel(parent->context, channel, parent->framemax); + parent->channels[channel] = new Channel(parent->context, channel, parent->framemax, + parent->queues->getStore(), parent->settings.stagingThreshold); parent->client.getChannel().openOk(channel); } @@ -262,7 +263,7 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t queue = parent->getQueue(name, channel); } else { std::pair<Queue::shared_ptr, bool> queue_created = - parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0); + parent->queues->declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0); queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue diff --git a/cpp/lib/broker/SessionHandlerImpl.h b/cpp/lib/broker/SessionHandlerImpl.h index 4b89dbeaa1..98c87a7806 100644 --- a/cpp/lib/broker/SessionHandlerImpl.h +++ b/cpp/lib/broker/SessionHandlerImpl.h @@ -60,6 +60,14 @@ struct ConnectionException : public std::exception { const char* what() const throw() { return text.c_str(); } }; +class Settings { +public: + const u_int32_t timeout;//timeout for auto-deleted queues (in ms) + const u_int64_t stagingThreshold; + + Settings(u_int32_t _timeout, u_int64_t _stagingThreshold) : timeout(_timeout), stagingThreshold(_stagingThreshold) {} +}; + class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, public virtual qpid::framing::AMQP_ServerOperations, public virtual ConnectionToken @@ -72,7 +80,7 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, QueueRegistry* queues; ExchangeRegistry* const exchanges; AutoDelete* const cleaner; - const u_int32_t timeout;//timeout for auto-deleted queues (in ms) + const Settings settings; std::auto_ptr<BasicHandler> basicHandler; std::auto_ptr<ChannelHandler> channelHandler; @@ -104,7 +112,7 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, public: SessionHandlerImpl(qpid::sys::SessionContext* context, QueueRegistry* queues, - ExchangeRegistry* exchanges, AutoDelete* cleaner, const u_int32_t timeout); + ExchangeRegistry* exchanges, AutoDelete* cleaner, const Settings& settings); virtual void received(qpid::framing::AMQFrame* frame); virtual void initiated(qpid::framing::ProtocolInitiation* header); virtual void idleOut(); diff --git a/cpp/tests/ConfigurationTest.cpp b/cpp/tests/ConfigurationTest.cpp index c2c8eb6f32..3a1d5ba85d 100644 --- a/cpp/tests/ConfigurationTest.cpp +++ b/cpp/tests/ConfigurationTest.cpp @@ -32,6 +32,7 @@ class ConfigurationTest : public CppUnit::TestCase CPPUNIT_TEST(testPortLongForm); CPPUNIT_TEST(testPortShortForm); CPPUNIT_TEST(testStore); + CPPUNIT_TEST(testStagingThreshold); CPPUNIT_TEST(testVarious); CPPUNIT_TEST_SUITE_END(); @@ -70,6 +71,15 @@ class ConfigurationTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(expected, conf.getStore()); } + void testStagingThreshold() + { + Configuration conf; + char* argv[] = {"ignore", "--staging-threshold", "123456789"}; + conf.parse("ignore", 3, argv); + long expected = 123456789; + CPPUNIT_ASSERT_EQUAL(expected, conf.getStagingThreshold()); + } + void testVarious() { Configuration conf; |