summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/QueuePolicy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/QueuePolicy.cpp')
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.cpp36
1 files changed, 18 insertions, 18 deletions
diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp
index 39afe90134..03a7951237 100644
--- a/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,7 +28,7 @@
using namespace qpid::broker;
using namespace qpid::framing;
-QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
+QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false) {}
void QueuePolicy::enqueued(uint64_t _size)
@@ -89,7 +89,7 @@ void QueuePolicy::tryEnqueue(const QueuedMessage& m)
} else {
std::string queue = m.queue ? m.queue->getName() : std::string("unknown queue");
throw ResourceLimitExceededException(
- QPID_MSG("Policy exceeded on " << queue << " by message " << m.position
+ QPID_MSG("Policy exceeded on " << queue << " by message " << m.position
<< " of size " << m.payload->contentSize() << " , policy: " << *this));
}
}
@@ -129,7 +129,7 @@ std::string QueuePolicy::getType(const FieldTable& settings)
FieldTable::ValuePtr v = settings.get(typeKey);
if (v && v->convertsTo<std::string>()) {
std::string t = v->get<std::string>();
- std::transform(t.begin(), t.end(), t.begin(), tolower);
+ std::transform(t.begin(), t.end(), t.begin(), tolower);
if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t;
}
return FLOW_TO_DISK;
@@ -152,7 +152,7 @@ void QueuePolicy::encode(Buffer& buffer) const
buffer.putLongLong(size.get());
}
-void QueuePolicy::decode ( Buffer& buffer )
+void QueuePolicy::decode ( Buffer& buffer )
{
maxCount = buffer.getLong();
maxSize = buffer.getLongLong();
@@ -179,15 +179,15 @@ const std::string QueuePolicy::RING("ring");
const std::string QueuePolicy::RING_STRICT("ring_strict");
uint64_t QueuePolicy::defaultMaxSize(0);
-FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) :
+FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) :
QueuePolicy(_maxCount, _maxSize, FLOW_TO_DISK) {}
bool FlowToDiskPolicy::checkLimit(const QueuedMessage& m)
{
- return QueuePolicy::checkLimit(m) || m.queue->releaseMessageContent(m);
+ return QueuePolicy::checkLimit(m) || (m.queue->getPersistenceId() && m.payload->requestContentRelease());
}
-RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
+RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
QueuePolicy(_maxCount, _maxSize, _type), strict(_type == RING_STRICT) {}
bool before(const QueuedMessage& a, const QueuedMessage& b)
@@ -219,19 +219,19 @@ bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
//for non-strict ring policy, a message can be replaced (and
//therefore dequeued) before it is accepted or released by
//subscriber; need to detect this
- return find(m, pendingDequeues, false) || find(m, queue, false);
+ return find(m, pendingDequeues, false) || find(m, queue, false);
}
bool RingQueuePolicy::checkLimit(const QueuedMessage& m)
{
if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept
-
+
QueuedMessage oldest;
{
qpid::sys::Mutex::ScopedLock l(lock);
if (queue.empty()) {
- QPID_LOG(debug, "Message too large for ring queue "
- << (m.queue ? m.queue->getName() : std::string("unknown queue"))
+ QPID_LOG(debug, "Message too large for ring queue "
+ << (m.queue ? m.queue->getName() : std::string("unknown queue"))
<< " [" << *this << "] "
<< ": message size = " << m.payload->contentSize() << " bytes");
return false;
@@ -251,13 +251,13 @@ bool RingQueuePolicy::checkLimit(const QueuedMessage& m)
pendingDequeues.push_back(oldest);
}
oldest.queue->addPendingDequeue(oldest);
- QPID_LOG(debug, "Ring policy triggered in queue "
+ QPID_LOG(debug, "Ring policy triggered in queue "
<< (m.queue ? m.queue->getName() : std::string("unknown queue"))
<< ": removed message " << oldest.position << " to make way for " << m.position);
return true;
} else {
- QPID_LOG(debug, "Ring policy could not be triggered in queue "
- << (m.queue ? m.queue->getName() : std::string("unknown queue"))
+ QPID_LOG(debug, "Ring policy could not be triggered in queue "
+ << (m.queue ? m.queue->getName() : std::string("unknown queue"))
<< ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued");
//in strict mode, if oldest message has been delivered (hence
//cannot be acquired) but not yet acked, it should not be
@@ -299,7 +299,7 @@ std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uin
}
}
-
+
namespace qpid {
namespace broker {
@@ -309,7 +309,7 @@ std::ostream& operator<<(std::ostream& out, const QueuePolicy& p)
else out << "size: unlimited";
out << "; ";
if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count.get();
- else out << "count: unlimited";
+ else out << "count: unlimited";
out << "; type=" << p.type;
return out;
}