summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/QueuePolicy.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/QueuePolicy.cpp28
1 files changed, 14 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
index 41a6709d27..c59736969f 100644
--- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -126,7 +126,7 @@ std::string QueuePolicy::getType(const FieldTable& settings)
FieldTable::ValuePtr v = settings.get(typeKey);
if (v && v->convertsTo<std::string>()) {
std::string t = v->get<std::string>();
- transform(t.begin(), t.end(), t.begin(), tolower);
+ std::transform(t.begin(), t.end(), t.begin(), tolower);
if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t;
}
return FLOW_TO_DISK;
@@ -197,11 +197,12 @@ void RingQueuePolicy::enqueued(const QueuedMessage& m)
void RingQueuePolicy::dequeued(const QueuedMessage& m)
{
qpid::sys::Mutex::ScopedLock l(lock);
- QueuePolicy::dequeued(m);
//find and remove m from queue
- for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) {
- if (i->position == m.position) {
+ for (Messages::iterator i = queue.begin(); i != queue.end(); i++) {
+ if (i->payload == m.payload) {
queue.erase(i);
+ //now update count and size
+ QueuePolicy::dequeued(m);
break;
}
}
@@ -210,9 +211,11 @@ void RingQueuePolicy::dequeued(const QueuedMessage& m)
bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
{
qpid::sys::Mutex::ScopedLock l(lock);
- //for non-strict ring policy, a message can be dequeued before acked; need to detect this
- for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) {
- if (i->position == m.position) {
+ //for non-strict ring policy, a message can be replaced (and
+ //therefore dequeued) before it is accepted or released by
+ //subscriber; need to detect this
+ for (Messages::const_iterator i = queue.begin(); i != queue.end(); i++) {
+ if (i->payload == m.payload) {
return true;
}
}
@@ -236,13 +239,10 @@ bool RingQueuePolicy::checkLimit(const QueuedMessage& m)
oldest = queue.front();
}
if (oldest.queue->acquire(oldest) || !strict) {
- qpid::sys::Mutex::ScopedLock l(lock);
- if (oldest.position == queue.front().position) {
- queue.pop_front();
- QPID_LOG(debug, "Ring policy triggered in queue "
- << (m.queue ? m.queue->getName() : std::string("unknown queue"))
- << ": removed message " << oldest.position << " to make way for " << m.position);
- }
+ oldest.queue->dequeue(0, oldest);
+ QPID_LOG(debug, "Ring policy triggered in queue "
+ << (m.queue ? m.queue->getName() : std::string("unknown queue"))
+ << ": removed message " << oldest.position << " to make way for " << m.position);
return true;
} else {
QPID_LOG(debug, "Ring policy could not be triggered in queue "