summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/amqp/MessageEncoder.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/amqp/MessageEncoder.cpp')
-rw-r--r--qpid/cpp/src/qpid/amqp/MessageEncoder.cpp312
1 files changed, 312 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/amqp/MessageEncoder.cpp b/qpid/cpp/src/qpid/amqp/MessageEncoder.cpp
new file mode 100644
index 0000000000..71e7e75111
--- /dev/null
+++ b/qpid/cpp/src/qpid/amqp/MessageEncoder.cpp
@@ -0,0 +1,312 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/amqp/MessageEncoder.h"
+#include "qpid/amqp/MapEncoder.h"
+#include "qpid/amqp/MapSizeCalculator.h"
+#include "qpid/amqp/descriptors.h"
+#include "qpid/log/Statement.h"
+#include <assert.h>
+
+namespace qpid {
+namespace amqp {
+
+namespace {
+size_t optimisable(const MessageEncoder::Header& msg)
+{
+ if (msg.getDeliveryCount()) return 5;
+ else if (msg.isFirstAcquirer()) return 4;
+ else if (msg.hasTtl()) return 3;
+ else if (msg.getPriority() != 4) return 2;
+ else if (msg.isDurable()) return 1;
+ else return 0;
+}
+
+size_t optimisable(const MessageEncoder::Properties& msg)
+{
+ if (msg.hasReplyToGroupId()) return 13;
+ else if (msg.hasGroupSequence()) return 12;
+ else if (msg.hasGroupId()) return 11;
+ else if (msg.hasCreationTime()) return 10;
+ else if (msg.hasAbsoluteExpiryTime()) return 9;
+ else if (msg.hasContentEncoding()) return 8;
+ else if (msg.hasContentType()) return 7;
+ else if (msg.hasCorrelationId()) return 6;
+ else if (msg.hasReplyTo()) return 5;
+ else if (msg.hasSubject()) return 4;
+ else if (msg.hasTo()) return 3;
+ else if (msg.hasUserId()) return 2;
+ else if (msg.hasMessageId()) return 1;
+ else return 0;
+}
+size_t encodedSize(const std::string& s)
+{
+ size_t total = s.size();
+ if (total > 255) total += 4;
+ else total += 1;
+ return total;
+}
+const std::string BINARY("binary");
+}
+
+void MessageEncoder::writeHeader(const Header& msg)
+{
+ size_t fields(optimise ? optimisable(msg) : 5);
+ if (fields) {
+ void* token = startList8(&qpid::amqp::message::HEADER);
+ writeBoolean(msg.isDurable());
+ if (fields > 1) writeUByte(msg.getPriority());
+
+ if (msg.getTtl()) writeUInt(msg.getTtl());
+ else if (fields > 2) writeNull();
+
+ if (msg.isFirstAcquirer()) writeBoolean(true);
+ else if (fields > 3) writeNull();
+
+ if (msg.getDeliveryCount()) writeUInt(msg.getDeliveryCount());
+ else if (fields > 4) writeNull();
+ endList8(fields, token);
+ }
+}
+
+
+void MessageEncoder::writeProperties(const Properties& msg)
+{
+ size_t fields(optimise ? optimisable(msg) : 13);
+ if (fields) {
+ void* token = startList32(&qpid::amqp::message::PROPERTIES);
+ if (msg.hasMessageId()) writeString(msg.getMessageId());
+ else writeNull();
+
+ if (msg.hasUserId()) writeBinary(msg.getUserId());
+ else if (fields > 1) writeNull();
+
+ if (msg.hasTo()) writeString(msg.getTo());
+ else if (fields > 2) writeNull();
+
+ if (msg.hasSubject()) writeString(msg.getSubject());
+ else if (fields > 3) writeNull();
+
+ if (msg.hasReplyTo()) writeString(msg.getReplyTo());
+ else if (fields > 4) writeNull();
+
+ if (msg.hasCorrelationId()) writeString(msg.getCorrelationId());
+ else if (fields > 5) writeNull();
+
+ if (msg.hasContentType()) writeSymbol(msg.getContentType());
+ else if (fields > 6) writeNull();
+
+ if (msg.hasContentEncoding()) writeSymbol(msg.getContentEncoding());
+ else if (fields > 7) writeNull();
+
+ if (msg.hasAbsoluteExpiryTime()) writeTimestamp(msg.getAbsoluteExpiryTime());
+ else if (fields > 8) writeNull();
+
+ if (msg.hasCreationTime()) writeTimestamp(msg.getCreationTime());
+ else if (fields > 9) writeNull();
+
+ if (msg.hasGroupId()) writeString(msg.getGroupId());
+ else if (fields > 10) writeNull();
+
+ if (msg.hasGroupSequence()) writeUInt(msg.getGroupSequence());
+ else if (fields > 11) writeNull();
+
+ if (msg.hasReplyToGroupId()) writeString(msg.getReplyToGroupId());
+ else if (fields > 12) writeNull();
+
+ endList32(fields, token);
+ }
+}
+
+void MessageEncoder::writeApplicationProperties(const ApplicationProperties& properties)
+{
+ MapSizeCalculator calc;
+ properties.handle(calc);
+ size_t required = calc.getTotalSizeRequired(&qpid::amqp::message::APPLICATION_PROPERTIES);
+ assert(required <= getSize() - getPosition());
+ MapEncoder encoder(skip(required), required);
+ encoder.writeMetaData(calc.getSize(), calc.getCount()*2, &qpid::amqp::message::APPLICATION_PROPERTIES);
+ properties.handle(encoder);
+}
+
+void MessageEncoder::writeApplicationProperties(const qpid::types::Variant::Map& properties)
+{
+ writeApplicationProperties(properties, !optimise || properties.size()*2 > 255 || getEncodedSizeForElements(properties) > 255);
+}
+
+void MessageEncoder::writeApplicationProperties(const qpid::types::Variant::Map& properties, bool large)
+{
+ writeMap(properties, &qpid::amqp::message::APPLICATION_PROPERTIES, large);
+}
+
+size_t MessageEncoder::getEncodedSize(const Header& h, const Properties& p, const qpid::types::Variant::Map& ap, const std::string& d)
+{
+ return getEncodedSize(h) + getEncodedSize(p, ap, d);
+}
+
+size_t MessageEncoder::getEncodedSize(const Header& h, const Properties& p, const ApplicationProperties& ap, const std::string& d)
+{
+ return getEncodedSize(h) + getEncodedSize(p) + getEncodedSize(ap) + getEncodedSizeForContent(d);
+}
+
+size_t MessageEncoder::getEncodedSize(const Properties& p, const qpid::types::Variant::Map& ap, const std::string& d)
+{
+ size_t total(getEncodedSize(p));
+ //application-properties:
+ total += 3/*descriptor*/ + getEncodedSize(ap, true);
+ //body:
+ if (d.size()) total += 3/*descriptor*/ + 1/*code*/ + encodedSize(d);
+
+ return total;
+}
+
+size_t MessageEncoder::getEncodedSizeForContent(const std::string& d)
+{
+ if (d.size()) return 3/*descriptor*/ + 1/*code*/ + encodedSize(d);
+ else return 0;
+}
+
+size_t MessageEncoder::getEncodedSize(const Header& h)
+{
+ //NOTE: this does not take optional optimisation into account,
+ //i.e. it is a 'worst case' estimate for required buffer space
+ size_t total(3/*descriptor*/ + 1/*code*/ + 1/*size*/ + 1/*count*/ + 5/*codes for each field*/);
+ if (h.getPriority() != 4) total += 1;
+ if (h.getDeliveryCount()) total += 4;
+ if (h.hasTtl()) total += 4;
+ return total;
+}
+
+size_t MessageEncoder::getEncodedSize(const Properties& p)
+{
+ //NOTE: this does not take optional optimisation into account,
+ //i.e. it is a 'worst case' estimate for required buffer space
+ size_t total(3/*descriptor*/ + 1/*code*/ + 4/*size*/ + 4/*count*/ + 13/*codes for each field*/);
+ if (p.hasMessageId()) total += encodedSize(p.getMessageId());
+ if (p.hasUserId()) total += encodedSize(p.getUserId());
+ if (p.hasTo()) total += encodedSize(p.getTo());
+ if (p.hasSubject()) total += encodedSize(p.getSubject());
+ if (p.hasReplyTo()) total += encodedSize(p.getReplyTo());
+ if (p.hasCorrelationId()) total += encodedSize(p.getCorrelationId());
+ if (p.hasContentType()) total += encodedSize(p.getContentType());
+ if (p.hasContentEncoding()) total += encodedSize(p.getContentEncoding());
+ if (p.hasAbsoluteExpiryTime()) total += 8;
+ if (p.hasCreationTime()) total += 8;
+ if (p.hasGroupId()) total += encodedSize(p.getGroupId());
+ if (p.hasGroupSequence()) total += 4;
+ if (p.hasReplyToGroupId()) total += encodedSize(p.getReplyToGroupId());
+ return total;
+}
+
+size_t MessageEncoder::getEncodedSize(const ApplicationProperties& p)
+{
+ MapSizeCalculator calc;
+ p.handle(calc);
+ return calc.getTotalSizeRequired(&qpid::amqp::message::APPLICATION_PROPERTIES);
+}
+
+size_t MessageEncoder::getEncodedSizeForElements(const qpid::types::Variant::Map& map)
+{
+ size_t total = 0;
+ for (qpid::types::Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
+ total += 1/*code*/ + encodedSize(i->first) + getEncodedSizeForValue(i->second);
+ }
+ return total;
+}
+
+size_t MessageEncoder::getEncodedSizeForValue(const qpid::types::Variant& value)
+{
+ size_t total = 0;
+ switch (value.getType()) {
+ case qpid::types::VAR_MAP:
+ total += getEncodedSize(value.asMap(), true);
+ break;
+ case qpid::types::VAR_LIST:
+ total += getEncodedSize(value.asList(), true);
+ break;
+
+ case qpid::types::VAR_VOID:
+ case qpid::types::VAR_BOOL:
+ total += 1;
+ break;
+
+ case qpid::types::VAR_UINT8:
+ case qpid::types::VAR_INT8:
+ total += 2;
+ break;
+
+ case qpid::types::VAR_UINT16:
+ case qpid::types::VAR_INT16:
+ total += 3;
+ break;
+
+ case qpid::types::VAR_UINT32:
+ case qpid::types::VAR_INT32:
+ case qpid::types::VAR_FLOAT:
+ total += 5;
+ break;
+
+ case qpid::types::VAR_UINT64:
+ case qpid::types::VAR_INT64:
+ case qpid::types::VAR_DOUBLE:
+ total += 9;
+ break;
+
+ case qpid::types::VAR_UUID:
+ total += 17;
+ break;
+
+ case qpid::types::VAR_STRING:
+ total += 1/*code*/ + encodedSize(value.getString());
+ break;
+ }
+ return total;
+}
+
+
+size_t MessageEncoder::getEncodedSize(const qpid::types::Variant::Map& map, bool alwaysUseLargeMap)
+{
+ size_t total = getEncodedSizeForElements(map);
+
+ //its not just the count that determines whether we can use a small map, but the aggregate size:
+ if (alwaysUseLargeMap || map.size()*2 > 255 || total > 255) total += 4/*size*/ + 4/*count*/;
+ else total += 1/*size*/ + 1/*count*/;
+
+ total += 1 /*code for map itself*/;
+
+ return total;
+}
+
+size_t MessageEncoder::getEncodedSize(const qpid::types::Variant::List& list, bool alwaysUseLargeList)
+{
+ size_t total(0);
+ for (qpid::types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
+ total += getEncodedSizeForValue(*i);
+ }
+
+ //its not just the count that determines whether we can use a small list, but the aggregate size:
+ if (alwaysUseLargeList || list.size()*2 > 255 || total > 255) total += 4/*size*/ + 4/*count*/;
+ else total += 1/*size*/ + 1/*count*/;
+
+ total += 1 /*code for list itself*/;
+
+ return total;
+}
+}} // namespace qpid::amqp