summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Message.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
-rw-r--r--cpp/src/qpid/broker/Message.cpp52
1 files changed, 26 insertions, 26 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 639f04faa2..8731a29d24 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,6 +22,7 @@
#include "qpid/broker/Message.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/NullMessageStore.h"
#include "qpid/StringUtils.h"
#include "qpid/framing/frame_functors.h"
#include "qpid/framing/FieldTable.h"
@@ -48,7 +49,7 @@ TransferAdapter Message::TRANSFER;
Message::Message(const framing::SequenceNumber& id) :
frames(id), persistenceId(0), redelivered(false), loaded(false),
- staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
+ staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0) {}
Message::~Message()
@@ -75,7 +76,7 @@ std::string Message::getRoutingKey() const
return getAdapter().getRoutingKey(frames);
}
-std::string Message::getExchangeName() const
+std::string Message::getExchangeName() const
{
return getAdapter().getExchange(frames);
}
@@ -84,7 +85,7 @@ const boost::shared_ptr<Exchange> Message::getExchange(ExchangeRegistry& registr
{
if (!exchange) {
exchange = registry.get(getExchangeName());
- }
+ }
return exchange;
}
@@ -98,7 +99,7 @@ const FieldTable* Message::getApplicationHeaders() const
return getAdapter().getApplicationHeaders(frames);
}
-bool Message::isPersistent()
+bool Message::isPersistent() const
{
return (getAdapter().isPersistent(frames) || forcePersistentPolicy);
}
@@ -175,26 +176,25 @@ void Message::decodeContent(framing::Buffer& buffer)
} else {
//adjust header flags
MarkLastSegment f;
- frames.map_if(f, TypeFilter<HEADER_BODY>());
+ frames.map_if(f, TypeFilter<HEADER_BODY>());
}
//mark content loaded
loaded = true;
}
-void Message::releaseContent(MessageStore* _store)
+void Message::releaseContent(bool immediate)
{
- if (!store) {
- store = _store;
- }
- if (store) {
+ if (store && !NullMessageStore::isNullStore(store) && (immediate || releaseMgr.canRelease())) {
if (!getPersistenceId()) {
intrusive_ptr<PersistableMessage> pmsg(this);
store->stage(pmsg);
staged = true;
- }
- //remove any content frames from the frameset
- frames.remove(TypeFilter<CONTENT_BODY>());
- setContentReleased();
+ frames.remove(TypeFilter<CONTENT_BODY>());
+ setContentReleased();
+ } else if (immediate || releaseMgr.canRelease()) {
+ frames.remove(TypeFilter<CONTENT_BODY>());
+ setContentReleased();
+ }
}
}
@@ -213,7 +213,7 @@ bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxC
{
if (isContentReleased()) {
intrusive_ptr<const PersistableMessage> pmsg(this);
-
+
bool done = false;
string& data = frame.castBody<AMQContentBody>()->getData();
store->loadContent(queue, pmsg, data, offset, maxContentSize);
@@ -239,7 +239,7 @@ void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
bool morecontent = true;
for (uint64_t offset = 0; morecontent; offset += maxContentSize)
- {
+ {
AMQFrame frame((AMQContentBody()));
morecontent = getContentFrame(queue, frame, maxContentSize, offset);
out.handle(frame);
@@ -257,7 +257,7 @@ void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/)
{
sys::Mutex::ScopedLock l(lock);
Relay f(out);
- frames.map_if(f, TypeFilter<HEADER_BODY>());
+ frames.map_if(f, TypeFilter<HEADER_BODY>());
}
// TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over
@@ -287,7 +287,7 @@ bool Message::isContentLoaded() const
}
-namespace
+namespace
{
const std::string X_QPID_TRACE("x-qpid.trace");
}
@@ -324,13 +324,13 @@ void Message::addTraceId(const std::string& id)
trace += ",";
trace += id;
headers.setString(X_QPID_TRACE, trace);
- }
+ }
}
}
-void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
+void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
{
- DeliveryProperties* props = getProperties<DeliveryProperties>();
+ DeliveryProperties* props = getProperties<DeliveryProperties>();
if (props->getTtl()) {
// AMQP requires setting the expiration property to be posix
// time_t in seconds. TTL is in milliseconds
@@ -347,7 +347,7 @@ void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
expiryPolicy = e;
- if (expiryPolicy)
+ if (expiryPolicy)
expiryPolicy->willExpire(*this);
}
@@ -362,7 +362,7 @@ boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor)
Replacement::iterator i = replacement.find(qfor);
if (i != replacement.end()){
return i->second;
- }
+ }
return empty;
}
@@ -410,7 +410,7 @@ void Message::setUpdateDestination(const std::string& d)
bool Message::isUpdateMessage()
{
- return updateDestination.size() && isA<MessageTransferBody>()
+ return updateDestination.size() && isA<MessageTransferBody>()
&& getMethod<MessageTransferBody>()->getDestination() == updateDestination;
}