summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Message.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Message.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp110
1 files changed, 67 insertions, 43 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 7360010192..e2799b0bff 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -49,7 +49,7 @@ TransferAdapter Message::TRANSFER;
Message::Message(const framing::SequenceNumber& id) :
frames(id), persistenceId(0), redelivered(false), loaded(false),
staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
- expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0) {}
+ expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0), requiredCredit(0) {}
Message::~Message()
{
@@ -98,7 +98,7 @@ const FieldTable* Message::getApplicationHeaders() const
return getAdapter().getApplicationHeaders(frames);
}
-bool Message::isPersistent()
+bool Message::isPersistent() const
{
return (getAdapter().isPersistent(frames) || forcePersistentPolicy);
}
@@ -108,12 +108,16 @@ bool Message::requiresAccept()
return getAdapter().requiresAccept(frames);
}
-uint32_t Message::getRequiredCredit() const
+uint32_t Message::getRequiredCredit()
{
- //add up payload for all header and content frames in the frameset
- SumBodySize sum;
- frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>());
- return sum.getSize();
+ sys::Mutex::ScopedLock l(lock);
+ if (!requiredCredit) {
+ //add up payload for all header and content frames in the frameset
+ SumBodySize sum;
+ frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>());
+ requiredCredit = sum.getSize();
+ }
+ return requiredCredit;
}
void Message::encode(framing::Buffer& buffer) const
@@ -181,17 +185,31 @@ void Message::decodeContent(framing::Buffer& buffer)
loaded = true;
}
-void Message::releaseContent(MessageStore* _store)
+void Message::tryReleaseContent()
{
- if (!store) {
- store = _store;
+ if (checkContentReleasable()) {
+ releaseContent();
}
+}
+
+void Message::releaseContent(MessageStore* s)
+{
+ //deprecated, use setStore(store); releaseContent(); instead
+ if (!store) setStore(s);
+ releaseContent();
+}
+
+void Message::releaseContent()
+{
+ sys::Mutex::ScopedLock l(lock);
if (store) {
if (!getPersistenceId()) {
intrusive_ptr<PersistableMessage> pmsg(this);
store->stage(pmsg);
staged = true;
}
+ //ensure required credit is cached before content frames are released
+ getRequiredCredit();
//remove any content frames from the frameset
frames.remove(TypeFilter<CONTENT_BODY>());
setContentReleased();
@@ -211,31 +229,29 @@ void Message::destroy()
bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const
{
- if (isContentReleased()) {
- intrusive_ptr<const PersistableMessage> pmsg(this);
-
- bool done = false;
- string& data = frame.castBody<AMQContentBody>()->getData();
- store->loadContent(queue, pmsg, data, offset, maxContentSize);
- done = data.size() < maxContentSize;
- frame.setBof(false);
- frame.setEof(true);
- QPID_LOG(debug, "loaded frame" << frame);
- if (offset > 0) {
- frame.setBos(false);
- }
- if (!done) {
- frame.setEos(false);
- } else return false;
- return true;
+ intrusive_ptr<const PersistableMessage> pmsg(this);
+
+ bool done = false;
+ string& data = frame.castBody<AMQContentBody>()->getData();
+ store->loadContent(queue, pmsg, data, offset, maxContentSize);
+ done = data.size() < maxContentSize;
+ frame.setBof(false);
+ frame.setEof(true);
+ QPID_LOG(debug, "loaded frame" << frame);
+ if (offset > 0) {
+ frame.setBos(false);
}
- else return false;
+ if (!done) {
+ frame.setEos(false);
+ } else return false;
+ return true;
}
void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const
{
+ sys::Mutex::ScopedLock l(lock);
if (isContentReleased() && !frames.isComplete()) {
-
+ sys::Mutex::ScopedUnlock u(lock);
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
bool morecontent = true;
for (uint64_t offset = 0; morecontent; offset += maxContentSize)
@@ -373,28 +389,36 @@ void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Que
}
void Message::allEnqueuesComplete() {
- MessageCallback* cb = 0;
- {
- sys::Mutex::ScopedLock l(lock);
- std::swap(cb, enqueueCallback);
- }
+ sys::Mutex::ScopedLock l(callbackLock);
+ MessageCallback* cb = enqueueCallback;
if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
}
void Message::allDequeuesComplete() {
- MessageCallback* cb = 0;
- {
- sys::Mutex::ScopedLock l(lock);
- std::swap(cb, dequeueCallback);
- }
+ sys::Mutex::ScopedLock l(callbackLock);
+ MessageCallback* cb = dequeueCallback;
if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
}
-void Message::setEnqueueCompleteCallback(MessageCallback& cb) { enqueueCallback = &cb; }
-void Message::resetEnqueueCompleteCallback() { enqueueCallback = 0; }
+void Message::setEnqueueCompleteCallback(MessageCallback& cb) {
+ sys::Mutex::ScopedLock l(callbackLock);
+ enqueueCallback = &cb;
+}
+
+void Message::resetEnqueueCompleteCallback() {
+ sys::Mutex::ScopedLock l(callbackLock);
+ enqueueCallback = 0;
+}
-void Message::setDequeueCompleteCallback(MessageCallback& cb) { dequeueCallback = &cb; }
-void Message::resetDequeueCompleteCallback() { dequeueCallback = 0; }
+void Message::setDequeueCompleteCallback(MessageCallback& cb) {
+ sys::Mutex::ScopedLock l(callbackLock);
+ dequeueCallback = &cb;
+}
+
+void Message::resetDequeueCompleteCallback() {
+ sys::Mutex::ScopedLock l(callbackLock);
+ dequeueCallback = 0;
+}
framing::FieldTable& Message::getOrInsertHeaders()
{