diff options
author | Gordon Sim <gsim@apache.org> | 2009-02-12 20:01:32 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2009-02-12 20:01:32 +0000 |
commit | b446b29e2a948bafba4c9afa13afb3b6d4ab669e (patch) | |
tree | d0308a18078a2206626833af4bac1b51dfdecc70 | |
parent | fcdd0466a29df17e20d4d7c158cad4f91ca8700a (diff) | |
download | qpid-python-b446b29e2a948bafba4c9afa13afb3b6d4ab669e.tar.gz |
Queue::checkLvqReplace() needs to update the lvq map if it makes a replacement or the map will contain a pointer to a stale message and further updates will be lost.
Also added in locking to set-/get- ReplacementMessage() in Message.cpp
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@743857 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 2 |
3 files changed, 14 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index ce0477b08c..133e2b5ad1 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -344,6 +344,7 @@ bool Message::hasExpired() boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) const { + sys::Mutex::ScopedLock l(lock); Replacement::iterator i = replacement.find(qfor); if (i != replacement.end()){ return i->second; @@ -353,6 +354,7 @@ boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor) { + sys::Mutex::ScopedLock l(lock); replacement[qfor] = msg; } diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index c9ee7f394f..bcce83af1e 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -587,10 +587,19 @@ QueuedMessage Queue::getFront() return msg; } -QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg) const +QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg) { boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this); - if (replacement.get()) msg.payload = replacement; + if (replacement.get()) { + const framing::FieldTable* ft = replacement->getApplicationHeaders(); + if (ft) { + string key = ft->getAsString(qpidVQMatchProperty); + if (lvq.find(key) != lvq.end()){ + lvq[key] = replacement; + } + } + msg.payload = replacement; + } return msg; } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 61fbd45de8..14849b3c8e 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -116,7 +116,7 @@ namespace qpid { void dequeued(const QueuedMessage& msg); void popAndDequeue(); QueuedMessage getFront(); - QueuedMessage& checkLvqReplace(QueuedMessage& msg) const; + QueuedMessage& checkLvqReplace(QueuedMessage& msg); void clearLVQIndex(const QueuedMessage& msg); inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg) |