summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/BrokerQueue.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-12-05 13:14:38 +0000
committerGordon Sim <gsim@apache.org>2006-12-05 13:14:38 +0000
commit46fb2ad9fbc3694e2a321417ecd839badd7b106e (patch)
treedde3c1f64dedb99402f69e34b02d1ba875c962aa /cpp/lib/broker/BrokerQueue.cpp
parent7107d5c1c3c8323d832184fc097a5d9223633d32 (diff)
downloadqpid-python-46fb2ad9fbc3694e2a321417ecd839badd7b106e.tar.gz
Added queue policy class for controlling when message content should be released from memory.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@482639 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/BrokerQueue.cpp')
-rw-r--r--cpp/lib/broker/BrokerQueue.cpp51
1 files changed, 43 insertions, 8 deletions
diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp
index 4eabfdec50..26857b6d31 100644
--- a/cpp/lib/broker/BrokerQueue.cpp
+++ b/cpp/lib/broker/BrokerQueue.cpp
@@ -26,6 +26,7 @@
using namespace qpid::broker;
using namespace qpid::sys;
+using namespace qpid::framing;
Queue::Queue(const string& _name, u_int32_t _autodelete,
MessageStore* const _store,
@@ -62,8 +63,7 @@ void Queue::deliver(Message::shared_ptr& msg){
}
void Queue::recover(Message::shared_ptr& msg){
- queueing = true;
- messages.push(msg);
+ push(msg);
if (store && msg->expectedContentSize() != msg->encodedContentSize()) {
msg->releaseContent(store);
}
@@ -72,8 +72,7 @@ void Queue::recover(Message::shared_ptr& msg){
void Queue::process(Message::shared_ptr& msg){
Mutex::ScopedLock locker(lock);
if(queueing || !dispatch(msg)){
- queueing = true;
- messages.push(msg);
+ push(msg);
}
}
@@ -116,7 +115,7 @@ void Queue::dispatch(){
while(proceed){
Mutex::ScopedLock locker(lock);
if(!messages.empty() && dispatch(messages.front())){
- messages.pop();
+ pop();
}else{
dispatching = false;
proceed = false;
@@ -149,7 +148,7 @@ Message::shared_ptr Queue::dequeue(){
Message::shared_ptr msg;
if(!messages.empty()){
msg = messages.front();
- messages.pop();
+ pop();
}
return msg;
}
@@ -157,10 +156,19 @@ Message::shared_ptr Queue::dequeue(){
u_int32_t Queue::purge(){
Mutex::ScopedLock locker(lock);
int count = messages.size();
- while(!messages.empty()) messages.pop();
+ while(!messages.empty()) pop();
return count;
}
+void Queue::pop(){
+ messages.pop();
+}
+
+void Queue::push(Message::shared_ptr& msg){
+ queueing = true;
+ messages.push(msg);
+}
+
u_int32_t Queue::getMessageCount() const{
Mutex::ScopedLock locker(lock);
return messages.size();
@@ -190,8 +198,30 @@ void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const st
}
}
-void Queue::create()
+namespace
{
+ const std::string qpidMaxSize("qpid.max_size");
+ const std::string qpidMaxCount("qpid.max_count");
+}
+
+void Queue::create(const FieldTable& settings)
+{
+ //Note: currently field table only contain signed 32 bit ints, which
+ // restricts the values that can be set on the queue policy.
+ u_int32_t maxCount(0);
+ try {
+ maxCount = settings.getInt(qpidMaxSize);
+ } catch (FieldNotFoundException& ignore) {
+ }
+ u_int32_t maxSize(0);
+ try {
+ maxSize = settings.getInt(qpidMaxCount);
+ } catch (FieldNotFoundException& ignore) {
+ }
+ if (maxCount || maxSize) {
+ setPolicy(std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize)));
+ }
+
if (store) {
store->create(*this);
}
@@ -203,3 +233,8 @@ void Queue::destroy()
store->destroy(*this);
}
}
+
+void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
+{
+ policy = _policy;
+}