diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Message.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.cpp | 542 |
1 files changed, 152 insertions, 390 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 4dd8a349dd..c48e9bcfa4 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -20,19 +20,12 @@ */ #include "qpid/broker/Message.h" -#include "qpid/broker/Queue.h" -#include "qpid/broker/ExchangeRegistry.h" -#include "qpid/broker/ExpiryPolicy.h" +#include "qpid/broker/MapHandler.h" #include "qpid/StringUtils.h" -#include "qpid/framing/frame_functors.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/SendContent.h" -#include "qpid/framing/SequenceNumber.h" -#include "qpid/framing/TypeFilter.h" -#include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" +#include <algorithm> +#include <string.h> #include <time.h> using boost::intrusive_ptr; @@ -41,492 +34,261 @@ using qpid::sys::Duration; using qpid::sys::TIME_MSEC; using qpid::sys::FAR_FUTURE; using std::string; -using namespace qpid::framing; namespace qpid { namespace broker { -TransferAdapter Message::TRANSFER; - -Message::Message(const framing::SequenceNumber& id) : - frames(id), persistenceId(0), redelivered(false), loaded(false), - staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), - expiration(FAR_FUTURE), dequeueCallback(0), - inCallback(false), requiredCredit(0), isManagementMessage(false), copyHeaderOnWrite(false) -{} - -Message::~Message() {} - -void Message::forcePersistent() +Message::Message() : deliveryCount(0), publisher(0), expiration(FAR_FUTURE), timestamp(0), isManagementMessage(false) {} +Message::Message(boost::intrusive_ptr<Encoding> e, boost::intrusive_ptr<PersistableMessage> p) + : encoding(e), persistentContext(p), deliveryCount(0), publisher(0), expiration(FAR_FUTURE), timestamp(0), isManagementMessage(false) { - sys::Mutex::ScopedLock l(lock); - // only set forced bit if we actually need to force. - if (! getAdapter().isPersistent(frames) ){ - forcePersistentPolicy = true; - } + if (persistentContext) persistentContext->setIngressCompletion(e); } +Message::~Message() {} -bool Message::isForcedPersistent() -{ - return forcePersistentPolicy; -} std::string Message::getRoutingKey() const { - return getAdapter().getRoutingKey(frames); -} - -std::string Message::getExchangeName() const -{ - return getAdapter().getExchange(frames); -} - -const boost::shared_ptr<Exchange> Message::getExchange(ExchangeRegistry& registry) const -{ - if (!exchange) { - exchange = registry.get(getExchangeName()); - } - return exchange; -} - -bool Message::isImmediate() const -{ - return getAdapter().isImmediate(frames); -} - -const FieldTable* Message::getApplicationHeaders() const -{ - sys::Mutex::ScopedLock l(lock); - return getAdapter().getApplicationHeaders(frames); -} - -std::string Message::getAppId() const -{ - sys::Mutex::ScopedLock l(lock); - return getAdapter().getAppId(frames); + return getEncoding().getRoutingKey(); } bool Message::isPersistent() const { - sys::Mutex::ScopedLock l(lock); - return (getAdapter().isPersistent(frames) || forcePersistentPolicy); + return getEncoding().isPersistent(); } -bool Message::requiresAccept() +uint64_t Message::getContentSize() const { - return getAdapter().requiresAccept(frames); + return getEncoding().getContentSize(); } -uint32_t Message::getRequiredCredit() +boost::intrusive_ptr<AsyncCompletion> Message::getIngressCompletion() const { - sys::Mutex::ScopedLock l(lock); - if (!requiredCredit) { - //add up payload for all header and content frames in the frameset - SumBodySize sum; - frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>()); - requiredCredit = sum.getSize(); - } - return requiredCredit; + return encoding; } -void Message::encode(framing::Buffer& buffer) const -{ - sys::Mutex::ScopedLock l(lock); - //encode method and header frames - EncodeFrame f1(buffer); - frames.map_if(f1, TypeFilter2<METHOD_BODY, HEADER_BODY>()); - - //then encode the payload of each content frame - framing::EncodeBody f2(buffer); - frames.map_if(f2, TypeFilter<CONTENT_BODY>()); -} - -void Message::encodeContent(framing::Buffer& buffer) const -{ - sys::Mutex::ScopedLock l(lock); - //encode the payload of each content frame - EncodeBody f2(buffer); - frames.map_if(f2, TypeFilter<CONTENT_BODY>()); -} - -uint32_t Message::encodedSize() const -{ - return encodedHeaderSize() + encodedContentSize(); -} - -uint32_t Message::encodedContentSize() const -{ - sys::Mutex::ScopedLock l(lock); - return frames.getContentSize(); -} - -uint32_t Message::encodedHeaderSize() const -{ - sys::Mutex::ScopedLock l(lock); // prevent modifications while computing size - //add up the size for all method and header frames in the frameset - SumFrameSize sum; - frames.map_if(sum, TypeFilter2<METHOD_BODY, HEADER_BODY>()); - return sum.getSize(); -} - -void Message::decodeHeader(framing::Buffer& buffer) +namespace { - AMQFrame method; - method.decode(buffer); - frames.append(method); - - AMQFrame header; - header.decode(buffer); - frames.append(header); +const std::string X_QPID_TRACE("x-qpid.trace"); } -void Message::decodeContent(framing::Buffer& buffer) +bool Message::isExcluded(const std::vector<std::string>& excludes) const { - if (buffer.available()) { - //get the data as a string and set that as the content - //body on a frame then add that frame to the frameset - AMQFrame frame((AMQContentBody())); - frame.castBody<AMQContentBody>()->decode(buffer, buffer.available()); - frame.setFirstSegment(false); - frames.append(frame); - } else { - //adjust header flags - MarkLastSegment f; - frames.map_if(f, TypeFilter<HEADER_BODY>()); + std::string traceStr = getEncoding().getAnnotationAsString(X_QPID_TRACE); + if (traceStr.size()) { + std::vector<std::string> trace = split(traceStr, ", "); + for (std::vector<std::string>::const_iterator i = excludes.begin(); i != excludes.end(); i++) { + for (std::vector<std::string>::const_iterator j = trace.begin(); j != trace.end(); j++) { + if (*i == *j) { + return true; + } + } + } } - //mark content loaded - loaded = true; + return false; } -// Used for testing only -void Message::tryReleaseContent() +void Message::addTraceId(const std::string& id) { - if (checkContentReleasable()) { - releaseContent(); + std::string trace = getEncoding().getAnnotationAsString(X_QPID_TRACE); + if (trace.empty()) { + annotations[X_QPID_TRACE] = id; + } else if (trace.find(id) == std::string::npos) { + trace += ","; + trace += id; + annotations[X_QPID_TRACE] = trace; } + annotationsChanged(); } -void Message::releaseContent(MessageStore* s) +void Message::clearTrace() { - //deprecated, use setStore(store); releaseContent(); instead - if (!store) setStore(s); - releaseContent(); + annotations[X_QPID_TRACE] = std::string(); + annotationsChanged(); } -void Message::releaseContent() +void Message::setTimestamp() { - sys::Mutex::ScopedLock l(lock); - if (store) { - if (!getPersistenceId()) { - intrusive_ptr<PersistableMessage> pmsg(this); - store->stage(pmsg); - staged = true; - } - //ensure required credit and size is cached before content frames are released - getRequiredCredit(); - contentSize(); - //remove any content frames from the frameset - frames.remove(TypeFilter<CONTENT_BODY>()); - setContentReleased(); - } + timestamp = ::time(0); // AMQP-0.10: posix time_t - secs since Epoch } -void Message::destroy() +uint64_t Message::getTimestamp() const { - if (staged) { - if (store) { - store->destroy(*this); - } else { - QPID_LOG(error, "Message content was staged but no store is set so it can't be destroyed"); - } - } + return timestamp; } -bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const +uint64_t Message::getTtl() const { - intrusive_ptr<const PersistableMessage> pmsg(this); - - bool done = false; - string& data = frame.castBody<AMQContentBody>()->getData(); - store->loadContent(queue, pmsg, data, offset, maxContentSize); - done = data.size() < maxContentSize; - frame.setBof(false); - frame.setEof(true); - QPID_LOG(debug, "loaded frame" << frame); - if (offset > 0) { - frame.setBos(false); - } - if (!done) { - frame.setEos(false); - } else return false; - return true; -} - -void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const -{ - sys::Mutex::ScopedLock l(lock); - if (isContentReleased() && !frames.isComplete()) { - sys::Mutex::ScopedUnlock u(lock); - uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); - bool morecontent = true; - for (uint64_t offset = 0; morecontent; offset += maxContentSize) - { - AMQFrame frame((AMQContentBody())); - morecontent = getContentFrame(queue, frame, maxContentSize, offset); - out.handle(frame); - } - queue.countLoadedFromDisk(contentSize()); + uint64_t ttl; + if (encoding->getTtl(ttl) && expiration < FAR_FUTURE) { + sys::AbsTime current( + expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now()); + sys::Duration ttl(current, getExpiration()); + // convert from ns to ms; set to 1 if expired + return (int64_t(ttl) >= 1000000 ? int64_t(ttl)/1000000 : 1); } else { - Count c; - frames.map_if(c, TypeFilter<CONTENT_BODY>()); - - SendContent f(out, maxFrameSize, c.getCount()); - frames.map_if(f, TypeFilter<CONTENT_BODY>()); + return 0; } } -void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) const -{ - sys::Mutex::ScopedLock l(lock); - Relay f(out); - frames.map_if(f, TypeFilter<HEADER_BODY>()); - //as frame (and pointer to body) has now been passed to handler, - //subsequent modifications should use a copy - copyHeaderOnWrite = true; -} - -// TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over -// 0-8/0-9 message differences. -MessageAdapter& Message::getAdapter() const +void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e) { - if (!adapter) { - if(frames.isA<MessageTransferBody>()) { - adapter = &TRANSFER; - } else { - const AMQMethodBody* method = frames.getMethod(); - if (!method) throw Exception("Can't adapt message with no method"); - else throw Exception(QPID_MSG("Can't adapt message based on " << *method)); + //TODO: this is still quite 0-10 specific... + uint64_t ttl; + if (getEncoding().getTtl(ttl)) { + if (e) { + // Use higher resolution time for the internal expiry calculation. + // Prevent overflow as a signed int64_t + Duration duration(std::min(ttl * TIME_MSEC, + (uint64_t) std::numeric_limits<int64_t>::max())); + expiration = AbsTime(e->getCurrentTime(), duration); + setExpiryPolicy(e); } } - return *adapter; } -uint64_t Message::contentSize() const +void Message::addAnnotation(const std::string& key, const qpid::types::Variant& value) { - return frames.getContentSize(); + annotations[key] = value; + annotationsChanged(); } -bool Message::isContentLoaded() const +void Message::annotationsChanged() { - return loaded; + if (persistentContext) { + persistentContext = persistentContext->merge(annotations); + persistentContext->setIngressCompletion(encoding); + } } - -namespace -{ -const std::string X_QPID_TRACE("x-qpid.trace"); +void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { + expiryPolicy = e; } -bool Message::isExcluded(const std::vector<std::string>& excludes) const +bool Message::hasExpired() const { - sys::Mutex::ScopedLock l(lock); - const FieldTable* headers = getApplicationHeaders(); - if (headers) { - std::string traceStr = headers->getAsString(X_QPID_TRACE); - if (traceStr.size()) { - std::vector<std::string> trace = split(traceStr, ", "); - - for (std::vector<std::string>::const_iterator i = excludes.begin(); i != excludes.end(); i++) { - for (std::vector<std::string>::const_iterator j = trace.begin(); j != trace.end(); j++) { - if (*i == *j) { - return true; - } - } - } - } - } - return false; + return expiryPolicy && expiryPolicy->hasExpired(*this); } -class CloneHeaderBody -{ -public: - void operator()(AMQFrame& f) - { - f.cloneBody(); - } -}; - -AMQHeaderBody* Message::getHeaderBody() +uint8_t Message::getPriority() const { - // expects lock to be held - if (copyHeaderOnWrite) { - CloneHeaderBody f; - frames.map_if(f, TypeFilter<HEADER_BODY>()); - copyHeaderOnWrite = false; - } - return frames.getHeaders(); + return getEncoding().getPriority(); } -void Message::addTraceId(const std::string& id) +bool Message::getIsManagementMessage() const { return isManagementMessage; } +void Message::setIsManagementMessage(bool b) { isManagementMessage = b; } +qpid::framing::SequenceNumber Message::getSequence() const { - sys::Mutex::ScopedLock l(lock); - if (isA<MessageTransferBody>()) { - FieldTable& headers = getModifiableProperties<MessageProperties>()->getApplicationHeaders(); - std::string trace = headers.getAsString(X_QPID_TRACE); - if (trace.empty()) { - headers.setString(X_QPID_TRACE, id); - } else if (trace.find(id) == std::string::npos) { - trace += ","; - trace += id; - headers.setString(X_QPID_TRACE, trace); - } - } + return sequence; } - -void Message::clearTrace() +void Message::setSequence(const qpid::framing::SequenceNumber& s) { - sys::Mutex::ScopedLock l(lock); - if (isA<MessageTransferBody>()) { - FieldTable& headers = getModifiableProperties<MessageProperties>()->getApplicationHeaders(); - std::string trace = headers.getAsString(X_QPID_TRACE); - if (!trace.empty()) { - headers.setString(X_QPID_TRACE, ""); - } - } + sequence = s; } -void Message::setTimestamp() +MessageState Message::getState() const { - sys::Mutex::ScopedLock l(lock); - DeliveryProperties* props = getModifiableProperties<DeliveryProperties>(); - time_t now = ::time(0); - props->setTimestamp(now); // AMQP-0.10: posix time_t - secs since Epoch + return state; } - -void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e) +void Message::setState(MessageState s) { - sys::Mutex::ScopedLock l(lock); - DeliveryProperties* props = getModifiableProperties<DeliveryProperties>(); - if (props->getTtl()) { - // AMQP requires setting the expiration property to be posix - // time_t in seconds. TTL is in milliseconds - if (!props->getExpiration()) { - //only set expiration in delivery properties if not already set - time_t now = ::time(0); - props->setExpiration(now + (props->getTtl()/1000)); - } - if (e) { - // Use higher resolution time for the internal expiry calculation. - // Prevent overflow as a signed int64_t - Duration ttl(std::min(props->getTtl() * TIME_MSEC, - (uint64_t) std::numeric_limits<int64_t>::max())); - expiration = AbsTime(e->getCurrentTime(), ttl); - setExpiryPolicy(e); - } - } + state = s; } -void Message::adjustTtl() +const qpid::types::Variant::Map& Message::getAnnotations() const { - sys::Mutex::ScopedLock l(lock); - DeliveryProperties* props = getModifiableProperties<DeliveryProperties>(); - if (props->getTtl()) { - if (expiration < FAR_FUTURE) { - sys::AbsTime current( - expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now()); - sys::Duration ttl(current, getExpiration()); - // convert from ns to ms; set to 1 if expired - props->setTtl(int64_t(ttl) >= 1000000 ? int64_t(ttl)/1000000 : 1); - } - } + return annotations; } -void Message::setRedelivered() +qpid::types::Variant Message::getAnnotation(const std::string& key) const { - sys::Mutex::ScopedLock l(lock); - getModifiableProperties<framing::DeliveryProperties>()->setRedelivered(true); + qpid::types::Variant::Map::const_iterator i = annotations.find(key); + if (i != annotations.end()) return i->second; + //FIXME: modify Encoding interface to allow retrieval of + //annotations of different types from the message data as received + //off the wire + return qpid::types::Variant(getEncoding().getAnnotationAsString(key)); } -void Message::insertCustomProperty(const std::string& key, int64_t value) +std::string Message::getUserId() const { - sys::Mutex::ScopedLock l(lock); - getModifiableProperties<MessageProperties>()->getApplicationHeaders().setInt64(key,value); + return encoding->getUserId(); } -void Message::insertCustomProperty(const std::string& key, const std::string& value) +Message::Encoding& Message::getEncoding() { - sys::Mutex::ScopedLock l(lock); - getModifiableProperties<MessageProperties>()->getApplicationHeaders().setString(key,value); + return *encoding; } - -void Message::removeCustomProperty(const std::string& key) +const Message::Encoding& Message::getEncoding() const { - sys::Mutex::ScopedLock l(lock); - getModifiableProperties<MessageProperties>()->getApplicationHeaders().erase(key); + return *encoding; } - -void Message::setExchange(const std::string& exchange) +Message::operator bool() const { - sys::Mutex::ScopedLock l(lock); - getModifiableProperties<DeliveryProperties>()->setExchange(exchange); + return encoding; } -void Message::clearApplicationHeadersFlag() +std::string Message::getContent() const { - sys::Mutex::ScopedLock l(lock); - getModifiableProperties<MessageProperties>()->clearApplicationHeadersFlag(); -} - -void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { - expiryPolicy = e; + return encoding->getContent(); } -bool Message::hasExpired() +std::string Message::getPropertyAsString(const std::string& key) const { - return expiryPolicy && expiryPolicy->hasExpired(*this); + return encoding->getPropertyAsString(key); } - namespace { -struct ScopedSet { - sys::Monitor& lock; - bool& flag; - ScopedSet(sys::Monitor& l, bool& f) : lock(l), flag(f) { - sys::Monitor::ScopedLock sl(lock); - flag = true; +class PropertyRetriever : public MapHandler +{ + public: + PropertyRetriever(const std::string& key) : name(key) {} + void handleVoid(const CharSequence&) {} + void handleUint8(const CharSequence& key, uint8_t value) { handle(key, value); } + void handleUint16(const CharSequence& key, uint16_t value) { handle(key, value); } + void handleUint32(const CharSequence& key, uint32_t value) { handle(key, value); } + void handleUint64(const CharSequence& key, uint64_t value) { handle(key, value); } + void handleInt8(const CharSequence& key, int8_t value) { handle(key, value); } + void handleInt16(const CharSequence& key, int16_t value) { handle(key, value); } + void handleInt32(const CharSequence& key, int32_t value) { handle(key, value); } + void handleInt64(const CharSequence& key, int64_t value) { handle(key, value); } + void handleFloat(const CharSequence& key, float value) { handle(key, value); } + void handleDouble(const CharSequence& key, double value) { handle(key, value); } + void handleString(const CharSequence& key, const CharSequence& value, const CharSequence& /*encoding*/) + { + if (matches(key)) result = std::string(value.data, value.size); } - ~ScopedSet(){ - sys::Monitor::ScopedLock sl(lock); - flag = false; - lock.notifyAll(); + qpid::types::Variant getResult() { return result; } + + private: + std::string name; + qpid::types::Variant result; + + bool matches(const CharSequence& key) + { + return ::strncmp(key.data, name.data(), std::min(key.size, name.size())) == 0; } -}; -} -void Message::allDequeuesComplete() { - ScopedSet ss(callbackLock, inCallback); - MessageCallback* cb = dequeueCallback; - if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); + template <typename T> void handle(const CharSequence& key, T value) + { + if (matches(key)) result = value; + } +}; } - -void Message::setDequeueCompleteCallback(MessageCallback& cb) { - sys::Mutex::ScopedLock l(callbackLock); - while (inCallback) callbackLock.wait(); - dequeueCallback = &cb; +qpid::types::Variant Message::getProperty(const std::string& key) const +{ + PropertyRetriever r(key); + encoding->processProperties(r); + return r.getResult(); } -void Message::resetDequeueCompleteCallback() { - sys::Mutex::ScopedLock l(callbackLock); - while (inCallback) callbackLock.wait(); - dequeueCallback = 0; +boost::intrusive_ptr<PersistableMessage> Message::getPersistentContext() const +{ + return persistentContext; } -uint8_t Message::getPriority() const { - sys::Mutex::ScopedLock l(lock); - return getAdapter().getPriority(frames); +void Message::processProperties(MapHandler& handler) const +{ + encoding->processProperties(handler); } -bool Message::getIsManagementMessage() const { return isManagementMessage; } -void Message::setIsManagementMessage(bool b) { isManagementMessage = b; } - }} // namespace qpid::broker |