diff options
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 52 |
1 files changed, 26 insertions, 26 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 639f04faa2..8731a29d24 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,6 +22,7 @@ #include "qpid/broker/Message.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/ExpiryPolicy.h" +#include "qpid/broker/NullMessageStore.h" #include "qpid/StringUtils.h" #include "qpid/framing/frame_functors.h" #include "qpid/framing/FieldTable.h" @@ -48,7 +49,7 @@ TransferAdapter Message::TRANSFER; Message::Message(const framing::SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), - staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), + staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0) {} Message::~Message() @@ -75,7 +76,7 @@ std::string Message::getRoutingKey() const return getAdapter().getRoutingKey(frames); } -std::string Message::getExchangeName() const +std::string Message::getExchangeName() const { return getAdapter().getExchange(frames); } @@ -84,7 +85,7 @@ const boost::shared_ptr<Exchange> Message::getExchange(ExchangeRegistry& registr { if (!exchange) { exchange = registry.get(getExchangeName()); - } + } return exchange; } @@ -98,7 +99,7 @@ const FieldTable* Message::getApplicationHeaders() const return getAdapter().getApplicationHeaders(frames); } -bool Message::isPersistent() +bool Message::isPersistent() const { return (getAdapter().isPersistent(frames) || forcePersistentPolicy); } @@ -175,26 +176,25 @@ void Message::decodeContent(framing::Buffer& buffer) } else { //adjust header flags MarkLastSegment f; - frames.map_if(f, TypeFilter<HEADER_BODY>()); + frames.map_if(f, TypeFilter<HEADER_BODY>()); } //mark content loaded loaded = true; } -void Message::releaseContent(MessageStore* _store) +void Message::releaseContent(bool immediate) { - if (!store) { - store = _store; - } - if (store) { + if (store && !NullMessageStore::isNullStore(store) && (immediate || releaseMgr.canRelease())) { if (!getPersistenceId()) { intrusive_ptr<PersistableMessage> pmsg(this); store->stage(pmsg); staged = true; - } - //remove any content frames from the frameset - frames.remove(TypeFilter<CONTENT_BODY>()); - setContentReleased(); + frames.remove(TypeFilter<CONTENT_BODY>()); + setContentReleased(); + } else if (immediate || releaseMgr.canRelease()) { + frames.remove(TypeFilter<CONTENT_BODY>()); + setContentReleased(); + } } } @@ -213,7 +213,7 @@ bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxC { if (isContentReleased()) { intrusive_ptr<const PersistableMessage> pmsg(this); - + bool done = false; string& data = frame.castBody<AMQContentBody>()->getData(); store->loadContent(queue, pmsg, data, offset, maxContentSize); @@ -239,7 +239,7 @@ void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16 uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); bool morecontent = true; for (uint64_t offset = 0; morecontent; offset += maxContentSize) - { + { AMQFrame frame((AMQContentBody())); morecontent = getContentFrame(queue, frame, maxContentSize, offset); out.handle(frame); @@ -257,7 +257,7 @@ void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) { sys::Mutex::ScopedLock l(lock); Relay f(out); - frames.map_if(f, TypeFilter<HEADER_BODY>()); + frames.map_if(f, TypeFilter<HEADER_BODY>()); } // TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over @@ -287,7 +287,7 @@ bool Message::isContentLoaded() const } -namespace +namespace { const std::string X_QPID_TRACE("x-qpid.trace"); } @@ -324,13 +324,13 @@ void Message::addTraceId(const std::string& id) trace += ","; trace += id; headers.setString(X_QPID_TRACE, trace); - } + } } } -void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) +void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) { - DeliveryProperties* props = getProperties<DeliveryProperties>(); + DeliveryProperties* props = getProperties<DeliveryProperties>(); if (props->getTtl()) { // AMQP requires setting the expiration property to be posix // time_t in seconds. TTL is in milliseconds @@ -347,7 +347,7 @@ void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; - if (expiryPolicy) + if (expiryPolicy) expiryPolicy->willExpire(*this); } @@ -362,7 +362,7 @@ boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) Replacement::iterator i = replacement.find(qfor); if (i != replacement.end()){ return i->second; - } + } return empty; } @@ -410,7 +410,7 @@ void Message::setUpdateDestination(const std::string& d) bool Message::isUpdateMessage() { - return updateDestination.size() && isA<MessageTransferBody>() + return updateDestination.size() && isA<MessageTransferBody>() && getMethod<MessageTransferBody>()->getDestination() == updateDestination; } |