diff options
author | Gordon Sim <gsim@apache.org> | 2006-12-05 13:14:38 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-12-05 13:14:38 +0000 |
commit | 46fb2ad9fbc3694e2a321417ecd839badd7b106e (patch) | |
tree | dde3c1f64dedb99402f69e34b02d1ba875c962aa /cpp/lib/broker/BrokerQueue.cpp | |
parent | 7107d5c1c3c8323d832184fc097a5d9223633d32 (diff) | |
download | qpid-python-46fb2ad9fbc3694e2a321417ecd839badd7b106e.tar.gz |
Added queue policy class for controlling when message content should be released from memory.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@482639 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/BrokerQueue.cpp')
-rw-r--r-- | cpp/lib/broker/BrokerQueue.cpp | 51 |
1 files changed, 43 insertions, 8 deletions
diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp index 4eabfdec50..26857b6d31 100644 --- a/cpp/lib/broker/BrokerQueue.cpp +++ b/cpp/lib/broker/BrokerQueue.cpp @@ -26,6 +26,7 @@ using namespace qpid::broker; using namespace qpid::sys; +using namespace qpid::framing; Queue::Queue(const string& _name, u_int32_t _autodelete, MessageStore* const _store, @@ -62,8 +63,7 @@ void Queue::deliver(Message::shared_ptr& msg){ } void Queue::recover(Message::shared_ptr& msg){ - queueing = true; - messages.push(msg); + push(msg); if (store && msg->expectedContentSize() != msg->encodedContentSize()) { msg->releaseContent(store); } @@ -72,8 +72,7 @@ void Queue::recover(Message::shared_ptr& msg){ void Queue::process(Message::shared_ptr& msg){ Mutex::ScopedLock locker(lock); if(queueing || !dispatch(msg)){ - queueing = true; - messages.push(msg); + push(msg); } } @@ -116,7 +115,7 @@ void Queue::dispatch(){ while(proceed){ Mutex::ScopedLock locker(lock); if(!messages.empty() && dispatch(messages.front())){ - messages.pop(); + pop(); }else{ dispatching = false; proceed = false; @@ -149,7 +148,7 @@ Message::shared_ptr Queue::dequeue(){ Message::shared_ptr msg; if(!messages.empty()){ msg = messages.front(); - messages.pop(); + pop(); } return msg; } @@ -157,10 +156,19 @@ Message::shared_ptr Queue::dequeue(){ u_int32_t Queue::purge(){ Mutex::ScopedLock locker(lock); int count = messages.size(); - while(!messages.empty()) messages.pop(); + while(!messages.empty()) pop(); return count; } +void Queue::pop(){ + messages.pop(); +} + +void Queue::push(Message::shared_ptr& msg){ + queueing = true; + messages.push(msg); +} + u_int32_t Queue::getMessageCount() const{ Mutex::ScopedLock locker(lock); return messages.size(); @@ -190,8 +198,30 @@ void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const st } } -void Queue::create() +namespace { + const std::string qpidMaxSize("qpid.max_size"); + const std::string qpidMaxCount("qpid.max_count"); +} + +void Queue::create(const FieldTable& settings) +{ + //Note: currently field table only contain signed 32 bit ints, which + // restricts the values that can be set on the queue policy. + u_int32_t maxCount(0); + try { + maxCount = settings.getInt(qpidMaxSize); + } catch (FieldNotFoundException& ignore) { + } + u_int32_t maxSize(0); + try { + maxSize = settings.getInt(qpidMaxCount); + } catch (FieldNotFoundException& ignore) { + } + if (maxCount || maxSize) { + setPolicy(std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize))); + } + if (store) { store->create(*this); } @@ -203,3 +233,8 @@ void Queue::destroy() store->destroy(*this); } } + +void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy) +{ + policy = _policy; +} |