/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "qpid/broker/Message.h" #include "qpid/broker/Queue.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/ExpiryPolicy.h" #include "qpid/StringUtils.h" #include "qpid/framing/frame_functors.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/SendContent.h" #include "qpid/framing/SequenceNumber.h" #include "qpid/framing/TypeFilter.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" #include using boost::intrusive_ptr; using qpid::sys::AbsTime; using qpid::sys::Duration; using qpid::sys::TIME_MSEC; using qpid::sys::FAR_FUTURE; using std::string; using namespace qpid::framing; namespace qpid { namespace broker { TransferAdapter Message::TRANSFER; Message::Message(const framing::SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), expiration(FAR_FUTURE), dequeueCallback(0), inCallback(false), requiredCredit(0), isManagementMessage(false), copyHeaderOnWrite(false) {} Message::~Message() {} void Message::forcePersistent() { sys::Mutex::ScopedLock l(lock); // only set forced bit if we actually need to force. if (! getAdapter().isPersistent(frames) ){ forcePersistentPolicy = true; } } bool Message::isForcedPersistent() { return forcePersistentPolicy; } std::string Message::getRoutingKey() const { return getAdapter().getRoutingKey(frames); } std::string Message::getExchangeName() const { return getAdapter().getExchange(frames); } const boost::shared_ptr Message::getExchange(ExchangeRegistry& registry) const { if (!exchange) { exchange = registry.get(getExchangeName()); } return exchange; } bool Message::isImmediate() const { return getAdapter().isImmediate(frames); } const FieldTable* Message::getApplicationHeaders() const { sys::Mutex::ScopedLock l(lock); return getAdapter().getApplicationHeaders(frames); } std::string Message::getAppId() const { sys::Mutex::ScopedLock l(lock); return getAdapter().getAppId(frames); } bool Message::isPersistent() const { sys::Mutex::ScopedLock l(lock); return (getAdapter().isPersistent(frames) || forcePersistentPolicy); } bool Message::requiresAccept() { return getAdapter().requiresAccept(frames); } uint32_t Message::getRequiredCredit() { sys::Mutex::ScopedLock l(lock); if (!requiredCredit) { //add up payload for all header and content frames in the frameset SumBodySize sum; frames.map_if(sum, TypeFilter2()); requiredCredit = sum.getSize(); } return requiredCredit; } void Message::encode(framing::Buffer& buffer) const { //encode method and header frames EncodeFrame f1(buffer); frames.map_if(f1, TypeFilter2()); //then encode the payload of each content frame framing::EncodeBody f2(buffer); frames.map_if(f2, TypeFilter()); } void Message::encodeContent(framing::Buffer& buffer) const { //encode the payload of each content frame EncodeBody f2(buffer); frames.map_if(f2, TypeFilter()); } uint32_t Message::encodedSize() const { return encodedHeaderSize() + encodedContentSize(); } uint32_t Message::encodedContentSize() const { return frames.getContentSize(); } uint32_t Message::encodedHeaderSize() const { //add up the size for all method and header frames in the frameset SumFrameSize sum; frames.map_if(sum, TypeFilter2()); return sum.getSize(); } void Message::decodeHeader(framing::Buffer& buffer) { AMQFrame method; method.decode(buffer); frames.append(method); AMQFrame header; header.decode(buffer); frames.append(header); } void Message::decodeContent(framing::Buffer& buffer) { if (buffer.available()) { //get the data as a string and set that as the content //body on a frame then add that frame to the frameset AMQFrame frame((AMQContentBody())); frame.castBody()->decode(buffer, buffer.available()); frame.setFirstSegment(false); frames.append(frame); } else { //adjust header flags MarkLastSegment f; frames.map_if(f, TypeFilter()); } //mark content loaded loaded = true; } // Used for testing only void Message::tryReleaseContent() { if (checkContentReleasable()) { releaseContent(); } } void Message::releaseContent(MessageStore* s) { //deprecated, use setStore(store); releaseContent(); instead if (!store) setStore(s); releaseContent(); } void Message::releaseContent() { sys::Mutex::ScopedLock l(lock); if (store) { if (!getPersistenceId()) { intrusive_ptr pmsg(this); store->stage(pmsg); staged = true; } //ensure required credit is cached before content frames are released getRequiredCredit(); //remove any content frames from the frameset frames.remove(TypeFilter()); setContentReleased(); } } void Message::destroy() { if (staged) { if (store) { store->destroy(*this); } else { QPID_LOG(error, "Message content was staged but no store is set so it can't be destroyed"); } } } bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const { intrusive_ptr pmsg(this); bool done = false; string& data = frame.castBody()->getData(); store->loadContent(queue, pmsg, data, offset, maxContentSize); done = data.size() < maxContentSize; frame.setBof(false); frame.setEof(true); QPID_LOG(debug, "loaded frame" << frame); if (offset > 0) { frame.setBos(false); } if (!done) { frame.setEos(false); } else return false; return true; } void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const { sys::Mutex::ScopedLock l(lock); if (isContentReleased() && !frames.isComplete()) { sys::Mutex::ScopedUnlock u(lock); uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); bool morecontent = true; for (uint64_t offset = 0; morecontent; offset += maxContentSize) { AMQFrame frame((AMQContentBody())); morecontent = getContentFrame(queue, frame, maxContentSize, offset); out.handle(frame); } } else { Count c; frames.map_if(c, TypeFilter()); SendContent f(out, maxFrameSize, c.getCount()); frames.map_if(f, TypeFilter()); } } void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) const { sys::Mutex::ScopedLock l(lock); Relay f(out); frames.map_if(f, TypeFilter()); //as frame (and pointer to body) has now been passed to handler, //subsequent modifications should use a copy copyHeaderOnWrite = true; } // TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over // 0-8/0-9 message differences. MessageAdapter& Message::getAdapter() const { if (!adapter) { if(frames.isA()) { adapter = &TRANSFER; } else { const AMQMethodBody* method = frames.getMethod(); if (!method) throw Exception("Can't adapt message with no method"); else throw Exception(QPID_MSG("Can't adapt message based on " << *method)); } } return *adapter; } uint64_t Message::contentSize() const { return frames.getContentSize(); } bool Message::isContentLoaded() const { return loaded; } namespace { const std::string X_QPID_TRACE("x-qpid.trace"); } bool Message::isExcluded(const std::vector& excludes) const { sys::Mutex::ScopedLock l(lock); const FieldTable* headers = getApplicationHeaders(); if (headers) { std::string traceStr = headers->getAsString(X_QPID_TRACE); if (traceStr.size()) { std::vector trace = split(traceStr, ", "); for (std::vector::const_iterator i = excludes.begin(); i != excludes.end(); i++) { for (std::vector::const_iterator j = trace.begin(); j != trace.end(); j++) { if (*i == *j) { return true; } } } } } return false; } class CloneHeaderBody { public: void operator()(AMQFrame& f) { f.cloneBody(); } }; AMQHeaderBody* Message::getHeaderBody() { if (copyHeaderOnWrite) { CloneHeaderBody f; frames.map_if(f, TypeFilter()); copyHeaderOnWrite = false; } return frames.getHeaders(); } void Message::addTraceId(const std::string& id) { sys::Mutex::ScopedLock l(lock); if (isA()) { FieldTable& headers = getModifiableProperties()->getApplicationHeaders(); std::string trace = headers.getAsString(X_QPID_TRACE); if (trace.empty()) { headers.setString(X_QPID_TRACE, id); } else if (trace.find(id) == std::string::npos) { trace += ","; trace += id; headers.setString(X_QPID_TRACE, trace); } } } void Message::setTimestamp() { sys::Mutex::ScopedLock l(lock); DeliveryProperties* props = getModifiableProperties(); time_t now = ::time(0); props->setTimestamp(now); // AMQP-0.10: posix time_t - secs since Epoch } void Message::computeExpiration(const boost::intrusive_ptr& e) { sys::Mutex::ScopedLock l(lock); DeliveryProperties* props = getModifiableProperties(); if (props->getTtl()) { // AMQP requires setting the expiration property to be posix // time_t in seconds. TTL is in milliseconds if (!props->getExpiration()) { //only set expiration in delivery properties if not already set time_t now = ::time(0); props->setExpiration(now + (props->getTtl()/1000)); } if (e) { // Use higher resolution time for the internal expiry calculation. // Prevent overflow as a signed int64_t Duration ttl(std::min(props->getTtl() * TIME_MSEC, (uint64_t) std::numeric_limits::max())); expiration = AbsTime(e->getCurrentTime(), ttl); setExpiryPolicy(e); } } } void Message::adjustTtl() { sys::Mutex::ScopedLock l(lock); DeliveryProperties* props = getModifiableProperties(); if (props->getTtl()) { if (expiration < FAR_FUTURE) { sys::AbsTime current( expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now()); sys::Duration ttl(current, getExpiration()); // convert from ns to ms; set to 1 if expired props->setTtl(int64_t(ttl) >= 1000000 ? int64_t(ttl)/1000000 : 1); } } } void Message::setRedelivered() { sys::Mutex::ScopedLock l(lock); getModifiableProperties()->setRedelivered(true); } void Message::insertCustomProperty(const std::string& key, int64_t value) { sys::Mutex::ScopedLock l(lock); getModifiableProperties()->getApplicationHeaders().setInt64(key,value); } void Message::insertCustomProperty(const std::string& key, const std::string& value) { sys::Mutex::ScopedLock l(lock); getModifiableProperties()->getApplicationHeaders().setString(key,value); } void Message::removeCustomProperty(const std::string& key) { sys::Mutex::ScopedLock l(lock); getModifiableProperties()->getApplicationHeaders().erase(key); } void Message::setExchange(const std::string& exchange) { sys::Mutex::ScopedLock l(lock); getModifiableProperties()->setExchange(exchange); } void Message::clearApplicationHeadersFlag() { sys::Mutex::ScopedLock l(lock); getModifiableProperties()->clearApplicationHeadersFlag(); } void Message::setExpiryPolicy(const boost::intrusive_ptr& e) { expiryPolicy = e; } bool Message::hasExpired() { return expiryPolicy && expiryPolicy->hasExpired(*this); } namespace { struct ScopedSet { sys::Monitor& lock; bool& flag; ScopedSet(sys::Monitor& l, bool& f) : lock(l), flag(f) { sys::Monitor::ScopedLock sl(lock); flag = true; } ~ScopedSet(){ sys::Monitor::ScopedLock sl(lock); flag = false; lock.notifyAll(); } }; } void Message::allDequeuesComplete() { ScopedSet ss(callbackLock, inCallback); MessageCallback* cb = dequeueCallback; if (cb && *cb) (*cb)(intrusive_ptr(this)); } void Message::setDequeueCompleteCallback(MessageCallback& cb) { sys::Mutex::ScopedLock l(callbackLock); while (inCallback) callbackLock.wait(); dequeueCallback = &cb; } void Message::resetDequeueCompleteCallback() { sys::Mutex::ScopedLock l(callbackLock); while (inCallback) callbackLock.wait(); dequeueCallback = 0; } uint8_t Message::getPriority() const { sys::Mutex::ScopedLock l(lock); return getAdapter().getPriority(frames); } bool Message::getIsManagementMessage() const { return isManagementMessage; } void Message::setIsManagementMessage(bool b) { isManagementMessage = b; } }} // namespace qpid::broker