diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/QueuePolicy.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueuePolicy.cpp | 28 |
1 files changed, 14 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp index 41a6709d27..c59736969f 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp @@ -126,7 +126,7 @@ std::string QueuePolicy::getType(const FieldTable& settings) FieldTable::ValuePtr v = settings.get(typeKey); if (v && v->convertsTo<std::string>()) { std::string t = v->get<std::string>(); - transform(t.begin(), t.end(), t.begin(), tolower); + std::transform(t.begin(), t.end(), t.begin(), tolower); if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t; } return FLOW_TO_DISK; @@ -197,11 +197,12 @@ void RingQueuePolicy::enqueued(const QueuedMessage& m) void RingQueuePolicy::dequeued(const QueuedMessage& m) { qpid::sys::Mutex::ScopedLock l(lock); - QueuePolicy::dequeued(m); //find and remove m from queue - for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) { - if (i->position == m.position) { + for (Messages::iterator i = queue.begin(); i != queue.end(); i++) { + if (i->payload == m.payload) { queue.erase(i); + //now update count and size + QueuePolicy::dequeued(m); break; } } @@ -210,9 +211,11 @@ void RingQueuePolicy::dequeued(const QueuedMessage& m) bool RingQueuePolicy::isEnqueued(const QueuedMessage& m) { qpid::sys::Mutex::ScopedLock l(lock); - //for non-strict ring policy, a message can be dequeued before acked; need to detect this - for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) { - if (i->position == m.position) { + //for non-strict ring policy, a message can be replaced (and + //therefore dequeued) before it is accepted or released by + //subscriber; need to detect this + for (Messages::const_iterator i = queue.begin(); i != queue.end(); i++) { + if (i->payload == m.payload) { return true; } } @@ -236,13 +239,10 @@ bool RingQueuePolicy::checkLimit(const QueuedMessage& m) oldest = queue.front(); } if (oldest.queue->acquire(oldest) || !strict) { - qpid::sys::Mutex::ScopedLock l(lock); - if (oldest.position == queue.front().position) { - queue.pop_front(); - QPID_LOG(debug, "Ring policy triggered in queue " - << (m.queue ? m.queue->getName() : std::string("unknown queue")) - << ": removed message " << oldest.position << " to make way for " << m.position); - } + oldest.queue->dequeue(0, oldest); + QPID_LOG(debug, "Ring policy triggered in queue " + << (m.queue ? m.queue->getName() : std::string("unknown queue")) + << ": removed message " << oldest.position << " to make way for " << m.position); return true; } else { QPID_LOG(debug, "Ring policy could not be triggered in queue " |