summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Message.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
-rw-r--r--cpp/src/qpid/broker/Message.cpp175
1 files changed, 111 insertions, 64 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 049b583335..11c8c13e12 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -46,14 +46,11 @@ using std::string;
namespace qpid {
namespace broker {
-Message::Message() : deliveryCount(-1), alreadyAcquired(false), publisher(0), expiration(FAR_FUTURE), timestamp(0),
- isManagementMessage(false), replicationId(0)
+Message::Message() : deliveryCount(-1), alreadyAcquired(false), replicationId(0)
{}
-Message::Message(boost::intrusive_ptr<Encoding> e, boost::intrusive_ptr<PersistableMessage> p)
- : encoding(e), persistentContext(p), deliveryCount(-1), alreadyAcquired(false), publisher(0),
- expiration(FAR_FUTURE), timestamp(0), isManagementMessage(false),
- replicationId(0)
+Message::Message(boost::intrusive_ptr<SharedState> e, boost::intrusive_ptr<PersistableMessage> p)
+ : sharedState(e), persistentContext(p), deliveryCount(-1), alreadyAcquired(false), replicationId(0)
{
if (persistentContext) persistentContext->setIngressCompletion(e);
}
@@ -78,7 +75,7 @@ uint64_t Message::getMessageSize() const
boost::intrusive_ptr<AsyncCompletion> Message::getIngressCompletion() const
{
- return encoding;
+ return sharedState;
}
namespace
@@ -106,35 +103,29 @@ void Message::addTraceId(const std::string& id)
{
std::string trace = getEncoding().getAnnotationAsString(X_QPID_TRACE);
if (trace.empty()) {
- annotations[X_QPID_TRACE] = id;
+ addAnnotation(X_QPID_TRACE, id);
} else if (trace.find(id) == std::string::npos) {
trace += ",";
trace += id;
- annotations[X_QPID_TRACE] = trace;
+ addAnnotation(X_QPID_TRACE, trace);
}
- annotationsChanged();
}
void Message::clearTrace()
{
- annotations[X_QPID_TRACE] = std::string();
- annotationsChanged();
+ addAnnotation(X_QPID_TRACE, std::string());
}
uint64_t Message::getTimestamp() const
{
- return encoding ? encoding->getTimestamp() : 0;
+ return sharedState ? sharedState->getTimestamp() : 0;
}
uint64_t Message::getTtl() const
{
uint64_t ttl;
- if (encoding->getTtl(ttl) && expiration < FAR_FUTURE) {
- sys::AbsTime current(
- expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now());
- sys::Duration ttl(current, getExpiration());
- // convert from ns to ms; set to 1 if expired
- return (int64_t(ttl) >= 1000000 ? int64_t(ttl)/1000000 : 1);
+ if (getTtl(ttl, 1)/*set to 1 if expired*/) {
+ return ttl;
} else {
return 0;
}
@@ -142,35 +133,24 @@ uint64_t Message::getTtl() const
bool Message::getTtl(uint64_t& ttl) const
{
- if (encoding->getTtl(ttl) && expiration < FAR_FUTURE) {
- sys::Duration remaining(sys::AbsTime::now(), getExpiration());
- // convert from ns to ms; set to 0 if expired
- ttl = (int64_t(remaining) >= 1000000 ? int64_t(remaining)/1000000 : 0);
- return true;
- } else {
- return false;
- }
+ return getTtl(ttl, 0); //set to 0 if expired
}
-void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e)
+bool Message::getTtl(uint64_t& ttl, uint64_t expiredValue) const
{
- //TODO: this is still quite 0-10 specific...
- uint64_t ttl;
- if (getEncoding().getTtl(ttl)) {
- if (e) {
- // Use higher resolution time for the internal expiry calculation.
- // Prevent overflow as a signed int64_t
- Duration duration(std::min(ttl * TIME_MSEC,
- (uint64_t) std::numeric_limits<int64_t>::max()));
- expiration = AbsTime(e->getCurrentTime(), duration);
- setExpiryPolicy(e);
- }
+ if (sharedState->getTtl(ttl) && sharedState->getExpiration() < FAR_FUTURE) {
+ sys::Duration remaining = sharedState->getTimeToExpiration();
+ // convert from ns to ms
+ ttl = (int64_t(remaining) >= 1000000 ? int64_t(remaining)/1000000 : expiredValue);
+ return true;
+ } else {
+ return false;
}
}
void Message::addAnnotation(const std::string& key, const qpid::types::Variant& value)
{
- annotations[key] = value;
+ annotations.get()[key] = value;
annotationsChanged();
}
@@ -178,19 +158,15 @@ void Message::annotationsChanged()
{
if (persistentContext) {
uint64_t id = persistentContext->getPersistenceId();
- persistentContext = persistentContext->merge(annotations);
- persistentContext->setIngressCompletion(encoding);
+ persistentContext = persistentContext->merge(getAnnotations());
+ persistentContext->setIngressCompletion(sharedState);
persistentContext->setPersistenceId(id);
}
}
-void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
- expiryPolicy = e;
-}
-
bool Message::hasExpired() const
{
- return expiryPolicy && expiryPolicy->hasExpired(*this);
+ return sharedState->hasExpired(*this);
}
uint8_t Message::getPriority() const
@@ -198,12 +174,12 @@ uint8_t Message::getPriority() const
return getEncoding().getPriority();
}
-bool Message::getIsManagementMessage() const { return isManagementMessage; }
-void Message::setIsManagementMessage(bool b) { isManagementMessage = b; }
+bool Message::getIsManagementMessage() const { return sharedState->getIsManagementMessage(); }
-const Connection* Message::getPublisher() const { return publisher; }
-void Message::setPublisher(const Connection& p) { publisher = &p; }
-bool Message::isLocalTo(const OwnershipToken* token) const { return token && publisher && token->isLocal(publisher); }
+const Connection* Message::getPublisher() const { return sharedState->getPublisher(); }
+bool Message::isLocalTo(const OwnershipToken* token) const {
+ return token && sharedState->getPublisher() && token->isLocal(sharedState->getPublisher());
+}
qpid::framing::SequenceNumber Message::getSequence() const
@@ -223,16 +199,20 @@ void Message::setState(MessageState s)
{
state = s;
}
+namespace {
+const qpid::types::Variant::Map EMPTY_MAP;
+}
const qpid::types::Variant::Map& Message::getAnnotations() const
{
- return annotations;
+ return annotations ? *annotations : EMPTY_MAP;
}
qpid::types::Variant Message::getAnnotation(const std::string& key) const
{
- qpid::types::Variant::Map::const_iterator i = annotations.find(key);
- if (i != annotations.end()) return i->second;
+ const qpid::types::Variant::Map& a = getAnnotations();
+ qpid::types::Variant::Map::const_iterator i = a.find(key);
+ if (i != a.end()) return i->second;
//FIXME: modify Encoding interface to allow retrieval of
//annotations of different types from the message data as received
//off the wire
@@ -241,30 +221,30 @@ qpid::types::Variant Message::getAnnotation(const std::string& key) const
std::string Message::getUserId() const
{
- return encoding->getUserId();
+ return sharedState->getUserId();
}
-Message::Encoding& Message::getEncoding()
+Message::SharedState& Message::getSharedState()
{
- return *encoding;
+ return *sharedState;
}
const Message::Encoding& Message::getEncoding() const
{
- return *encoding;
+ return *sharedState;
}
Message::operator bool() const
{
- return !!encoding;
+ return !!sharedState;
}
std::string Message::getContent() const
{
- return encoding->getContent();
+ return sharedState->getContent();
}
std::string Message::getPropertyAsString(const std::string& key) const
{
- return encoding->getPropertyAsString(key);
+ return sharedState->getPropertyAsString(key);
}
namespace {
class PropertyRetriever : public MapHandler
@@ -308,7 +288,7 @@ class PropertyRetriever : public MapHandler
qpid::types::Variant Message::getProperty(const std::string& key) const
{
PropertyRetriever r(key);
- encoding->processProperties(r);
+ sharedState->processProperties(r);
return r.getResult();
}
@@ -319,12 +299,79 @@ boost::intrusive_ptr<PersistableMessage> Message::getPersistentContext() const
void Message::processProperties(MapHandler& handler) const
{
- encoding->processProperties(handler);
+ sharedState->processProperties(handler);
}
uint64_t Message::getReplicationId() const { return replicationId; }
void Message::setReplicationId(framing::SequenceNumber id) { replicationId = id; }
+sys::AbsTime Message::getExpiration() const
+{
+ return sharedState->getExpiration();
+}
+
+Message::SharedStateImpl::SharedStateImpl() : publisher(0), expiration(qpid::sys::FAR_FUTURE), isManagementMessage(false) {}
+
+const Connection* Message::SharedStateImpl::getPublisher() const
+{
+ return publisher;
+}
+
+void Message::SharedStateImpl::setPublisher(const Connection* p)
+{
+ publisher = p;
+}
+
+sys::AbsTime Message::SharedStateImpl::getExpiration() const
+{
+ return expiration;
+}
+
+void Message::SharedStateImpl::setExpiration(sys::AbsTime e)
+{
+ expiration = e;
+}
+
+sys::Duration Message::SharedStateImpl::getTimeToExpiration() const
+{
+ sys::AbsTime current(expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now());
+ return sys::Duration(current, expiration);
+}
+
+void Message::SharedStateImpl::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e)
+{
+ //TODO: this is still quite 0-10 specific...
+ uint64_t ttl;
+ if (getTtl(ttl)) {
+ if (e) {
+ // Use higher resolution time for the internal expiry calculation.
+ // Prevent overflow as a signed int64_t
+ Duration duration(std::min(ttl * TIME_MSEC,
+ (uint64_t) std::numeric_limits<int64_t>::max()));
+ expiration = AbsTime(e->getCurrentTime(), duration);
+ expiryPolicy = e;
+ }
+ }
+}
+
+bool Message::SharedStateImpl::hasExpired(const Message& m) const
+{
+ return expiryPolicy && expiryPolicy->hasExpired(m);
+}
+
+void Message::SharedStateImpl::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e)
+{
+ expiryPolicy = e;
+}
+
+bool Message::SharedStateImpl::getIsManagementMessage() const
+{
+ return isManagementMessage;
+}
+void Message::SharedStateImpl::setIsManagementMessage(bool b)
+{
+ isManagementMessage = b;
+}
}} // namespace qpid::broker