summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Message.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
-rw-r--r--cpp/src/qpid/broker/Message.cpp8
1 files changed, 7 insertions, 1 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index ae4503328a..40dfba39f4 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -131,6 +131,7 @@ uint32_t Message::getRequiredCredit()
void Message::encode(framing::Buffer& buffer) const
{
+ sys::Mutex::ScopedLock l(lock);
//encode method and header frames
EncodeFrame f1(buffer);
frames.map_if(f1, TypeFilter2<METHOD_BODY, HEADER_BODY>());
@@ -142,6 +143,7 @@ void Message::encode(framing::Buffer& buffer) const
void Message::encodeContent(framing::Buffer& buffer) const
{
+ sys::Mutex::ScopedLock l(lock);
//encode the payload of each content frame
EncodeBody f2(buffer);
frames.map_if(f2, TypeFilter<CONTENT_BODY>());
@@ -154,11 +156,13 @@ uint32_t Message::encodedSize() const
uint32_t Message::encodedContentSize() const
{
+ sys::Mutex::ScopedLock l(lock);
return frames.getContentSize();
}
uint32_t Message::encodedHeaderSize() const
{
+ sys::Mutex::ScopedLock l(lock); // prevent modifications while computing size
//add up the size for all method and header frames in the frameset
SumFrameSize sum;
frames.map_if(sum, TypeFilter2<METHOD_BODY, HEADER_BODY>());
@@ -218,8 +222,9 @@ void Message::releaseContent()
store->stage(pmsg);
staged = true;
}
- //ensure required credit is cached before content frames are released
+ //ensure required credit and size is cached before content frames are released
getRequiredCredit();
+ contentSize();
//remove any content frames from the frameset
frames.remove(TypeFilter<CONTENT_BODY>());
setContentReleased();
@@ -354,6 +359,7 @@ public:
AMQHeaderBody* Message::getHeaderBody()
{
+ // expects lock to be held
if (copyHeaderOnWrite) {
CloneHeaderBody f;
frames.map_if(f, TypeFilter<HEADER_BODY>());