summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-02-12 20:01:32 +0000
committerGordon Sim <gsim@apache.org>2009-02-12 20:01:32 +0000
commitb446b29e2a948bafba4c9afa13afb3b6d4ab669e (patch)
treed0308a18078a2206626833af4bac1b51dfdecc70
parentfcdd0466a29df17e20d4d7c158cad4f91ca8700a (diff)
downloadqpid-python-b446b29e2a948bafba4c9afa13afb3b6d4ab669e.tar.gz
Queue::checkLvqReplace() needs to update the lvq map if it makes a replacement or the map will contain a pointer to a stale message and further updates will be lost.
Also added in locking to set-/get- ReplacementMessage() in Message.cpp git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@743857 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp13
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h2
3 files changed, 14 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index ce0477b08c..133e2b5ad1 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -344,6 +344,7 @@ bool Message::hasExpired()
boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) const
{
+ sys::Mutex::ScopedLock l(lock);
Replacement::iterator i = replacement.find(qfor);
if (i != replacement.end()){
return i->second;
@@ -353,6 +354,7 @@ boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor)
void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor)
{
+ sys::Mutex::ScopedLock l(lock);
replacement[qfor] = msg;
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index c9ee7f394f..bcce83af1e 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -587,10 +587,19 @@ QueuedMessage Queue::getFront()
return msg;
}
-QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg) const
+QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg)
{
boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
- if (replacement.get()) msg.payload = replacement;
+ if (replacement.get()) {
+ const framing::FieldTable* ft = replacement->getApplicationHeaders();
+ if (ft) {
+ string key = ft->getAsString(qpidVQMatchProperty);
+ if (lvq.find(key) != lvq.end()){
+ lvq[key] = replacement;
+ }
+ }
+ msg.payload = replacement;
+ }
return msg;
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 61fbd45de8..14849b3c8e 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -116,7 +116,7 @@ namespace qpid {
void dequeued(const QueuedMessage& msg);
void popAndDequeue();
QueuedMessage getFront();
- QueuedMessage& checkLvqReplace(QueuedMessage& msg) const;
+ QueuedMessage& checkLvqReplace(QueuedMessage& msg);
void clearLVQIndex(const QueuedMessage& msg);
inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)