diff options
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 85 |
1 files changed, 66 insertions, 19 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index d694d1eafd..28886826ca 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -30,7 +30,9 @@ #include "qpid/framing/SendContent.h" #include "qpid/framing/SequenceNumber.h" #include "qpid/framing/TypeFilter.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" +#include <boost/pointer_cast.hpp> #include <time.h> @@ -51,18 +53,9 @@ Message::Message(const framing::SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), expiration(FAR_FUTURE), dequeueCallback(0), - inCallback(false), requiredCredit(0), isManagementMessage(false) + inCallback(false), requiredCredit(0), isManagementMessage(false), copyHeaderOnWrite(false) {} -Message::Message(const Message& original) : - PersistableMessage(), frames(original.frames), persistenceId(0), redelivered(false), loaded(false), - staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), - expiration(original.expiration), dequeueCallback(0), - inCallback(false), requiredCredit(0) -{ - setExpiryPolicy(original.expiryPolicy); -} - Message::~Message() {} void Message::forcePersistent() @@ -288,6 +281,9 @@ void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) sys::Mutex::ScopedLock l(lock); Relay f(out); frames.map_if(f, TypeFilter<HEADER_BODY>()); + //as frame (and pointer to body) has now been passed to handler, + //subsequent modifications should use a copy + copyHeaderOnWrite = true; } // TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over @@ -342,11 +338,30 @@ bool Message::isExcluded(const std::vector<std::string>& excludes) const return false; } +class CloneHeaderBody +{ +public: + void operator()(AMQFrame& f) + { + f.cloneBody(); + } +}; + +AMQHeaderBody* Message::getHeaderBody() +{ + if (copyHeaderOnWrite) { + CloneHeaderBody f; + frames.map_if(f, TypeFilter<HEADER_BODY>()); + copyHeaderOnWrite = false; + } + return frames.getHeaders(); +} + void Message::addTraceId(const std::string& id) { sys::Mutex::ScopedLock l(lock); if (isA<MessageTransferBody>()) { - FieldTable& headers = getProperties<MessageProperties>()->getApplicationHeaders(); + FieldTable& headers = getModifiableProperties<MessageProperties>()->getApplicationHeaders(); std::string trace = headers.getAsString(X_QPID_TRACE); if (trace.empty()) { headers.setString(X_QPID_TRACE, id); @@ -360,7 +375,8 @@ void Message::addTraceId(const std::string& id) void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) { - DeliveryProperties* props = getProperties<DeliveryProperties>(); + sys::Mutex::ScopedLock l(lock); + DeliveryProperties* props = getModifiableProperties<DeliveryProperties>(); if (props->getTtl()) { // AMQP requires setting the expiration property to be posix // time_t in seconds. TTL is in milliseconds @@ -382,9 +398,9 @@ void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) void Message::adjustTtl() { - DeliveryProperties* props = getProperties<DeliveryProperties>(); + sys::Mutex::ScopedLock l(lock); + DeliveryProperties* props = getModifiableProperties<DeliveryProperties>(); if (props->getTtl()) { - sys::Mutex::ScopedLock l(lock); if (expiration < FAR_FUTURE) { sys::AbsTime current( expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now()); @@ -395,6 +411,42 @@ void Message::adjustTtl() } } +void Message::setRedelivered() +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<framing::DeliveryProperties>()->setRedelivered(true); +} + +void Message::insertCustomProperty(const std::string& key, int64_t value) +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<MessageProperties>()->getApplicationHeaders().setInt64(key,value); +} + +void Message::insertCustomProperty(const std::string& key, const std::string& value) +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<MessageProperties>()->getApplicationHeaders().setString(key,value); +} + +void Message::removeCustomProperty(const std::string& key) +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<MessageProperties>()->getApplicationHeaders().erase(key); +} + +void Message::setExchange(const std::string& exchange) +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<DeliveryProperties>()->setExchange(exchange); +} + +void Message::clearApplicationHeadersFlag() +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<MessageProperties>()->clearApplicationHeadersFlag(); +} + void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; } @@ -442,11 +494,6 @@ uint8_t Message::getPriority() const { return getAdapter().getPriority(frames); } -framing::FieldTable& Message::getOrInsertHeaders() -{ - return getProperties<MessageProperties>()->getApplicationHeaders(); -} - bool Message::getIsManagementMessage() const { return isManagementMessage; } void Message::setIsManagementMessage(bool b) { isManagementMessage = b; } |