diff options
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 175 |
1 files changed, 111 insertions, 64 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 049b583335..11c8c13e12 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -46,14 +46,11 @@ using std::string; namespace qpid { namespace broker { -Message::Message() : deliveryCount(-1), alreadyAcquired(false), publisher(0), expiration(FAR_FUTURE), timestamp(0), - isManagementMessage(false), replicationId(0) +Message::Message() : deliveryCount(-1), alreadyAcquired(false), replicationId(0) {} -Message::Message(boost::intrusive_ptr<Encoding> e, boost::intrusive_ptr<PersistableMessage> p) - : encoding(e), persistentContext(p), deliveryCount(-1), alreadyAcquired(false), publisher(0), - expiration(FAR_FUTURE), timestamp(0), isManagementMessage(false), - replicationId(0) +Message::Message(boost::intrusive_ptr<SharedState> e, boost::intrusive_ptr<PersistableMessage> p) + : sharedState(e), persistentContext(p), deliveryCount(-1), alreadyAcquired(false), replicationId(0) { if (persistentContext) persistentContext->setIngressCompletion(e); } @@ -78,7 +75,7 @@ uint64_t Message::getMessageSize() const boost::intrusive_ptr<AsyncCompletion> Message::getIngressCompletion() const { - return encoding; + return sharedState; } namespace @@ -106,35 +103,29 @@ void Message::addTraceId(const std::string& id) { std::string trace = getEncoding().getAnnotationAsString(X_QPID_TRACE); if (trace.empty()) { - annotations[X_QPID_TRACE] = id; + addAnnotation(X_QPID_TRACE, id); } else if (trace.find(id) == std::string::npos) { trace += ","; trace += id; - annotations[X_QPID_TRACE] = trace; + addAnnotation(X_QPID_TRACE, trace); } - annotationsChanged(); } void Message::clearTrace() { - annotations[X_QPID_TRACE] = std::string(); - annotationsChanged(); + addAnnotation(X_QPID_TRACE, std::string()); } uint64_t Message::getTimestamp() const { - return encoding ? encoding->getTimestamp() : 0; + return sharedState ? sharedState->getTimestamp() : 0; } uint64_t Message::getTtl() const { uint64_t ttl; - if (encoding->getTtl(ttl) && expiration < FAR_FUTURE) { - sys::AbsTime current( - expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now()); - sys::Duration ttl(current, getExpiration()); - // convert from ns to ms; set to 1 if expired - return (int64_t(ttl) >= 1000000 ? int64_t(ttl)/1000000 : 1); + if (getTtl(ttl, 1)/*set to 1 if expired*/) { + return ttl; } else { return 0; } @@ -142,35 +133,24 @@ uint64_t Message::getTtl() const bool Message::getTtl(uint64_t& ttl) const { - if (encoding->getTtl(ttl) && expiration < FAR_FUTURE) { - sys::Duration remaining(sys::AbsTime::now(), getExpiration()); - // convert from ns to ms; set to 0 if expired - ttl = (int64_t(remaining) >= 1000000 ? int64_t(remaining)/1000000 : 0); - return true; - } else { - return false; - } + return getTtl(ttl, 0); //set to 0 if expired } -void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e) +bool Message::getTtl(uint64_t& ttl, uint64_t expiredValue) const { - //TODO: this is still quite 0-10 specific... - uint64_t ttl; - if (getEncoding().getTtl(ttl)) { - if (e) { - // Use higher resolution time for the internal expiry calculation. - // Prevent overflow as a signed int64_t - Duration duration(std::min(ttl * TIME_MSEC, - (uint64_t) std::numeric_limits<int64_t>::max())); - expiration = AbsTime(e->getCurrentTime(), duration); - setExpiryPolicy(e); - } + if (sharedState->getTtl(ttl) && sharedState->getExpiration() < FAR_FUTURE) { + sys::Duration remaining = sharedState->getTimeToExpiration(); + // convert from ns to ms + ttl = (int64_t(remaining) >= 1000000 ? int64_t(remaining)/1000000 : expiredValue); + return true; + } else { + return false; } } void Message::addAnnotation(const std::string& key, const qpid::types::Variant& value) { - annotations[key] = value; + annotations.get()[key] = value; annotationsChanged(); } @@ -178,19 +158,15 @@ void Message::annotationsChanged() { if (persistentContext) { uint64_t id = persistentContext->getPersistenceId(); - persistentContext = persistentContext->merge(annotations); - persistentContext->setIngressCompletion(encoding); + persistentContext = persistentContext->merge(getAnnotations()); + persistentContext->setIngressCompletion(sharedState); persistentContext->setPersistenceId(id); } } -void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { - expiryPolicy = e; -} - bool Message::hasExpired() const { - return expiryPolicy && expiryPolicy->hasExpired(*this); + return sharedState->hasExpired(*this); } uint8_t Message::getPriority() const @@ -198,12 +174,12 @@ uint8_t Message::getPriority() const return getEncoding().getPriority(); } -bool Message::getIsManagementMessage() const { return isManagementMessage; } -void Message::setIsManagementMessage(bool b) { isManagementMessage = b; } +bool Message::getIsManagementMessage() const { return sharedState->getIsManagementMessage(); } -const Connection* Message::getPublisher() const { return publisher; } -void Message::setPublisher(const Connection& p) { publisher = &p; } -bool Message::isLocalTo(const OwnershipToken* token) const { return token && publisher && token->isLocal(publisher); } +const Connection* Message::getPublisher() const { return sharedState->getPublisher(); } +bool Message::isLocalTo(const OwnershipToken* token) const { + return token && sharedState->getPublisher() && token->isLocal(sharedState->getPublisher()); +} qpid::framing::SequenceNumber Message::getSequence() const @@ -223,16 +199,20 @@ void Message::setState(MessageState s) { state = s; } +namespace { +const qpid::types::Variant::Map EMPTY_MAP; +} const qpid::types::Variant::Map& Message::getAnnotations() const { - return annotations; + return annotations ? *annotations : EMPTY_MAP; } qpid::types::Variant Message::getAnnotation(const std::string& key) const { - qpid::types::Variant::Map::const_iterator i = annotations.find(key); - if (i != annotations.end()) return i->second; + const qpid::types::Variant::Map& a = getAnnotations(); + qpid::types::Variant::Map::const_iterator i = a.find(key); + if (i != a.end()) return i->second; //FIXME: modify Encoding interface to allow retrieval of //annotations of different types from the message data as received //off the wire @@ -241,30 +221,30 @@ qpid::types::Variant Message::getAnnotation(const std::string& key) const std::string Message::getUserId() const { - return encoding->getUserId(); + return sharedState->getUserId(); } -Message::Encoding& Message::getEncoding() +Message::SharedState& Message::getSharedState() { - return *encoding; + return *sharedState; } const Message::Encoding& Message::getEncoding() const { - return *encoding; + return *sharedState; } Message::operator bool() const { - return !!encoding; + return !!sharedState; } std::string Message::getContent() const { - return encoding->getContent(); + return sharedState->getContent(); } std::string Message::getPropertyAsString(const std::string& key) const { - return encoding->getPropertyAsString(key); + return sharedState->getPropertyAsString(key); } namespace { class PropertyRetriever : public MapHandler @@ -308,7 +288,7 @@ class PropertyRetriever : public MapHandler qpid::types::Variant Message::getProperty(const std::string& key) const { PropertyRetriever r(key); - encoding->processProperties(r); + sharedState->processProperties(r); return r.getResult(); } @@ -319,12 +299,79 @@ boost::intrusive_ptr<PersistableMessage> Message::getPersistentContext() const void Message::processProperties(MapHandler& handler) const { - encoding->processProperties(handler); + sharedState->processProperties(handler); } uint64_t Message::getReplicationId() const { return replicationId; } void Message::setReplicationId(framing::SequenceNumber id) { replicationId = id; } +sys::AbsTime Message::getExpiration() const +{ + return sharedState->getExpiration(); +} + +Message::SharedStateImpl::SharedStateImpl() : publisher(0), expiration(qpid::sys::FAR_FUTURE), isManagementMessage(false) {} + +const Connection* Message::SharedStateImpl::getPublisher() const +{ + return publisher; +} + +void Message::SharedStateImpl::setPublisher(const Connection* p) +{ + publisher = p; +} + +sys::AbsTime Message::SharedStateImpl::getExpiration() const +{ + return expiration; +} + +void Message::SharedStateImpl::setExpiration(sys::AbsTime e) +{ + expiration = e; +} + +sys::Duration Message::SharedStateImpl::getTimeToExpiration() const +{ + sys::AbsTime current(expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now()); + return sys::Duration(current, expiration); +} + +void Message::SharedStateImpl::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e) +{ + //TODO: this is still quite 0-10 specific... + uint64_t ttl; + if (getTtl(ttl)) { + if (e) { + // Use higher resolution time for the internal expiry calculation. + // Prevent overflow as a signed int64_t + Duration duration(std::min(ttl * TIME_MSEC, + (uint64_t) std::numeric_limits<int64_t>::max())); + expiration = AbsTime(e->getCurrentTime(), duration); + expiryPolicy = e; + } + } +} + +bool Message::SharedStateImpl::hasExpired(const Message& m) const +{ + return expiryPolicy && expiryPolicy->hasExpired(m); +} + +void Message::SharedStateImpl::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) +{ + expiryPolicy = e; +} + +bool Message::SharedStateImpl::getIsManagementMessage() const +{ + return isManagementMessage; +} +void Message::SharedStateImpl::setIsManagementMessage(bool b) +{ + isManagementMessage = b; +} }} // namespace qpid::broker |