diff options
Diffstat (limited to 'cpp/lib/broker')
-rw-r--r-- | cpp/lib/broker/BrokerChannel.cpp | 6 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerChannel.h | 7 | ||||
-rw-r--r-- | cpp/lib/broker/Configuration.cpp | 29 | ||||
-rw-r--r-- | cpp/lib/broker/Configuration.h | 17 | ||||
-rw-r--r-- | cpp/lib/broker/QueueRegistry.cpp | 4 | ||||
-rw-r--r-- | cpp/lib/broker/QueueRegistry.h | 6 | ||||
-rw-r--r-- | cpp/lib/broker/SessionHandlerFactoryImpl.cpp | 9 | ||||
-rw-r--r-- | cpp/lib/broker/SessionHandlerFactoryImpl.h | 5 | ||||
-rw-r--r-- | cpp/lib/broker/SessionHandlerImpl.cpp | 9 | ||||
-rw-r--r-- | cpp/lib/broker/SessionHandlerImpl.h | 12 |
10 files changed, 86 insertions, 18 deletions
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index 42e45dd291..9bbdcabfc9 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -31,7 +31,7 @@ using namespace qpid::framing; using namespace qpid::sys; -Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : +Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold) : id(_id), out(_out), currentDeliveryTag(1), @@ -40,8 +40,8 @@ Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : prefetchCount(0), framesize(_framesize), tagGenerator("sgen"), - store(0), - messageBuilder(this){ + store(_store), + messageBuilder(this, _store, _stagingThreshold){ outstanding.reset(); } diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index 804d6866b1..50c8358e96 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -36,7 +36,7 @@ #include <NameGenerator.h> #include <Prefetch.h> #include <BrokerQueue.h> -#include <TransactionalStore.h> +#include <MessageStore.h> #include <TxAck.h> #include <TxBuffer.h> #include <TxPublish.h> @@ -85,7 +85,7 @@ namespace qpid { qpid::sys::Mutex deliveryLock; TxBuffer txBuffer; AccumulatedAck accumulatedAck; - TransactionalStore* store; + MessageStore* const store; MessageBuilder messageBuilder;//builder for in-progress message Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to @@ -95,7 +95,8 @@ namespace qpid { bool checkPrefetch(Message::shared_ptr& msg); public: - Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize); + Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize, + MessageStore* const _store = 0, u_int64_t stagingThreshold = 0); ~Channel(); inline void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; } diff --git a/cpp/lib/broker/Configuration.cpp b/cpp/lib/broker/Configuration.cpp index ccc5de7fa9..65d60ae1ca 100644 --- a/cpp/lib/broker/Configuration.cpp +++ b/cpp/lib/broker/Configuration.cpp @@ -32,6 +32,7 @@ Configuration::Configuration() : maxConnections("max-connections", "Set the maximum number of connections the broker can accept (default=500).", 500), connectionBacklog("connection-backlog", "Set the connection backlog for the servers socket (default=10)", 10), store('s', "store", "Set the message store module to use (default='' which implies no store)", ""), + stagingThreshold("staging-threshold", "Set the message size threshold above which messages will be written to disk as they arrive (default=5,000,000)", 5000000), help("help", "Print usage information", false), version("version", "Print version information", false) { @@ -41,6 +42,7 @@ Configuration::Configuration() : options.push_back(&maxConnections); options.push_back(&connectionBacklog); options.push_back(&store); + options.push_back(&stagingThreshold); options.push_back(&help); options.push_back(&version); } @@ -106,6 +108,11 @@ const std::string& Configuration::getStore() const { return store.getValue(); } +long Configuration::getStagingThreshold() const { + return stagingThreshold.getValue(); +} + + Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) : flag(string("-") + _flag), name("--" +_name), desc(_desc) {} @@ -192,6 +199,28 @@ void Configuration::IntOption::setValue(const std::string& _value){ value = atoi(_value.c_str()); } +// Long Option: + +Configuration::LongOption::LongOption(const char _flag, const string& _name, const string& _desc, const long _value) : + Option(_flag,_name,_desc), defaultValue(_value), value(_value) {} + +Configuration::LongOption::LongOption(const string& _name, const string& _desc, const long _value) : + Option(_name,_desc), defaultValue(_value), value(_value) {} + +Configuration::LongOption::~LongOption(){} + +long Configuration::LongOption::getValue() const { + return value; +} + +bool Configuration::LongOption::needsValue() const { + return true; +} + +void Configuration::LongOption::setValue(const std::string& _value){ + value = atol(_value.c_str()); +} + // Bool Option: Configuration::BoolOption::BoolOption(const char _flag, const string& _name, const string& _desc, const bool _value) : diff --git a/cpp/lib/broker/Configuration.h b/cpp/lib/broker/Configuration.h index 3f2ffd1662..0351601807 100644 --- a/cpp/lib/broker/Configuration.h +++ b/cpp/lib/broker/Configuration.h @@ -63,6 +63,20 @@ namespace qpid { virtual void setValue(int _value) { value = _value; } }; + class LongOption : public Option{ + const long defaultValue; + int value; + public: + LongOption(char flag, const std::string& name, const std::string& desc, const long value = 0); + LongOption(const std::string& name, const std::string& desc, const long value = 0); + virtual ~LongOption(); + + long getValue() const; + virtual bool needsValue() const; + virtual void setValue(const std::string& value); + virtual void setValue(int _value) { value = _value; } + }; + class StringOption : public Option{ const std::string defaultValue; std::string value; @@ -96,6 +110,7 @@ namespace qpid { IntOption maxConnections; IntOption connectionBacklog; StringOption store; + LongOption stagingThreshold; BoolOption help; BoolOption version; char const *programName; @@ -123,6 +138,7 @@ namespace qpid { int getMaxConnections() const; int getConnectionBacklog() const; const std::string& getStore() const; + long getStagingThreshold() const; void setHelp(bool b) { help.setValue(b); } void setVersion(bool b) { version.setValue(b); } @@ -132,6 +148,7 @@ namespace qpid { void setMaxConnections(int i) { maxConnections.setValue(i); } void setConnectionBacklog(int i) { connectionBacklog.setValue(i); } void setStore(const std::string& s) { store.setValue(s); } + void setStagingThreshold(long l) { stagingThreshold.setValue(l); } void usage(); }; diff --git a/cpp/lib/broker/QueueRegistry.cpp b/cpp/lib/broker/QueueRegistry.cpp index 304f696a7f..2d1382ef09 100644 --- a/cpp/lib/broker/QueueRegistry.cpp +++ b/cpp/lib/broker/QueueRegistry.cpp @@ -73,3 +73,7 @@ string QueueRegistry::generateName(){ } while(queues.find(name) != queues.end()); return name; } + +MessageStore* const QueueRegistry::getStore() const { + return store; +} diff --git a/cpp/lib/broker/QueueRegistry.h b/cpp/lib/broker/QueueRegistry.h index bb9f2f4f26..7232024675 100644 --- a/cpp/lib/broker/QueueRegistry.h +++ b/cpp/lib/broker/QueueRegistry.h @@ -74,6 +74,12 @@ class QueueRegistry{ */ string generateName(); + /** + * Return the message store used. + */ + MessageStore* const getStore() const; + + private: typedef std::map<string, Queue::shared_ptr> QueueMap; QueueMap queues; diff --git a/cpp/lib/broker/SessionHandlerFactoryImpl.cpp b/cpp/lib/broker/SessionHandlerFactoryImpl.cpp index 2cc09a67e0..1b5441e3cf 100644 --- a/cpp/lib/broker/SessionHandlerFactoryImpl.cpp +++ b/cpp/lib/broker/SessionHandlerFactoryImpl.cpp @@ -39,9 +39,9 @@ const std::string amq_fanout("amq.fanout"); const std::string amq_match("amq.match"); } -SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, u_int32_t _timeout) : +SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, u_int64_t _stagingThreshold, u_int32_t _timeout) : store(_store.empty() ? (MessageStore*) new NullMessageStore() : (MessageStore*) new MessageStoreModule(_store)), - queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10) + queues(store.get()), settings(_timeout, _stagingThreshold), cleaner(&queues, _timeout/10) { exchanges.declare(empty, DirectExchange::typeName); // Default exchange. exchanges.declare(amq_direct, DirectExchange::typeName); @@ -51,7 +51,8 @@ SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, if(store.get()) { RecoveryManager recoverer(queues, exchanges); - store->recover(recoverer); + MessageStoreSettings storeSettings = { settings.stagingThreshold }; + store->recover(recoverer, &storeSettings); } cleaner.start(); @@ -59,7 +60,7 @@ SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt) { - return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, timeout); + return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, settings); } SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl() diff --git a/cpp/lib/broker/SessionHandlerFactoryImpl.h b/cpp/lib/broker/SessionHandlerFactoryImpl.h index 73ae879a58..a69b67b08d 100644 --- a/cpp/lib/broker/SessionHandlerFactoryImpl.h +++ b/cpp/lib/broker/SessionHandlerFactoryImpl.h @@ -31,6 +31,7 @@ #include <sys/SessionHandler.h> #include <sys/SessionHandlerFactory.h> #include <sys/TimeoutHandler.h> +#include <SessionHandlerImpl.h> #include <memory> namespace qpid { @@ -41,10 +42,10 @@ namespace qpid { std::auto_ptr<MessageStore> store; QueueRegistry queues; ExchangeRegistry exchanges; - const u_int32_t timeout;//timeout for auto-deleted queues (in ms) + const Settings settings; AutoDelete cleaner; public: - SessionHandlerFactoryImpl(const std::string& store = "", u_int32_t timeout = 30000); + SessionHandlerFactoryImpl(const std::string& store = "", u_int64_t stagingThreshold = 0, u_int32_t timeout = 30000); virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt); virtual ~SessionHandlerFactoryImpl(); }; diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp index 6d7f5048ea..8757cc2fc3 100644 --- a/cpp/lib/broker/SessionHandlerImpl.cpp +++ b/cpp/lib/broker/SessionHandlerImpl.cpp @@ -35,7 +35,7 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, QueueRegistry* _queues, ExchangeRegistry* _exchanges, AutoDelete* _cleaner, - const u_int32_t _timeout) : + const Settings& _settings) : context(_context), // AMQP version management change - kpvdr 2006-11-17 // TODO: Make this class version-aware and link these hard-wired numbers to that version @@ -43,7 +43,7 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, queues(_queues), exchanges(_exchanges), cleaner(_cleaner), - timeout(_timeout), + settings(_settings), basicHandler(new BasicHandlerImpl(this)), channelHandler(new ChannelHandlerImpl(this)), connectionHandler(new ConnectionHandlerImpl(this)), @@ -200,7 +200,8 @@ void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const string& /*outOfBand*/){ - parent->channels[channel] = new Channel(parent->context, channel, parent->framemax); + parent->channels[channel] = new Channel(parent->context, channel, parent->framemax, + parent->queues->getStore(), parent->settings.stagingThreshold); parent->client.getChannel().openOk(channel); } @@ -262,7 +263,7 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t queue = parent->getQueue(name, channel); } else { std::pair<Queue::shared_ptr, bool> queue_created = - parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0); + parent->queues->declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0); queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue diff --git a/cpp/lib/broker/SessionHandlerImpl.h b/cpp/lib/broker/SessionHandlerImpl.h index 4b89dbeaa1..98c87a7806 100644 --- a/cpp/lib/broker/SessionHandlerImpl.h +++ b/cpp/lib/broker/SessionHandlerImpl.h @@ -60,6 +60,14 @@ struct ConnectionException : public std::exception { const char* what() const throw() { return text.c_str(); } }; +class Settings { +public: + const u_int32_t timeout;//timeout for auto-deleted queues (in ms) + const u_int64_t stagingThreshold; + + Settings(u_int32_t _timeout, u_int64_t _stagingThreshold) : timeout(_timeout), stagingThreshold(_stagingThreshold) {} +}; + class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, public virtual qpid::framing::AMQP_ServerOperations, public virtual ConnectionToken @@ -72,7 +80,7 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, QueueRegistry* queues; ExchangeRegistry* const exchanges; AutoDelete* const cleaner; - const u_int32_t timeout;//timeout for auto-deleted queues (in ms) + const Settings settings; std::auto_ptr<BasicHandler> basicHandler; std::auto_ptr<ChannelHandler> channelHandler; @@ -104,7 +112,7 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, public: SessionHandlerImpl(qpid::sys::SessionContext* context, QueueRegistry* queues, - ExchangeRegistry* exchanges, AutoDelete* cleaner, const u_int32_t timeout); + ExchangeRegistry* exchanges, AutoDelete* cleaner, const Settings& settings); virtual void received(qpid::framing::AMQFrame* frame); virtual void initiated(qpid::framing::ProtocolInitiation* header); virtual void idleOut(); |