summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/messaging/amqp/EncodedMessage.cpp')
-rw-r--r--cpp/src/qpid/messaging/amqp/EncodedMessage.cpp263
1 files changed, 263 insertions, 0 deletions
diff --git a/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp b/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp
new file mode 100644
index 0000000000..54de3eae45
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp
@@ -0,0 +1,263 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/amqp/EncodedMessage.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/MessageImpl.h"
+#include "qpid/amqp/Decoder.h"
+#include <boost/lexical_cast.hpp>
+#include <string.h>
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+using namespace qpid::amqp;
+
+EncodedMessage::EncodedMessage(size_t s) : size(s), data(size ? new char[size] : 0)
+{
+ init();
+}
+
+EncodedMessage::EncodedMessage() : size(0), data(0)
+{
+ init();
+}
+
+EncodedMessage::EncodedMessage(const EncodedMessage& other) : size(other.size), data(size ? new char[size] : 0)
+{
+ init();
+}
+
+void EncodedMessage::init()
+{
+ //init all CharSequence members
+ deliveryAnnotations.init();
+ messageAnnotations.init();
+ userId.init();
+ to.init();
+ subject.init();
+ replyTo.init();
+ contentType.init();
+ contentEncoding.init();
+ groupId.init();
+ replyToGroupId.init();
+ applicationProperties.init();
+ body.init();
+ footer.init();
+}
+
+EncodedMessage::~EncodedMessage()
+{
+ delete[] data;
+}
+
+size_t EncodedMessage::getSize() const
+{
+ return size;
+}
+void EncodedMessage::trim(size_t t)
+{
+ size = t;
+}
+void EncodedMessage::resize(size_t s)
+{
+ delete[] data;
+ size = s;
+ data = new char[size];
+}
+
+char* EncodedMessage::getData()
+{
+ return data;
+}
+const char* EncodedMessage::getData() const
+{
+ return data;
+}
+
+void EncodedMessage::init(qpid::messaging::MessageImpl& impl)
+{
+ //initial scan of raw data
+ qpid::amqp::Decoder decoder(data, size);
+ InitialScan reader(*this, impl);
+ decoder.read(reader);
+ bareMessage = reader.getBareMessage();
+ if (bareMessage.data && !bareMessage.size) {
+ bareMessage.size = (data + size) - bareMessage.data;
+ }
+
+}
+void EncodedMessage::populate(qpid::types::Variant::Map& map) const
+{
+ //decode application properties
+ if (applicationProperties) {
+ qpid::amqp::Decoder decoder(applicationProperties.data, applicationProperties.size);
+ decoder.readMap(map);
+ }
+ //add in 'x-amqp-' prefixed values
+ if (!!firstAcquirer) {
+ map["x-amqp-first-acquirer"] = firstAcquirer.get();
+ }
+ if (!!deliveryCount) {
+ map["x-amqp-delivery-count"] = deliveryCount.get();
+ }
+ if (to) {
+ map["x-amqp-delivery-count"] = to.str();
+ }
+ if (!!absoluteExpiryTime) {
+ map["x-amqp-absolute-expiry-time"] = absoluteExpiryTime.get();
+ }
+ if (!!creationTime) {
+ map["x-amqp-creation-time"] = creationTime.get();
+ }
+ if (groupId) {
+ map["x-amqp-group-id"] = groupId.str();
+ }
+ if (!!groupSequence) {
+ map["x-amqp-qroup-sequence"] = groupSequence.get();
+ }
+ if (replyToGroupId) {
+ map["x-amqp-reply-to-group-id"] = replyToGroupId.str();
+ }
+ //add in any annotations
+ if (deliveryAnnotations) {
+ qpid::types::Variant::Map& annotations = map["x-amqp-delivery-annotations"].asMap();
+ qpid::amqp::Decoder decoder(deliveryAnnotations.data, deliveryAnnotations.size);
+ decoder.readMap(annotations);
+ }
+ if (messageAnnotations) {
+ qpid::types::Variant::Map& annotations = map["x-amqp-message-annotations"].asMap();
+ qpid::amqp::Decoder decoder(messageAnnotations.data, messageAnnotations.size);
+ decoder.readMap(annotations);
+ }
+}
+qpid::amqp::CharSequence EncodedMessage::getBareMessage() const
+{
+ return bareMessage;
+}
+
+void EncodedMessage::getReplyTo(qpid::messaging::Address& a) const
+{
+ a = qpid::messaging::Address(replyTo.str());
+}
+void EncodedMessage::getSubject(std::string& s) const
+{
+ s.assign(subject.data, subject.size);
+}
+void EncodedMessage::getContentType(std::string& s) const
+{
+ s.assign(contentType.data, contentType.size);
+}
+void EncodedMessage::getUserId(std::string& s) const
+{
+ s.assign(userId.data, userId.size);
+}
+void EncodedMessage::getMessageId(std::string& s) const
+{
+ messageId.assign(s);
+}
+void EncodedMessage::getCorrelationId(std::string& s) const
+{
+ correlationId.assign(s);
+}
+void EncodedMessage::getBody(std::string& s) const
+{
+ s.assign(body.data, body.size);
+}
+
+qpid::amqp::CharSequence EncodedMessage::getBody() const
+{
+ return body;
+}
+
+bool EncodedMessage::hasHeaderChanged(const qpid::messaging::MessageImpl& msg) const
+{
+ if (!durable) {
+ if (msg.isDurable()) return true;
+ } else {
+ if (durable.get() != msg.isDurable()) return true;
+ }
+
+ if (!priority) {
+ if (msg.getPriority() != 4) return true;
+ } else {
+ if (priority.get() != msg.getPriority()) return true;
+ }
+
+ if (msg.getTtl() && (!ttl || msg.getTtl() != ttl.get())) {
+ return true;
+ }
+
+ //first-acquirer can't be changed via Message interface as yet
+
+ if (msg.isRedelivered() && (!deliveryCount || deliveryCount.get() == 0)) {
+ return true;
+ }
+
+ return false;
+}
+
+
+EncodedMessage::InitialScan::InitialScan(EncodedMessage& e, qpid::messaging::MessageImpl& m) : em(e), mi(m)
+{
+ //set up defaults as needed:
+ mi.setPriority(4);
+}
+//header:
+void EncodedMessage::InitialScan::onDurable(bool b) { mi.setDurable(b); em.durable = b; }
+void EncodedMessage::InitialScan::onPriority(uint8_t i) { mi.setPriority(i); em.priority = i; }
+void EncodedMessage::InitialScan::onTtl(uint32_t i) { mi.setTtl(i); em.ttl = i; }
+void EncodedMessage::InitialScan::onFirstAcquirer(bool b) { em.firstAcquirer = b; }
+void EncodedMessage::InitialScan::onDeliveryCount(uint32_t i)
+{
+ mi.setRedelivered(i);
+ em.deliveryCount = i;
+}
+
+//properties:
+void EncodedMessage::InitialScan::onMessageId(uint64_t v) { em.messageId.set(v); }
+void EncodedMessage::InitialScan::onMessageId(const qpid::amqp::CharSequence& v, qpid::types::VariantType t) { em.messageId.set(v, t); }
+void EncodedMessage::InitialScan::onUserId(const qpid::amqp::CharSequence& v) { em.userId = v; }
+void EncodedMessage::InitialScan::onTo(const qpid::amqp::CharSequence& v) { em.to = v; }
+void EncodedMessage::InitialScan::onSubject(const qpid::amqp::CharSequence& v) { em.subject = v; }
+void EncodedMessage::InitialScan::onReplyTo(const qpid::amqp::CharSequence& v) { em.replyTo = v;}
+void EncodedMessage::InitialScan::onCorrelationId(uint64_t v) { em.correlationId.set(v); }
+void EncodedMessage::InitialScan::onCorrelationId(const qpid::amqp::CharSequence& v, qpid::types::VariantType t) { em.correlationId.set(v, t); }
+void EncodedMessage::InitialScan::onContentType(const qpid::amqp::CharSequence& v) { em.contentType = v; }
+void EncodedMessage::InitialScan::onContentEncoding(const qpid::amqp::CharSequence& v) { em.contentEncoding = v; }
+void EncodedMessage::InitialScan::onAbsoluteExpiryTime(int64_t i) { em.absoluteExpiryTime = i; }
+void EncodedMessage::InitialScan::onCreationTime(int64_t i) { em.creationTime = i; }
+void EncodedMessage::InitialScan::onGroupId(const qpid::amqp::CharSequence& v) { em.groupId = v; }
+void EncodedMessage::InitialScan::onGroupSequence(uint32_t i) { em.groupSequence = i; }
+void EncodedMessage::InitialScan::onReplyToGroupId(const qpid::amqp::CharSequence& v) { em.replyToGroupId = v; }
+
+void EncodedMessage::InitialScan::onApplicationProperties(const qpid::amqp::CharSequence& v) { em.applicationProperties = v; }
+void EncodedMessage::InitialScan::onDeliveryAnnotations(const qpid::amqp::CharSequence& v) { em.deliveryAnnotations = v; }
+void EncodedMessage::InitialScan::onMessageAnnotations(const qpid::amqp::CharSequence& v) { em.messageAnnotations = v; }
+void EncodedMessage::InitialScan::onBody(const qpid::amqp::CharSequence& v, const qpid::amqp::Descriptor&)
+{
+ //TODO: how to communicate the type, i.e. descriptor?
+ em.body = v;
+}
+void EncodedMessage::InitialScan::onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&) {}
+void EncodedMessage::InitialScan::onFooter(const qpid::amqp::CharSequence& v) { em.footer = v; }
+
+}}} // namespace qpid::messaging::amqp