diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Message.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.cpp | 110 |
1 files changed, 67 insertions, 43 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 7360010192..e2799b0bff 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -49,7 +49,7 @@ TransferAdapter Message::TRANSFER; Message::Message(const framing::SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), - expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0) {} + expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0), requiredCredit(0) {} Message::~Message() { @@ -98,7 +98,7 @@ const FieldTable* Message::getApplicationHeaders() const return getAdapter().getApplicationHeaders(frames); } -bool Message::isPersistent() +bool Message::isPersistent() const { return (getAdapter().isPersistent(frames) || forcePersistentPolicy); } @@ -108,12 +108,16 @@ bool Message::requiresAccept() return getAdapter().requiresAccept(frames); } -uint32_t Message::getRequiredCredit() const +uint32_t Message::getRequiredCredit() { - //add up payload for all header and content frames in the frameset - SumBodySize sum; - frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>()); - return sum.getSize(); + sys::Mutex::ScopedLock l(lock); + if (!requiredCredit) { + //add up payload for all header and content frames in the frameset + SumBodySize sum; + frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>()); + requiredCredit = sum.getSize(); + } + return requiredCredit; } void Message::encode(framing::Buffer& buffer) const @@ -181,17 +185,31 @@ void Message::decodeContent(framing::Buffer& buffer) loaded = true; } -void Message::releaseContent(MessageStore* _store) +void Message::tryReleaseContent() { - if (!store) { - store = _store; + if (checkContentReleasable()) { + releaseContent(); } +} + +void Message::releaseContent(MessageStore* s) +{ + //deprecated, use setStore(store); releaseContent(); instead + if (!store) setStore(s); + releaseContent(); +} + +void Message::releaseContent() +{ + sys::Mutex::ScopedLock l(lock); if (store) { if (!getPersistenceId()) { intrusive_ptr<PersistableMessage> pmsg(this); store->stage(pmsg); staged = true; } + //ensure required credit is cached before content frames are released + getRequiredCredit(); //remove any content frames from the frameset frames.remove(TypeFilter<CONTENT_BODY>()); setContentReleased(); @@ -211,31 +229,29 @@ void Message::destroy() bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const { - if (isContentReleased()) { - intrusive_ptr<const PersistableMessage> pmsg(this); - - bool done = false; - string& data = frame.castBody<AMQContentBody>()->getData(); - store->loadContent(queue, pmsg, data, offset, maxContentSize); - done = data.size() < maxContentSize; - frame.setBof(false); - frame.setEof(true); - QPID_LOG(debug, "loaded frame" << frame); - if (offset > 0) { - frame.setBos(false); - } - if (!done) { - frame.setEos(false); - } else return false; - return true; + intrusive_ptr<const PersistableMessage> pmsg(this); + + bool done = false; + string& data = frame.castBody<AMQContentBody>()->getData(); + store->loadContent(queue, pmsg, data, offset, maxContentSize); + done = data.size() < maxContentSize; + frame.setBof(false); + frame.setEof(true); + QPID_LOG(debug, "loaded frame" << frame); + if (offset > 0) { + frame.setBos(false); } - else return false; + if (!done) { + frame.setEos(false); + } else return false; + return true; } void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const { + sys::Mutex::ScopedLock l(lock); if (isContentReleased() && !frames.isComplete()) { - + sys::Mutex::ScopedUnlock u(lock); uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); bool morecontent = true; for (uint64_t offset = 0; morecontent; offset += maxContentSize) @@ -373,28 +389,36 @@ void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Que } void Message::allEnqueuesComplete() { - MessageCallback* cb = 0; - { - sys::Mutex::ScopedLock l(lock); - std::swap(cb, enqueueCallback); - } + sys::Mutex::ScopedLock l(callbackLock); + MessageCallback* cb = enqueueCallback; if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); } void Message::allDequeuesComplete() { - MessageCallback* cb = 0; - { - sys::Mutex::ScopedLock l(lock); - std::swap(cb, dequeueCallback); - } + sys::Mutex::ScopedLock l(callbackLock); + MessageCallback* cb = dequeueCallback; if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); } -void Message::setEnqueueCompleteCallback(MessageCallback& cb) { enqueueCallback = &cb; } -void Message::resetEnqueueCompleteCallback() { enqueueCallback = 0; } +void Message::setEnqueueCompleteCallback(MessageCallback& cb) { + sys::Mutex::ScopedLock l(callbackLock); + enqueueCallback = &cb; +} + +void Message::resetEnqueueCompleteCallback() { + sys::Mutex::ScopedLock l(callbackLock); + enqueueCallback = 0; +} -void Message::setDequeueCompleteCallback(MessageCallback& cb) { dequeueCallback = &cb; } -void Message::resetDequeueCompleteCallback() { dequeueCallback = 0; } +void Message::setDequeueCompleteCallback(MessageCallback& cb) { + sys::Mutex::ScopedLock l(callbackLock); + dequeueCallback = &cb; +} + +void Message::resetDequeueCompleteCallback() { + sys::Mutex::ScopedLock l(callbackLock); + dequeueCallback = 0; +} framing::FieldTable& Message::getOrInsertHeaders() { |