diff options
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 219 |
1 files changed, 177 insertions, 42 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 331bb5e716..47ca7a7ae8 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -19,8 +19,9 @@ * */ -#include "Message.h" -#include "ExchangeRegistry.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/ExchangeRegistry.h" +#include "qpid/broker/ExpiryPolicy.h" #include "qpid/StringUtils.h" #include "qpid/framing/frame_functors.h" #include "qpid/framing/FieldTable.h" @@ -30,17 +31,43 @@ #include "qpid/framing/TypeFilter.h" #include "qpid/log/Statement.h" +#include <time.h> + using boost::intrusive_ptr; -using namespace qpid::broker; -using namespace qpid::framing; +using qpid::sys::AbsTime; +using qpid::sys::Duration; +using qpid::sys::TIME_MSEC; +using qpid::sys::FAR_FUTURE; using std::string; +using namespace qpid::framing; + +namespace qpid { +namespace broker { TransferAdapter Message::TRANSFER; -Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), publisher(0), adapter(0) {} +Message::Message(const framing::SequenceNumber& id) : + frames(id), persistenceId(0), redelivered(false), loaded(false), + staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), + expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0), requiredCredit(0) {} Message::~Message() { + if (expiryPolicy) + expiryPolicy->forget(*this); +} + +void Message::forcePersistent() +{ + // only set forced bit if we actually need to force. + if (! getAdapter().isPersistent(frames) ){ + forcePersistentPolicy = true; + } +} + +bool Message::isForcedPersistent() +{ + return forcePersistentPolicy; } std::string Message::getRoutingKey() const @@ -71,9 +98,9 @@ const FieldTable* Message::getApplicationHeaders() const return getAdapter().getApplicationHeaders(frames); } -bool Message::isPersistent() +bool Message::isPersistent() const { - return getAdapter().isPersistent(frames); + return (getAdapter().isPersistent(frames) || forcePersistentPolicy); } bool Message::requiresAccept() @@ -81,12 +108,16 @@ bool Message::requiresAccept() return getAdapter().requiresAccept(frames); } -uint32_t Message::getRequiredCredit() const +uint32_t Message::getRequiredCredit() { - //add up payload for all header and content frames in the frameset - SumBodySize sum; - frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>()); - return sum.getSize(); + sys::Mutex::ScopedLock l(lock); + if (!requiredCredit) { + //add up payload for all header and content frames in the frameset + SumBodySize sum; + frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>()); + requiredCredit = sum.getSize(); + } + return requiredCredit; } void Message::encode(framing::Buffer& buffer) const @@ -96,7 +127,7 @@ void Message::encode(framing::Buffer& buffer) const frames.map_if(f1, TypeFilter2<METHOD_BODY, HEADER_BODY>()); //then encode the payload of each content frame - EncodeBody f2(buffer); + framing::EncodeBody f2(buffer); frames.map_if(f2, TypeFilter<CONTENT_BODY>()); } @@ -141,9 +172,9 @@ void Message::decodeContent(framing::Buffer& buffer) if (buffer.available()) { //get the data as a string and set that as the content //body on a frame then add that frame to the frameset - AMQFrame frame; - frame.setBody(AMQContentBody()); + AMQFrame frame((AMQContentBody())); frame.castBody<AMQContentBody>()->decode(buffer, buffer.available()); + frame.setFirstSegment(false); frames.append(frame); } else { //adjust header flags @@ -154,17 +185,31 @@ void Message::decodeContent(framing::Buffer& buffer) loaded = true; } -void Message::releaseContent(MessageStore* _store) +void Message::tryReleaseContent() { - if (!store) { - store = _store; + if (checkContentReleasable()) { + releaseContent(); } +} + +void Message::releaseContent(MessageStore* s) +{ + //deprecated, use setStore(store); releaseContent(); instead + if (!store) setStore(s); + releaseContent(); +} + +void Message::releaseContent() +{ + sys::Mutex::ScopedLock l(lock); if (store) { if (!getPersistenceId()) { intrusive_ptr<PersistableMessage> pmsg(this); store->stage(pmsg); staged = true; } + //ensure required credit is cached before content frames are released + getRequiredCredit(); //remove any content frames from the frameset frames.remove(TypeFilter<CONTENT_BODY>()); setContentReleased(); @@ -182,30 +227,37 @@ void Message::destroy() } } -void Message::sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const +bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const +{ + intrusive_ptr<const PersistableMessage> pmsg(this); + + bool done = false; + string& data = frame.castBody<AMQContentBody>()->getData(); + store->loadContent(queue, pmsg, data, offset, maxContentSize); + done = data.size() < maxContentSize; + frame.setBof(false); + frame.setEof(true); + QPID_LOG(debug, "loaded frame" << frame); + if (offset > 0) { + frame.setBos(false); + } + if (!done) { + frame.setEos(false); + } else return false; + return true; +} + +void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const { - if (isContentReleased()) { - //load content from store in chunks of maxContentSize + sys::Mutex::ScopedLock l(lock); + if (isContentReleased() && !frames.isComplete()) { + sys::Mutex::ScopedUnlock u(lock); uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); - intrusive_ptr<const PersistableMessage> pmsg(this); - - bool done = false; - for (uint64_t offset = 0; !done; offset += maxContentSize) + bool morecontent = true; + for (uint64_t offset = 0; morecontent; offset += maxContentSize) { - AMQFrame frame(in_place<AMQContentBody>()); - string& data = frame.castBody<AMQContentBody>()->getData(); - - store->loadContent(queue, pmsg, data, offset, maxContentSize); - done = data.size() < maxContentSize; - frame.setBof(false); - frame.setEof(true); - if (offset > 0) { - frame.setBos(false); - } - if (!done) { - frame.setEos(false); - } - QPID_LOG(debug, "loaded frame for delivery: " << frame); + AMQFrame frame((AMQContentBody())); + morecontent = getContentFrame(queue, frame, maxContentSize, offset); out.handle(frame); } } else { @@ -253,14 +305,14 @@ bool Message::isContentLoaded() const namespace { - const std::string X_QPID_TRACE("x-qpid.trace"); +const std::string X_QPID_TRACE("x-qpid.trace"); } bool Message::isExcluded(const std::vector<std::string>& excludes) const { const FieldTable* headers = getApplicationHeaders(); if (headers) { - std::string traceStr = headers->getString(X_QPID_TRACE); + std::string traceStr = headers->getAsString(X_QPID_TRACE); if (traceStr.size()) { std::vector<std::string> trace = split(traceStr, ", "); @@ -281,7 +333,7 @@ void Message::addTraceId(const std::string& id) sys::Mutex::ScopedLock l(lock); if (isA<MessageTransferBody>()) { FieldTable& headers = getProperties<MessageProperties>()->getApplicationHeaders(); - std::string trace = headers.getString(X_QPID_TRACE); + std::string trace = headers.getAsString(X_QPID_TRACE); if (trace.empty()) { headers.setString(X_QPID_TRACE, id); } else if (trace.find(id) == std::string::npos) { @@ -291,3 +343,86 @@ void Message::addTraceId(const std::string& id) } } } + +void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) +{ + DeliveryProperties* props = getProperties<DeliveryProperties>(); + if (props->getTtl()) { + // AMQP requires setting the expiration property to be posix + // time_t in seconds. TTL is in milliseconds + if (!props->getExpiration()) { + //only set expiration in delivery properties if not already set + time_t now = ::time(0); + props->setExpiration(now + (props->getTtl()/1000)); + } + // Use higher resolution time for the internal expiry calculation. + expiration = AbsTime(AbsTime::now(), Duration(props->getTtl() * TIME_MSEC)); + setExpiryPolicy(e); + } +} + +void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { + expiryPolicy = e; + if (expiryPolicy) + expiryPolicy->willExpire(*this); +} + +bool Message::hasExpired() +{ + return expiryPolicy && expiryPolicy->hasExpired(*this); +} + +boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) const +{ + sys::Mutex::ScopedLock l(lock); + Replacement::iterator i = replacement.find(qfor); + if (i != replacement.end()){ + return i->second; + } + return empty; +} + +void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor) +{ + sys::Mutex::ScopedLock l(lock); + replacement[qfor] = msg; +} + +void Message::allEnqueuesComplete() { + sys::Mutex::ScopedLock l(callbackLock); + MessageCallback* cb = enqueueCallback; + if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); +} + +void Message::allDequeuesComplete() { + sys::Mutex::ScopedLock l(callbackLock); + MessageCallback* cb = dequeueCallback; + if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); +} + +void Message::setEnqueueCompleteCallback(MessageCallback& cb) { + sys::Mutex::ScopedLock l(callbackLock); + enqueueCallback = &cb; +} + +void Message::resetEnqueueCompleteCallback() { + sys::Mutex::ScopedLock l(callbackLock); + enqueueCallback = 0; +} + +void Message::setDequeueCompleteCallback(MessageCallback& cb) { + sys::Mutex::ScopedLock l(callbackLock); + dequeueCallback = &cb; +} + +void Message::resetDequeueCompleteCallback() { + sys::Mutex::ScopedLock l(callbackLock); + dequeueCallback = 0; +} + +framing::FieldTable& Message::getOrInsertHeaders() +{ + return getProperties<MessageProperties>()->getApplicationHeaders(); +} + +}} // namespace qpid::broker |