From b446b29e2a948bafba4c9afa13afb3b6d4ab669e Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Thu, 12 Feb 2009 20:01:32 +0000 Subject: Queue::checkLvqReplace() needs to update the lvq map if it makes a replacement or the map will contain a pointer to a stale message and further updates will be lost. Also added in locking to set-/get- ReplacementMessage() in Message.cpp git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@743857 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Message.cpp | 2 ++ qpid/cpp/src/qpid/broker/Queue.cpp | 13 +++++++++++-- qpid/cpp/src/qpid/broker/Queue.h | 2 +- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index ce0477b08c..133e2b5ad1 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -344,6 +344,7 @@ bool Message::hasExpired() boost::intrusive_ptr& Message::getReplacementMessage(const Queue* qfor) const { + sys::Mutex::ScopedLock l(lock); Replacement::iterator i = replacement.find(qfor); if (i != replacement.end()){ return i->second; @@ -353,6 +354,7 @@ boost::intrusive_ptr& Message::getReplacementMessage(const Queue* qfor) void Message::setReplacementMessage(boost::intrusive_ptr msg, const Queue* qfor) { + sys::Mutex::ScopedLock l(lock); replacement[qfor] = msg; } diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index c9ee7f394f..bcce83af1e 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -587,10 +587,19 @@ QueuedMessage Queue::getFront() return msg; } -QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg) const +QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg) { boost::intrusive_ptr replacement = msg.payload->getReplacementMessage(this); - if (replacement.get()) msg.payload = replacement; + if (replacement.get()) { + const framing::FieldTable* ft = replacement->getApplicationHeaders(); + if (ft) { + string key = ft->getAsString(qpidVQMatchProperty); + if (lvq.find(key) != lvq.end()){ + lvq[key] = replacement; + } + } + msg.payload = replacement; + } return msg; } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 61fbd45de8..14849b3c8e 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -116,7 +116,7 @@ namespace qpid { void dequeued(const QueuedMessage& msg); void popAndDequeue(); QueuedMessage getFront(); - QueuedMessage& checkLvqReplace(QueuedMessage& msg) const; + QueuedMessage& checkLvqReplace(QueuedMessage& msg); void clearLVQIndex(const QueuedMessage& msg); inline void mgntEnqStats(const boost::intrusive_ptr& msg) -- cgit v1.2.1