diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Message.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.cpp | 452 |
1 files changed, 452 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp new file mode 100644 index 0000000000..763dc55e40 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -0,0 +1,452 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/Message.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/ExchangeRegistry.h" +#include "qpid/broker/ExpiryPolicy.h" +#include "qpid/StringUtils.h" +#include "qpid/framing/frame_functors.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/SendContent.h" +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/TypeFilter.h" +#include "qpid/log/Statement.h" + +#include <time.h> + +using boost::intrusive_ptr; +using qpid::sys::AbsTime; +using qpid::sys::Duration; +using qpid::sys::TIME_MSEC; +using qpid::sys::FAR_FUTURE; +using std::string; +using namespace qpid::framing; + +namespace qpid { +namespace broker { + +TransferAdapter Message::TRANSFER; + +Message::Message(const framing::SequenceNumber& id) : + frames(id), persistenceId(0), redelivered(false), loaded(false), + staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), + expiration(FAR_FUTURE), dequeueCallback(0), + inCallback(false), requiredCredit(0), isManagementMessage(false) +{} + +Message::Message(const Message& original) : + PersistableMessage(), frames(original.frames), persistenceId(0), redelivered(false), loaded(false), + staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), + expiration(original.expiration), dequeueCallback(0), + inCallback(false), requiredCredit(0) +{ + setExpiryPolicy(original.expiryPolicy); +} + +Message::~Message() +{ + if (expiryPolicy) + expiryPolicy->forget(*this); +} + +void Message::forcePersistent() +{ + // only set forced bit if we actually need to force. + if (! getAdapter().isPersistent(frames) ){ + forcePersistentPolicy = true; + } +} + +bool Message::isForcedPersistent() +{ + return forcePersistentPolicy; +} + +std::string Message::getRoutingKey() const +{ + return getAdapter().getRoutingKey(frames); +} + +std::string Message::getExchangeName() const +{ + return getAdapter().getExchange(frames); +} + +const boost::shared_ptr<Exchange> Message::getExchange(ExchangeRegistry& registry) const +{ + if (!exchange) { + exchange = registry.get(getExchangeName()); + } + return exchange; +} + +bool Message::isImmediate() const +{ + return getAdapter().isImmediate(frames); +} + +const FieldTable* Message::getApplicationHeaders() const +{ + return getAdapter().getApplicationHeaders(frames); +} + +std::string Message::getAppId() const +{ + return getAdapter().getAppId(frames); +} + +bool Message::isPersistent() const +{ + return (getAdapter().isPersistent(frames) || forcePersistentPolicy); +} + +bool Message::requiresAccept() +{ + return getAdapter().requiresAccept(frames); +} + +uint32_t Message::getRequiredCredit() +{ + sys::Mutex::ScopedLock l(lock); + if (!requiredCredit) { + //add up payload for all header and content frames in the frameset + SumBodySize sum; + frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>()); + requiredCredit = sum.getSize(); + } + return requiredCredit; +} + +void Message::encode(framing::Buffer& buffer) const +{ + //encode method and header frames + EncodeFrame f1(buffer); + frames.map_if(f1, TypeFilter2<METHOD_BODY, HEADER_BODY>()); + + //then encode the payload of each content frame + framing::EncodeBody f2(buffer); + frames.map_if(f2, TypeFilter<CONTENT_BODY>()); +} + +void Message::encodeContent(framing::Buffer& buffer) const +{ + //encode the payload of each content frame + EncodeBody f2(buffer); + frames.map_if(f2, TypeFilter<CONTENT_BODY>()); +} + +uint32_t Message::encodedSize() const +{ + return encodedHeaderSize() + encodedContentSize(); +} + +uint32_t Message::encodedContentSize() const +{ + return frames.getContentSize(); +} + +uint32_t Message::encodedHeaderSize() const +{ + //add up the size for all method and header frames in the frameset + SumFrameSize sum; + frames.map_if(sum, TypeFilter2<METHOD_BODY, HEADER_BODY>()); + return sum.getSize(); +} + +void Message::decodeHeader(framing::Buffer& buffer) +{ + AMQFrame method; + method.decode(buffer); + frames.append(method); + + AMQFrame header; + header.decode(buffer); + frames.append(header); +} + +void Message::decodeContent(framing::Buffer& buffer) +{ + if (buffer.available()) { + //get the data as a string and set that as the content + //body on a frame then add that frame to the frameset + AMQFrame frame((AMQContentBody())); + frame.castBody<AMQContentBody>()->decode(buffer, buffer.available()); + frame.setFirstSegment(false); + frames.append(frame); + } else { + //adjust header flags + MarkLastSegment f; + frames.map_if(f, TypeFilter<HEADER_BODY>()); + } + //mark content loaded + loaded = true; +} + +// Used for testing only +void Message::tryReleaseContent() +{ + if (checkContentReleasable()) { + releaseContent(); + } +} + +void Message::releaseContent(MessageStore* s) +{ + //deprecated, use setStore(store); releaseContent(); instead + if (!store) setStore(s); + releaseContent(); +} + +void Message::releaseContent() +{ + sys::Mutex::ScopedLock l(lock); + if (store) { + if (!getPersistenceId()) { + intrusive_ptr<PersistableMessage> pmsg(this); + store->stage(pmsg); + staged = true; + } + //ensure required credit is cached before content frames are released + getRequiredCredit(); + //remove any content frames from the frameset + frames.remove(TypeFilter<CONTENT_BODY>()); + setContentReleased(); + } +} + +void Message::destroy() +{ + if (staged) { + if (store) { + store->destroy(*this); + } else { + QPID_LOG(error, "Message content was staged but no store is set so it can't be destroyed"); + } + } +} + +bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const +{ + intrusive_ptr<const PersistableMessage> pmsg(this); + + bool done = false; + string& data = frame.castBody<AMQContentBody>()->getData(); + store->loadContent(queue, pmsg, data, offset, maxContentSize); + done = data.size() < maxContentSize; + frame.setBof(false); + frame.setEof(true); + QPID_LOG(debug, "loaded frame" << frame); + if (offset > 0) { + frame.setBos(false); + } + if (!done) { + frame.setEos(false); + } else return false; + return true; +} + +void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const +{ + sys::Mutex::ScopedLock l(lock); + if (isContentReleased() && !frames.isComplete()) { + sys::Mutex::ScopedUnlock u(lock); + uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); + bool morecontent = true; + for (uint64_t offset = 0; morecontent; offset += maxContentSize) + { + AMQFrame frame((AMQContentBody())); + morecontent = getContentFrame(queue, frame, maxContentSize, offset); + out.handle(frame); + } + } else { + Count c; + frames.map_if(c, TypeFilter<CONTENT_BODY>()); + + SendContent f(out, maxFrameSize, c.getCount()); + frames.map_if(f, TypeFilter<CONTENT_BODY>()); + } +} + +void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) const +{ + sys::Mutex::ScopedLock l(lock); + Relay f(out); + frames.map_if(f, TypeFilter<HEADER_BODY>()); +} + +// TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over +// 0-8/0-9 message differences. +MessageAdapter& Message::getAdapter() const +{ + if (!adapter) { + if(frames.isA<MessageTransferBody>()) { + adapter = &TRANSFER; + } else { + const AMQMethodBody* method = frames.getMethod(); + if (!method) throw Exception("Can't adapt message with no method"); + else throw Exception(QPID_MSG("Can't adapt message based on " << *method)); + } + } + return *adapter; +} + +uint64_t Message::contentSize() const +{ + return frames.getContentSize(); +} + +bool Message::isContentLoaded() const +{ + return loaded; +} + + +namespace +{ +const std::string X_QPID_TRACE("x-qpid.trace"); +} + +bool Message::isExcluded(const std::vector<std::string>& excludes) const +{ + const FieldTable* headers = getApplicationHeaders(); + if (headers) { + std::string traceStr = headers->getAsString(X_QPID_TRACE); + if (traceStr.size()) { + std::vector<std::string> trace = split(traceStr, ", "); + + for (std::vector<std::string>::const_iterator i = excludes.begin(); i != excludes.end(); i++) { + for (std::vector<std::string>::const_iterator j = trace.begin(); j != trace.end(); j++) { + if (*i == *j) { + return true; + } + } + } + } + } + return false; +} + +void Message::addTraceId(const std::string& id) +{ + sys::Mutex::ScopedLock l(lock); + if (isA<MessageTransferBody>()) { + FieldTable& headers = getProperties<MessageProperties>()->getApplicationHeaders(); + std::string trace = headers.getAsString(X_QPID_TRACE); + if (trace.empty()) { + headers.setString(X_QPID_TRACE, id); + } else if (trace.find(id) == std::string::npos) { + trace += ","; + trace += id; + headers.setString(X_QPID_TRACE, trace); + } + } +} + +void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) +{ + DeliveryProperties* props = getProperties<DeliveryProperties>(); + if (props->getTtl()) { + // AMQP requires setting the expiration property to be posix + // time_t in seconds. TTL is in milliseconds + if (!props->getExpiration()) { + //only set expiration in delivery properties if not already set + time_t now = ::time(0); + props->setExpiration(now + (props->getTtl()/1000)); + } + // Use higher resolution time for the internal expiry calculation. + Duration ttl(std::min(props->getTtl() * TIME_MSEC, (uint64_t) std::numeric_limits<int64_t>::max()));//Prevent overflow + expiration = AbsTime(AbsTime::now(), ttl); + setExpiryPolicy(e); + } +} + +void Message::adjustTtl() +{ + DeliveryProperties* props = getProperties<DeliveryProperties>(); + if (props->getTtl()) { + sys::Mutex::ScopedLock l(lock); + if (expiration < FAR_FUTURE) { + sys::Duration d(sys::AbsTime::now(), getExpiration()); + props->setTtl(int64_t(d) > 0 ? int64_t(d)/1000000 : 1); // convert from ns to ms; set to 1 if expired + } + } +} + +void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { + expiryPolicy = e; + if (expiryPolicy) + expiryPolicy->willExpire(*this); +} + +bool Message::hasExpired() +{ + return expiryPolicy && expiryPolicy->hasExpired(*this); +} + +namespace { +struct ScopedSet { + sys::Monitor& lock; + bool& flag; + ScopedSet(sys::Monitor& l, bool& f) : lock(l), flag(f) { + sys::Monitor::ScopedLock sl(lock); + flag = true; + } + ~ScopedSet(){ + sys::Monitor::ScopedLock sl(lock); + flag = false; + lock.notifyAll(); + } +}; +} + +void Message::allDequeuesComplete() { + ScopedSet ss(callbackLock, inCallback); + MessageCallback* cb = dequeueCallback; + if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); +} + +void Message::setDequeueCompleteCallback(MessageCallback& cb) { + sys::Mutex::ScopedLock l(callbackLock); + while (inCallback) callbackLock.wait(); + dequeueCallback = &cb; +} + +void Message::resetDequeueCompleteCallback() { + sys::Mutex::ScopedLock l(callbackLock); + while (inCallback) callbackLock.wait(); + dequeueCallback = 0; +} + +uint8_t Message::getPriority() const { + return getAdapter().getPriority(frames); +} + +framing::FieldTable& Message::getOrInsertHeaders() +{ + return getProperties<MessageProperties>()->getApplicationHeaders(); +} + +bool Message::getIsManagementMessage() const { return isManagementMessage; } +void Message::setIsManagementMessage(bool b) { isManagementMessage = b; } + +}} // namespace qpid::broker |