diff options
author | Gordon Sim <gsim@apache.org> | 2009-09-28 13:11:06 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2009-09-28 13:11:06 +0000 |
commit | 490d068a2aeed02fb7521caccee7bff6c1b77bb7 (patch) | |
tree | facef1e353227b9fa35c078c67eb2dcbfb8f031c /cpp | |
parent | 9376922f0fce58400c1e9b5b20f6c6f7b279a55b (diff) | |
download | qpid-python-490d068a2aeed02fb7521caccee7bff6c1b77bb7.tar.gz |
r817742 (the fix for QPID-2102), introduced a deadlock and a race condition into flow-to-disk behaviour, fixed by this checkin.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@819524 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 53 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 3 |
2 files changed, 30 insertions, 26 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 4b85f3b898..2d42c650ce 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -49,7 +49,7 @@ TransferAdapter Message::TRANSFER; Message::Message(const framing::SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), - expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0) {} + expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0), requiredCredit(0) {} Message::~Message() { @@ -108,12 +108,16 @@ bool Message::requiresAccept() return getAdapter().requiresAccept(frames); } -uint32_t Message::getRequiredCredit() const +uint32_t Message::getRequiredCredit() { - //add up payload for all header and content frames in the frameset - SumBodySize sum; - frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>()); - return sum.getSize(); + sys::Mutex::ScopedLock l(lock); + if (!requiredCredit) { + //add up payload for all header and content frames in the frameset + SumBodySize sum; + frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>()); + requiredCredit = sum.getSize(); + } + return requiredCredit; } void Message::encode(framing::Buffer& buffer) const @@ -204,6 +208,8 @@ void Message::releaseContent() store->stage(pmsg); staged = true; } + //ensure required credit is cached before content frames are released + getRequiredCredit(); //remove any content frames from the frameset frames.remove(TypeFilter<CONTENT_BODY>()); setContentReleased(); @@ -223,32 +229,29 @@ void Message::destroy() bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const { - if (isContentReleased()) { - intrusive_ptr<const PersistableMessage> pmsg(this); - - bool done = false; - string& data = frame.castBody<AMQContentBody>()->getData(); - store->loadContent(queue, pmsg, data, offset, maxContentSize); - done = data.size() < maxContentSize; - frame.setBof(false); - frame.setEof(true); - QPID_LOG(debug, "loaded frame" << frame); - if (offset > 0) { - frame.setBos(false); - } - if (!done) { - frame.setEos(false); - } else return false; - return true; + intrusive_ptr<const PersistableMessage> pmsg(this); + + bool done = false; + string& data = frame.castBody<AMQContentBody>()->getData(); + store->loadContent(queue, pmsg, data, offset, maxContentSize); + done = data.size() < maxContentSize; + frame.setBof(false); + frame.setEof(true); + QPID_LOG(debug, "loaded frame" << frame); + if (offset > 0) { + frame.setBos(false); } - else return false; + if (!done) { + frame.setEos(false); + } else return false; + return true; } void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const { sys::Mutex::ScopedLock l(lock); if (isContentReleased() && !frames.isComplete()) { - + sys::Mutex::ScopedUnlock u(lock); uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); bool morecontent = true; for (uint64_t offset = 0; morecontent; offset += maxContentSize) diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index e4f3534020..81d1808168 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -108,7 +108,7 @@ public: return frames.isA<T>(); } - uint32_t getRequiredCredit() const; + uint32_t getRequiredCredit(); void encode(framing::Buffer& buffer) const; void encodeContent(framing::Buffer& buffer) const; @@ -186,6 +186,7 @@ public: mutable boost::intrusive_ptr<Message> empty; MessageCallback* enqueueCallback; MessageCallback* dequeueCallback; + uint32_t requiredCredit; static std::string updateDestination; }; |