summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Message.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Message.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp368
1 files changed, 368 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
new file mode 100644
index 0000000000..250acf6b4e
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -0,0 +1,368 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Message.h"
+
+#include "qpid/amqp/CharSequence.h"
+#include "qpid/amqp/MapHandler.h"
+#include "qpid/broker/Connection.h"
+#include "qpid/broker/OwnershipToken.h"
+#include "qpid/management/ManagementObject.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/StringUtils.h"
+#include "qpid/log/Statement.h"
+#include "qpid/assert.h"
+
+#include <algorithm>
+#include <string.h>
+#include <time.h>
+
+using boost::intrusive_ptr;
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::sys::TIME_MSEC;
+using qpid::sys::FAR_FUTURE;
+using qpid::amqp::CharSequence;
+using qpid::amqp::MapHandler;
+using std::string;
+
+namespace qpid {
+namespace broker {
+
+Message::Message() : deliveryCount(-1), alreadyAcquired(false), replicationId(0), isReplicationIdSet(false)
+{}
+
+Message::Message(boost::intrusive_ptr<SharedState> e, boost::intrusive_ptr<PersistableMessage> p)
+ : sharedState(e), persistentContext(p), deliveryCount(-1), alreadyAcquired(false), replicationId(0), isReplicationIdSet(false)
+{
+ if (persistentContext) persistentContext->setIngressCompletion(e);
+}
+
+Message::~Message() {}
+
+
+std::string Message::getRoutingKey() const
+{
+ return getEncoding().getRoutingKey();
+}
+
+bool Message::isPersistent() const
+{
+ return getEncoding().isPersistent();
+}
+
+uint64_t Message::getMessageSize() const
+{
+ return getEncoding().getMessageSize();
+}
+
+boost::intrusive_ptr<AsyncCompletion> Message::getIngressCompletion() const
+{
+ return sharedState;
+}
+
+namespace
+{
+const std::string X_QPID_TRACE("x-qpid.trace");
+}
+
+bool Message::isExcluded(const std::vector<std::string>& excludes) const
+{
+ std::string traceStr = getEncoding().getAnnotationAsString(X_QPID_TRACE);
+ if (traceStr.size()) {
+ std::vector<std::string> trace = split(traceStr, ", ");
+ for (std::vector<std::string>::const_iterator i = excludes.begin(); i != excludes.end(); i++) {
+ for (std::vector<std::string>::const_iterator j = trace.begin(); j != trace.end(); j++) {
+ if (*i == *j) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+}
+
+void Message::addTraceId(const std::string& id)
+{
+ std::string trace = getEncoding().getAnnotationAsString(X_QPID_TRACE);
+ if (trace.empty()) {
+ addAnnotation(X_QPID_TRACE, id);
+ } else if (trace.find(id) == std::string::npos) {
+ trace += ",";
+ trace += id;
+ addAnnotation(X_QPID_TRACE, trace);
+ }
+}
+
+void Message::clearTrace()
+{
+ addAnnotation(X_QPID_TRACE, std::string());
+}
+
+uint64_t Message::getTimestamp() const
+{
+ return sharedState ? sharedState->getTimestamp() : 0;
+}
+
+uint64_t Message::getTtl() const
+{
+ uint64_t ttl;
+ if (getTtl(ttl, 1)/*set to 1 if expired*/) {
+ return ttl;
+ } else {
+ return 0;
+ }
+}
+
+bool Message::getTtl(uint64_t& ttl) const
+{
+ return getTtl(ttl, 0); //set to 0 if expired
+}
+
+bool Message::getTtl(uint64_t& ttl, uint64_t expiredValue) const
+{
+ if (sharedState->getTtl(ttl) && sharedState->getExpiration() < FAR_FUTURE) {
+ sys::Duration remaining = sharedState->getTimeToExpiration();
+ // convert from ns to ms
+ ttl = (int64_t(remaining) >= 1000000 ? int64_t(remaining)/1000000 : expiredValue);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void Message::addAnnotation(const std::string& key, const qpid::types::Variant& value)
+{
+ annotations.get()[key] = value;
+ annotationsChanged();
+}
+
+void Message::annotationsChanged()
+{
+ if (persistentContext) {
+ uint64_t id = persistentContext->getPersistenceId();
+ persistentContext = persistentContext->merge(getAnnotations());
+ persistentContext->setIngressCompletion(sharedState);
+ persistentContext->setPersistenceId(id);
+ }
+}
+
+uint8_t Message::getPriority() const
+{
+ return getEncoding().getPriority();
+}
+
+bool Message::getIsManagementMessage() const { return sharedState->getIsManagementMessage(); }
+
+const Connection* Message::getPublisher() const { return sharedState->getPublisher(); }
+bool Message::isLocalTo(const OwnershipToken* token) const {
+ return token && sharedState->getPublisher() && token->isLocal(sharedState->getPublisher());
+}
+
+
+qpid::framing::SequenceNumber Message::getSequence() const
+{
+ return sequence;
+}
+void Message::setSequence(const qpid::framing::SequenceNumber& s)
+{
+ sequence = s;
+}
+
+MessageState Message::getState() const
+{
+ return state;
+}
+void Message::setState(MessageState s)
+{
+ state = s;
+}
+namespace {
+const qpid::types::Variant::Map EMPTY_MAP;
+}
+
+const qpid::types::Variant::Map& Message::getAnnotations() const
+{
+ return annotations ? *annotations : EMPTY_MAP;
+}
+
+qpid::types::Variant Message::getAnnotation(const std::string& key) const
+{
+ const qpid::types::Variant::Map& a = getAnnotations();
+ qpid::types::Variant::Map::const_iterator i = a.find(key);
+ if (i != a.end()) return i->second;
+ //FIXME: modify Encoding interface to allow retrieval of
+ //annotations of different types from the message data as received
+ //off the wire
+ return qpid::types::Variant(getEncoding().getAnnotationAsString(key));
+}
+
+std::string Message::getUserId() const
+{
+ return sharedState->getUserId();
+}
+
+Message::SharedState& Message::getSharedState()
+{
+ return *sharedState;
+}
+const Message::Encoding& Message::getEncoding() const
+{
+ return *sharedState;
+}
+Message::operator bool() const
+{
+ return !!sharedState;
+}
+
+std::string Message::getContent() const
+{
+ return sharedState->getContent();
+}
+
+std::string Message::getPropertyAsString(const std::string& key) const
+{
+ return sharedState->getPropertyAsString(key);
+}
+namespace {
+class PropertyRetriever : public MapHandler
+{
+ public:
+ PropertyRetriever(const std::string& key) : name(key) {}
+ void handleVoid(const CharSequence&) {}
+ void handleBool(const CharSequence& key, bool value) { handle(key, value); }
+ void handleUint8(const CharSequence& key, uint8_t value) { handle(key, value); }
+ void handleUint16(const CharSequence& key, uint16_t value) { handle(key, value); }
+ void handleUint32(const CharSequence& key, uint32_t value) { handle(key, value); }
+ void handleUint64(const CharSequence& key, uint64_t value) { handle(key, value); }
+ void handleInt8(const CharSequence& key, int8_t value) { handle(key, value); }
+ void handleInt16(const CharSequence& key, int16_t value) { handle(key, value); }
+ void handleInt32(const CharSequence& key, int32_t value) { handle(key, value); }
+ void handleInt64(const CharSequence& key, int64_t value) { handle(key, value); }
+ void handleFloat(const CharSequence& key, float value) { handle(key, value); }
+ void handleDouble(const CharSequence& key, double value) { handle(key, value); }
+ void handleString(const CharSequence& key, const CharSequence& value, const CharSequence& /*encoding*/)
+ {
+ if (matches(key)) result = std::string(value.data, value.size);
+ }
+ qpid::types::Variant getResult() { return result; }
+
+ private:
+ std::string name;
+ qpid::types::Variant result;
+
+ bool matches(const CharSequence& key)
+ {
+ return name.size()==key.size &&
+ ::strncmp(key.data, name.data(), key.size) == 0;
+ }
+
+ template <typename T> void handle(const CharSequence& key, T value)
+ {
+ if (matches(key)) result = value;
+ }
+};
+}
+qpid::types::Variant Message::getProperty(const std::string& key) const
+{
+ PropertyRetriever r(key);
+ sharedState->processProperties(r);
+ return r.getResult();
+}
+
+boost::intrusive_ptr<PersistableMessage> Message::getPersistentContext() const
+{
+ return persistentContext;
+}
+
+void Message::processProperties(MapHandler& handler) const
+{
+ sharedState->processProperties(handler);
+}
+
+bool Message::hasReplicationId() const {
+ return isReplicationIdSet;
+}
+
+uint64_t Message::getReplicationId() const {
+ return replicationId;
+}
+
+void Message::setReplicationId(framing::SequenceNumber id) {
+ replicationId = id;
+ isReplicationIdSet = true;
+}
+
+sys::AbsTime Message::getExpiration() const
+{
+ return sharedState->getExpiration();
+}
+
+Message::SharedStateImpl::SharedStateImpl() : publisher(0), expiration(qpid::sys::FAR_FUTURE), isManagementMessage(false) {}
+
+const Connection* Message::SharedStateImpl::getPublisher() const
+{
+ return publisher;
+}
+
+void Message::SharedStateImpl::setPublisher(const Connection* p)
+{
+ publisher = p;
+}
+
+sys::AbsTime Message::SharedStateImpl::getExpiration() const
+{
+ return expiration;
+}
+
+void Message::SharedStateImpl::setExpiration(sys::AbsTime e)
+{
+ expiration = e;
+}
+
+sys::Duration Message::SharedStateImpl::getTimeToExpiration() const
+{
+ return sys::Duration(sys::AbsTime::now(), expiration);
+}
+
+void Message::SharedStateImpl::computeExpiration()
+{
+ //TODO: this is still quite 0-10 specific...
+ uint64_t ttl;
+ if (getTtl(ttl)) {
+ // Use higher resolution time for the internal expiry calculation.
+ // Prevent overflow as a signed int64_t
+ Duration duration(std::min(ttl * TIME_MSEC,
+ (uint64_t) std::numeric_limits<int64_t>::max()));
+ expiration = AbsTime(sys::AbsTime::now(), duration);
+ }
+}
+
+bool Message::SharedStateImpl::getIsManagementMessage() const
+{
+ return isManagementMessage;
+}
+void Message::SharedStateImpl::setIsManagementMessage(bool b)
+{
+ isManagementMessage = b;
+}
+
+}} // namespace qpid::broker