summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-09-28 13:11:06 +0000
committerGordon Sim <gsim@apache.org>2009-09-28 13:11:06 +0000
commit490d068a2aeed02fb7521caccee7bff6c1b77bb7 (patch)
treefacef1e353227b9fa35c078c67eb2dcbfb8f031c /cpp
parent9376922f0fce58400c1e9b5b20f6c6f7b279a55b (diff)
downloadqpid-python-490d068a2aeed02fb7521caccee7bff6c1b77bb7.tar.gz
r817742 (the fix for QPID-2102), introduced a deadlock and a race condition into flow-to-disk behaviour, fixed by this checkin.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@819524 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/Message.cpp53
-rw-r--r--cpp/src/qpid/broker/Message.h3
2 files changed, 30 insertions, 26 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 4b85f3b898..2d42c650ce 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -49,7 +49,7 @@ TransferAdapter Message::TRANSFER;
Message::Message(const framing::SequenceNumber& id) :
frames(id), persistenceId(0), redelivered(false), loaded(false),
staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
- expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0) {}
+ expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0), requiredCredit(0) {}
Message::~Message()
{
@@ -108,12 +108,16 @@ bool Message::requiresAccept()
return getAdapter().requiresAccept(frames);
}
-uint32_t Message::getRequiredCredit() const
+uint32_t Message::getRequiredCredit()
{
- //add up payload for all header and content frames in the frameset
- SumBodySize sum;
- frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>());
- return sum.getSize();
+ sys::Mutex::ScopedLock l(lock);
+ if (!requiredCredit) {
+ //add up payload for all header and content frames in the frameset
+ SumBodySize sum;
+ frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>());
+ requiredCredit = sum.getSize();
+ }
+ return requiredCredit;
}
void Message::encode(framing::Buffer& buffer) const
@@ -204,6 +208,8 @@ void Message::releaseContent()
store->stage(pmsg);
staged = true;
}
+ //ensure required credit is cached before content frames are released
+ getRequiredCredit();
//remove any content frames from the frameset
frames.remove(TypeFilter<CONTENT_BODY>());
setContentReleased();
@@ -223,32 +229,29 @@ void Message::destroy()
bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const
{
- if (isContentReleased()) {
- intrusive_ptr<const PersistableMessage> pmsg(this);
-
- bool done = false;
- string& data = frame.castBody<AMQContentBody>()->getData();
- store->loadContent(queue, pmsg, data, offset, maxContentSize);
- done = data.size() < maxContentSize;
- frame.setBof(false);
- frame.setEof(true);
- QPID_LOG(debug, "loaded frame" << frame);
- if (offset > 0) {
- frame.setBos(false);
- }
- if (!done) {
- frame.setEos(false);
- } else return false;
- return true;
+ intrusive_ptr<const PersistableMessage> pmsg(this);
+
+ bool done = false;
+ string& data = frame.castBody<AMQContentBody>()->getData();
+ store->loadContent(queue, pmsg, data, offset, maxContentSize);
+ done = data.size() < maxContentSize;
+ frame.setBof(false);
+ frame.setEof(true);
+ QPID_LOG(debug, "loaded frame" << frame);
+ if (offset > 0) {
+ frame.setBos(false);
}
- else return false;
+ if (!done) {
+ frame.setEos(false);
+ } else return false;
+ return true;
}
void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const
{
sys::Mutex::ScopedLock l(lock);
if (isContentReleased() && !frames.isComplete()) {
-
+ sys::Mutex::ScopedUnlock u(lock);
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
bool morecontent = true;
for (uint64_t offset = 0; morecontent; offset += maxContentSize)
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index e4f3534020..81d1808168 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -108,7 +108,7 @@ public:
return frames.isA<T>();
}
- uint32_t getRequiredCredit() const;
+ uint32_t getRequiredCredit();
void encode(framing::Buffer& buffer) const;
void encodeContent(framing::Buffer& buffer) const;
@@ -186,6 +186,7 @@ public:
mutable boost::intrusive_ptr<Message> empty;
MessageCallback* enqueueCallback;
MessageCallback* dequeueCallback;
+ uint32_t requiredCredit;
static std::string updateDestination;
};