summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/amqp/MessageEncoder.cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2013-02-28 16:14:30 +0000
committerKim van der Riet <kpvdr@apache.org>2013-02-28 16:14:30 +0000
commit9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch)
tree2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/amqp/MessageEncoder.cpp
parent172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff)
downloadqpid-python-9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919.tar.gz
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/amqp/MessageEncoder.cpp')
-rw-r--r--cpp/src/qpid/amqp/MessageEncoder.cpp313
1 files changed, 313 insertions, 0 deletions
diff --git a/cpp/src/qpid/amqp/MessageEncoder.cpp b/cpp/src/qpid/amqp/MessageEncoder.cpp
new file mode 100644
index 0000000000..852ad29635
--- /dev/null
+++ b/cpp/src/qpid/amqp/MessageEncoder.cpp
@@ -0,0 +1,313 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/amqp/MessageEncoder.h"
+#include "qpid/amqp/descriptors.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace amqp {
+
+namespace {
+size_t optimisable(const MessageEncoder::Header& msg)
+{
+ if (msg.getDeliveryCount()) return 5;
+ else if (msg.isFirstAcquirer()) return 4;
+ else if (msg.hasTtl()) return 3;
+ else if (msg.getPriority() != 4) return 2;
+ else if (msg.isDurable()) return 1;
+ else return 0;
+}
+
+size_t optimisable(const MessageEncoder::Properties& msg)
+{
+ if (msg.hasReplyToGroupId()) return 13;
+ else if (msg.hasGroupSequence()) return 12;
+ else if (msg.hasGroupId()) return 11;
+ else if (msg.hasCreationTime()) return 10;
+ else if (msg.hasAbsoluteExpiryTime()) return 9;
+ else if (msg.hasContentEncoding()) return 8;
+ else if (msg.hasContentType()) return 7;
+ else if (msg.hasCorrelationId()) return 6;
+ else if (msg.hasReplyTo()) return 5;
+ else if (msg.hasSubject()) return 4;
+ else if (msg.hasTo()) return 3;
+ else if (msg.hasUserId()) return 2;
+ else if (msg.hasMessageId()) return 1;
+ else return 0;
+}
+size_t encodedSize(const std::string& s)
+{
+ size_t total = s.size();
+ if (total > 255) total += 4;
+ else total += 1;
+ return total;
+}
+const std::string BINARY("binary");
+}
+
+void MessageEncoder::writeHeader(const Header& msg)
+{
+ size_t fields(optimise ? optimisable(msg) : 5);
+ if (fields) {
+ void* token = startList8(&qpid::amqp::message::HEADER);
+ writeBoolean(msg.isDurable());
+ if (fields > 1) writeUByte(msg.getPriority());
+
+ if (msg.getTtl()) writeUInt(msg.getTtl());
+ else if (fields > 2) writeNull();
+
+ if (msg.isFirstAcquirer()) writeBoolean(true);
+ else if (fields > 3) writeNull();
+
+ if (msg.getDeliveryCount()) writeUInt(msg.getDeliveryCount());
+ else if (fields > 4) writeNull();
+ endList8(fields, token);
+ }
+}
+
+
+void MessageEncoder::writeProperties(const Properties& msg)
+{
+ size_t fields(optimise ? optimisable(msg) : 13);
+ if (fields) {
+ void* token = startList32(&qpid::amqp::message::PROPERTIES);
+ if (msg.hasMessageId()) writeString(msg.getMessageId());
+ else writeNull();
+
+ if (msg.hasUserId()) writeBinary(msg.getUserId());
+ else if (fields > 1) writeNull();
+
+ if (msg.hasTo()) writeString(msg.getTo());
+ else if (fields > 2) writeNull();
+
+ if (msg.hasSubject()) writeString(msg.getSubject());
+ else if (fields > 3) writeNull();
+
+ if (msg.hasReplyTo()) writeString(msg.getReplyTo());
+ else if (fields > 4) writeNull();
+
+ if (msg.hasCorrelationId()) writeString(msg.getCorrelationId());
+ else if (fields > 5) writeNull();
+
+ if (msg.hasContentType()) writeSymbol(msg.getContentType());
+ else if (fields > 6) writeNull();
+
+ if (msg.hasContentEncoding()) writeSymbol(msg.getContentEncoding());
+ else if (fields > 7) writeNull();
+
+ if (msg.hasAbsoluteExpiryTime()) writeLong(msg.getAbsoluteExpiryTime());
+ else if (fields > 8) writeNull();
+
+ if (msg.hasCreationTime()) writeLong(msg.getCreationTime());
+ else if (fields > 9) writeNull();
+
+ if (msg.hasGroupId()) writeString(msg.getGroupId());
+ else if (fields > 10) writeNull();
+
+ if (msg.hasGroupSequence()) writeUInt(msg.getGroupSequence());
+ else if (fields > 11) writeNull();
+
+ if (msg.hasReplyToGroupId()) writeString(msg.getReplyToGroupId());
+ else if (fields > 12) writeNull();
+
+ endList32(fields, token);
+ }
+}
+
+void MessageEncoder::writeApplicationProperties(const qpid::types::Variant::Map& properties)
+{
+ writeApplicationProperties(properties, !optimise || properties.size()*2 > 255 || getEncodedSizeForElements(properties) > 255);
+}
+
+void MessageEncoder::writeApplicationProperties(const qpid::types::Variant::Map& properties, bool large)
+{
+ writeMap(properties, &qpid::amqp::message::APPLICATION_PROPERTIES, large);
+}
+
+void MessageEncoder::writeMap(const qpid::types::Variant::Map& properties, const Descriptor* d, bool large)
+{
+ void* token = large ? startMap32(d) : startMap8(d);
+ for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+ writeString(i->first);
+ switch (i->second.getType()) {
+ case qpid::types::VAR_MAP:
+ case qpid::types::VAR_LIST:
+ //not allowed (TODO: revise, only strictly true for application-properties) whereas this is now a more general method)
+ QPID_LOG(warning, "Ignoring nested map/list; not allowed in application-properties for AMQP 1.0");
+ case qpid::types::VAR_VOID:
+ writeNull();
+ break;
+ case qpid::types::VAR_BOOL:
+ writeBoolean(i->second);
+ break;
+ case qpid::types::VAR_UINT8:
+ writeUByte(i->second);
+ break;
+ case qpid::types::VAR_UINT16:
+ writeUShort(i->second);
+ break;
+ case qpid::types::VAR_UINT32:
+ writeUInt(i->second);
+ break;
+ case qpid::types::VAR_UINT64:
+ writeULong(i->second);
+ break;
+ case qpid::types::VAR_INT8:
+ writeByte(i->second);
+ break;
+ case qpid::types::VAR_INT16:
+ writeShort(i->second);
+ break;
+ case qpid::types::VAR_INT32:
+ writeInt(i->second);
+ break;
+ case qpid::types::VAR_INT64:
+ writeULong(i->second);
+ break;
+ case qpid::types::VAR_FLOAT:
+ writeFloat(i->second);
+ break;
+ case qpid::types::VAR_DOUBLE:
+ writeDouble(i->second);
+ break;
+ case qpid::types::VAR_STRING:
+ if (i->second.getEncoding() == BINARY) {
+ writeBinary(i->second);
+ } else {
+ writeString(i->second);
+ }
+ break;
+ case qpid::types::VAR_UUID:
+ writeUuid(i->second);
+ break;
+ }
+ }
+ if (large) endMap32(properties.size()*2, token);
+ else endMap8(properties.size()*2, token);
+}
+
+size_t MessageEncoder::getEncodedSize(const Header& h, const Properties& p, const qpid::types::Variant::Map& ap, const std::string& d)
+{
+ //NOTE: this does not take optional optimisation into account,
+ //i.e. it is a 'worst case' estimate for required buffer space
+ size_t total(0);
+
+ //header:
+ total += 3/*descriptor*/ + 1/*code*/ + 1/*size*/ + 1/*count*/ + 5/*codes for each field*/;
+ if (h.getPriority() != 4) total += 1;
+ if (h.getDeliveryCount()) total += 4;
+ if (h.hasTtl()) total += 4;
+ return total + getEncodedSize(p, ap, d);
+}
+
+size_t MessageEncoder::getEncodedSize(const Properties& p, const qpid::types::Variant::Map& ap, const std::string& d)
+{
+ //NOTE: this does not take optional optimisation into account,
+ //i.e. it is a 'worst case' estimate for required buffer space
+ size_t total(0);
+
+ //properties:
+ total += 3/*descriptor*/ + 1/*code*/ + 4/*size*/ + 4/*count*/ + 13/*codes for each field*/;
+ if (p.hasMessageId()) total += encodedSize(p.getMessageId());
+ if (p.hasUserId()) total += encodedSize(p.getUserId());
+ if (p.hasTo()) total += encodedSize(p.getTo());
+ if (p.hasSubject()) total += encodedSize(p.getSubject());
+ if (p.hasReplyTo()) total += encodedSize(p.getReplyTo());
+ if (p.hasCorrelationId()) total += encodedSize(p.getCorrelationId());
+ if (p.hasContentType()) total += encodedSize(p.getContentType());
+ if (p.hasContentEncoding()) total += encodedSize(p.getContentEncoding());
+ if (p.hasAbsoluteExpiryTime()) total += 8;
+ if (p.hasCreationTime()) total += 8;
+ if (p.hasGroupId()) total += encodedSize(p.getGroupId());
+ if (p.hasGroupSequence()) total += 4;
+ if (p.hasReplyToGroupId()) total += encodedSize(p.getReplyToGroupId());
+
+
+ //application-properties:
+ total += 3/*descriptor*/ + getEncodedSize(ap, true);
+ //body:
+ if (d.size()) total += 3/*descriptor*/ + 1/*code*/ + encodedSize(d);
+
+ return total;
+}
+
+size_t MessageEncoder::getEncodedSizeForElements(const qpid::types::Variant::Map& map)
+{
+ size_t total = 0;
+ for (qpid::types::Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
+ total += 1/*code*/ + encodedSize(i->first);
+
+ switch (i->second.getType()) {
+ case qpid::types::VAR_MAP:
+ case qpid::types::VAR_LIST:
+ case qpid::types::VAR_VOID:
+ case qpid::types::VAR_BOOL:
+ total += 1;
+ break;
+
+ case qpid::types::VAR_UINT8:
+ case qpid::types::VAR_INT8:
+ total += 2;
+ break;
+
+ case qpid::types::VAR_UINT16:
+ case qpid::types::VAR_INT16:
+ total += 3;
+ break;
+
+ case qpid::types::VAR_UINT32:
+ case qpid::types::VAR_INT32:
+ case qpid::types::VAR_FLOAT:
+ total += 5;
+ break;
+
+ case qpid::types::VAR_UINT64:
+ case qpid::types::VAR_INT64:
+ case qpid::types::VAR_DOUBLE:
+ total += 9;
+ break;
+
+ case qpid::types::VAR_UUID:
+ total += 17;
+ break;
+
+ case qpid::types::VAR_STRING:
+ total += 1/*code*/ + encodedSize(i->second);
+ break;
+ }
+ }
+ return total;
+}
+
+
+size_t MessageEncoder::getEncodedSize(const qpid::types::Variant::Map& map, bool alwaysUseLargeMap)
+{
+ size_t total = getEncodedSizeForElements(map);
+
+ //its not just the count that determines whether we can use a small map, but the aggregate size:
+ if (alwaysUseLargeMap || map.size()*2 > 255 || total > 255) total += 4/*size*/ + 4/*count*/;
+ else total += 1/*size*/ + 1/*count*/;
+
+ total += 1 /*code for map itself*/;
+
+ return total;
+}
+}} // namespace qpid::amqp