summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-06-04 17:05:38 +0000
committerGordon Sim <gsim@apache.org>2013-06-04 17:05:38 +0000
commit4402f8238b82cf854f8bf6031b18afbdc56b56df (patch)
tree212d3c21ba7bc9b14dfcab9bc422e5da0dfb9494
parent5566d2a9185ade49b5a9e12fc222b8abd73b5fb9 (diff)
downloadqpid-python-4402f8238b82cf854f8bf6031b18afbdc56b56df.tar.gz
QPID-4707: Set AMQP 1.0 fields on outgoing messages based on special property keys
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1489519 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/CMakeLists.txt5
-rw-r--r--cpp/src/Makefile.am6
-rw-r--r--cpp/src/qpid/amqp/Descriptor.cpp15
-rw-r--r--cpp/src/qpid/amqp/Descriptor.h1
-rw-r--r--cpp/src/qpid/amqp/Encoder.cpp3
-rw-r--r--cpp/src/qpid/amqp/Encoder.h3
-rw-r--r--cpp/src/qpid/amqp/MapEncoder.cpp142
-rw-r--r--cpp/src/qpid/amqp/MapEncoder.h58
-rw-r--r--cpp/src/qpid/amqp/MapHandler.h (renamed from cpp/src/qpid/broker/MapHandler.h)15
-rw-r--r--cpp/src/qpid/amqp/MapSizeCalculator.cpp151
-rw-r--r--cpp/src/qpid/amqp/MapSizeCalculator.h73
-rw-r--r--cpp/src/qpid/amqp/MessageEncoder.cpp69
-rw-r--r--cpp/src/qpid/amqp/MessageEncoder.h21
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp29
-rw-r--r--cpp/src/qpid/broker/Message.cpp4
-rw-r--r--cpp/src/qpid/broker/Message.h8
-rw-r--r--cpp/src/qpid/broker/Selector.cpp4
-rw-r--r--cpp/src/qpid/broker/amqp/Message.cpp3
-rw-r--r--cpp/src/qpid/broker/amqp/Message.h2
-rw-r--r--cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp10
-rw-r--r--cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h2
-rw-r--r--cpp/src/qpid/messaging/amqp/SenderContext.cpp153
-rw-r--r--cpp/src/qpid/xml/XmlExchange.cpp30
23 files changed, 711 insertions, 96 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index 2cc2a6fece..e00d7a0b7c 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -965,6 +965,11 @@ set (qpidcommon_SOURCES
qpid/amqp/Descriptor.cpp
qpid/amqp/Encoder.h
qpid/amqp/Encoder.cpp
+ qpid/amqp/MapHandler.h
+ qpid/amqp/MapEncoder.h
+ qpid/amqp/MapEncoder.cpp
+ qpid/amqp/MapSizeCalculator.h
+ qpid/amqp/MapSizeCalculator.cpp
qpid/amqp/MapReader.h
qpid/amqp/MapReader.cpp
qpid/amqp/MessageEncoder.h
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index ac5f2581fa..2a52403c55 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -537,6 +537,11 @@ libqpidcommon_la_SOURCES += \
qpid/amqp/Encoder.cpp \
qpid/amqp/ListReader.h \
qpid/amqp/LoggingReader.h \
+ qpid/amqp/MapHandler.h \
+ qpid/amqp/MapEncoder.h \
+ qpid/amqp/MapEncoder.cpp \
+ qpid/amqp/MapSizeCalculator.h \
+ qpid/amqp/MapSizeCalculator.cpp \
qpid/amqp/MapReader.h \
qpid/amqp/MapReader.cpp \
qpid/amqp/MessageEncoder.h \
@@ -644,7 +649,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/LinkRegistry.h \
qpid/broker/Lvq.h \
qpid/broker/Lvq.cpp \
- qpid/broker/MapHandler.h \
qpid/broker/Message.cpp \
qpid/broker/Message.h \
qpid/broker/MessageAdapter.cpp \
diff --git a/cpp/src/qpid/amqp/Descriptor.cpp b/cpp/src/qpid/amqp/Descriptor.cpp
index 087e87c5e6..1d2df01e6e 100644
--- a/cpp/src/qpid/amqp/Descriptor.cpp
+++ b/cpp/src/qpid/amqp/Descriptor.cpp
@@ -35,6 +35,21 @@ bool Descriptor::match(const std::string& symbol, uint64_t code) const
return false;
}
+size_t Descriptor::getSize() const
+{
+ size_t size = 1/*descriptor indicator*/ + 1/*type code*/;
+ switch (type) {
+ case Descriptor::NUMERIC:
+ if (value.code > 0) size += value.code < 256 ? 1/*encode as byte*/ : 8/*encode as long*/;
+ //else value will be indicated through ULONG_ZERO typecode
+ break;
+ case Descriptor::SYMBOLIC:
+ size += value.symbol.size < 256? 1/*size field is a byte*/ : 4/*size field is an int*/;
+ size += value.symbol.size;
+ break;
+ }
+ return size;
+}
std::ostream& operator<<(std::ostream& os, const Descriptor& d)
{
diff --git a/cpp/src/qpid/amqp/Descriptor.h b/cpp/src/qpid/amqp/Descriptor.h
index c36aa38ee3..789f4dcc63 100644
--- a/cpp/src/qpid/amqp/Descriptor.h
+++ b/cpp/src/qpid/amqp/Descriptor.h
@@ -45,6 +45,7 @@ struct Descriptor
Descriptor(uint64_t code);
Descriptor(const CharSequence& symbol);
bool match(const std::string&, uint64_t) const;
+ size_t getSize() const;
};
std::ostream& operator<<(std::ostream& os, const Descriptor& d);
diff --git a/cpp/src/qpid/amqp/Encoder.cpp b/cpp/src/qpid/amqp/Encoder.cpp
index 6599f70811..549b6d1e4e 100644
--- a/cpp/src/qpid/amqp/Encoder.cpp
+++ b/cpp/src/qpid/amqp/Encoder.cpp
@@ -353,7 +353,6 @@ void Encoder::endMap32(uint32_t count, void* token)
end<uint32_t>(count, token, data+position);
}
-
void* Encoder::startArray8(const Constructor& c, const Descriptor* d)
{
return startArray<uint8_t>(typecodes::ARRAY8, d, c);
@@ -397,6 +396,8 @@ void Encoder::check(size_t s)
}
Encoder::Encoder(char* d, size_t s) : data(d), size(s), position(0) {}
size_t Encoder::getPosition() { return position; }
+size_t Encoder::getSize() const { return size; }
+char* Encoder::getData() { return data + position; }
void Encoder::resetPosition(size_t p) { assert(p <= size); position = p; }
}} // namespace qpid::amqp
diff --git a/cpp/src/qpid/amqp/Encoder.h b/cpp/src/qpid/amqp/Encoder.h
index e2938a002a..c661e4ac5d 100644
--- a/cpp/src/qpid/amqp/Encoder.h
+++ b/cpp/src/qpid/amqp/Encoder.h
@@ -98,6 +98,9 @@ class Encoder
char* skip(size_t);
void writeBytes(const char* bytes, size_t count);
virtual ~Encoder() {}
+ size_t getSize() const;
+ protected:
+ char* getData();
private:
char* data;
size_t size;
diff --git a/cpp/src/qpid/amqp/MapEncoder.cpp b/cpp/src/qpid/amqp/MapEncoder.cpp
new file mode 100644
index 0000000000..cf8ef4ecb5
--- /dev/null
+++ b/cpp/src/qpid/amqp/MapEncoder.cpp
@@ -0,0 +1,142 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MapEncoder.h"
+#include "CharSequence.h"
+#include "qpid/amqp/typecodes.h"
+#include <string.h>
+
+namespace qpid {
+namespace amqp {
+
+MapEncoder::MapEncoder(char* data, size_t size) : Encoder(data, size) {}
+
+void MapEncoder::handleVoid(const CharSequence& key)
+{
+ writeString(key);
+ writeNull();
+}
+
+void MapEncoder::handleBool(const CharSequence& key, bool value)
+{
+ writeString(key);
+ writeBoolean(value);
+}
+
+void MapEncoder::handleUint8(const CharSequence& key, uint8_t value)
+{
+ writeString(key);
+ writeUByte(value);
+}
+
+void MapEncoder::handleUint16(const CharSequence& key, uint16_t value)
+{
+ writeString(key);
+ writeUShort(value);
+}
+
+void MapEncoder::handleUint32(const CharSequence& key, uint32_t value)
+{
+ writeString(key);
+ writeUInt(value);
+}
+
+void MapEncoder::handleUint64(const CharSequence& key, uint64_t value)
+{
+ writeString(key);
+ writeULong(value);
+}
+
+void MapEncoder::handleInt8(const CharSequence& key, int8_t value)
+{
+ writeString(key);
+ writeByte(value);
+}
+
+void MapEncoder::handleInt16(const CharSequence& key, int16_t value)
+{
+ writeString(key);
+ writeShort(value);
+}
+
+void MapEncoder::handleInt32(const CharSequence& key, int32_t value)
+{
+ writeString(key);
+ writeInt(value);
+}
+
+void MapEncoder::handleInt64(const CharSequence& key, int64_t value)
+{
+ writeString(key);
+ writeLong(value);
+}
+
+void MapEncoder::handleFloat(const CharSequence& key, float value)
+{
+ writeString(key);
+ writeFloat(value);
+}
+
+void MapEncoder::handleDouble(const CharSequence& key, double value)
+{
+ writeString(key);
+ writeDouble(value);
+}
+
+namespace {
+const std::string BINARY("binary");
+}
+
+void MapEncoder::handleString(const CharSequence& key, const CharSequence& value, const CharSequence& encoding)
+{
+ writeString(key);
+ if (encoding.size == BINARY.size() && ::strncmp(encoding.data, BINARY.data(), encoding.size)) {
+ writeBinary(value);
+ } else {
+ writeString(value);
+ }
+}
+
+void MapEncoder::writeMetaData(size_t size, size_t count, const Descriptor* d)
+{
+ if (count > 255 || size > 255) {
+ writeMap32MetaData((uint32_t) size, (uint32_t) count, d);
+ } else {
+ writeMap8MetaData((uint8_t) size, (uint8_t) count, d);
+ }
+}
+
+void MapEncoder::writeMap8MetaData(uint8_t size, uint8_t count, const Descriptor* d)
+{
+ if (d) writeDescriptor(*d);
+ writeCode(typecodes::MAP8);
+ write((uint8_t) (size+1)/*size includes count field*/);
+ write(count);
+}
+
+void MapEncoder::writeMap32MetaData(uint32_t size, uint32_t count, const Descriptor* d)
+{
+ if (d) writeDescriptor(*d);
+ writeCode(typecodes::MAP32);
+ write((uint32_t) (size+4)/*size includes count field*/);
+ write(count);
+}
+
+}} // namespace qpid::amqp
diff --git a/cpp/src/qpid/amqp/MapEncoder.h b/cpp/src/qpid/amqp/MapEncoder.h
new file mode 100644
index 0000000000..42fb819932
--- /dev/null
+++ b/cpp/src/qpid/amqp/MapEncoder.h
@@ -0,0 +1,58 @@
+#ifndef QPID_AMQP_MAPENCODER_H
+#define QPID_AMQP_MAPENCODER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MapHandler.h"
+#include "Encoder.h"
+
+namespace qpid {
+namespace amqp {
+struct Descriptor;
+
+/**
+ * Encode map like data
+ */
+class MapEncoder : public MapHandler, Encoder
+{
+ public:
+ MapEncoder(char* data, size_t size);
+ void handleVoid(const CharSequence& key);
+ void handleBool(const CharSequence& key, bool value);
+ void handleUint8(const CharSequence& key, uint8_t value);
+ void handleUint16(const CharSequence& key, uint16_t value);
+ void handleUint32(const CharSequence& key, uint32_t value);
+ void handleUint64(const CharSequence& key, uint64_t value);
+ void handleInt8(const CharSequence& key, int8_t value);
+ void handleInt16(const CharSequence& key, int16_t value);
+ void handleInt32(const CharSequence& key, int32_t value);
+ void handleInt64(const CharSequence& key, int64_t value);
+ void handleFloat(const CharSequence& key, float value);
+ void handleDouble(const CharSequence& key, double value);
+ void handleString(const CharSequence& key, const CharSequence& value, const CharSequence& encoding);
+ void writeMetaData(size_t size, size_t count, const Descriptor* descriptor=0);
+ void writeMap8MetaData(uint8_t size, uint8_t count, const Descriptor* d=0);
+ void writeMap32MetaData(uint32_t size, uint32_t count, const Descriptor* d=0);
+ private:
+};
+}} // namespace qpid::amqp
+
+#endif /*!QPID_AMQP_MAPENCODER_H*/
diff --git a/cpp/src/qpid/broker/MapHandler.h b/cpp/src/qpid/amqp/MapHandler.h
index 15e3a95252..14994ccac7 100644
--- a/cpp/src/qpid/broker/MapHandler.h
+++ b/cpp/src/qpid/amqp/MapHandler.h
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_MAPHANDLER_H
-#define QPID_BROKER_MAPHANDLER_H
+#ifndef QPID_AMQP_MAPHANDLER_H
+#define QPID_AMQP_MAPHANDLER_H
/*
*
@@ -24,21 +24,14 @@
#include "qpid/sys/IntegerTypes.h"
namespace qpid {
-
namespace amqp {
struct CharSequence;
-}
-
-namespace broker {
-
/**
* Interface for processing entries in some map-like object
*/
class MapHandler
{
public:
- typedef qpid::amqp::CharSequence CharSequence;
-
virtual ~MapHandler() {}
virtual void handleVoid(const CharSequence& key) = 0;
virtual void handleBool(const CharSequence& key, bool value) = 0;
@@ -55,6 +48,6 @@ class MapHandler
virtual void handleString(const CharSequence& key, const CharSequence& value, const CharSequence& encoding) = 0;
private:
};
-}} // namespace qpid::broker
+}} // namespace qpid::amqp
-#endif /*!QPID_BROKER_MAPHANDLER_H*/
+#endif /*!QPID_AMQP_MAPHANDLER_H*/
diff --git a/cpp/src/qpid/amqp/MapSizeCalculator.cpp b/cpp/src/qpid/amqp/MapSizeCalculator.cpp
new file mode 100644
index 0000000000..2da152108f
--- /dev/null
+++ b/cpp/src/qpid/amqp/MapSizeCalculator.cpp
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MapSizeCalculator.h"
+#include "CharSequence.h"
+#include "Descriptor.h"
+
+namespace qpid {
+namespace amqp {
+
+MapSizeCalculator::MapSizeCalculator() : size(0), count(0) {}
+
+void MapSizeCalculator::handleKey(const CharSequence& key)
+{
+ ++count;
+ size += getEncodedSize(key);
+}
+
+size_t MapSizeCalculator::getEncodedSize(const CharSequence& s)
+{
+ return 1/*typecode*/ + (s.size < 256 ? 1 : 4)/*size field*/ + s.size;
+}
+
+void MapSizeCalculator::handleVoid(const CharSequence& key)
+{
+ handleKey(key);
+ size += 1;/*typecode*/;
+}
+
+void MapSizeCalculator::handleBool(const CharSequence& key, bool /*value*/)
+{
+ handleKey(key);
+ size += 1;/*typecode*/;
+}
+
+void MapSizeCalculator::handleUint8(const CharSequence& key, uint8_t /*value*/)
+{
+ handleKey(key);
+ size += 1/*typecode*/ + 1/*value*/;
+}
+
+void MapSizeCalculator::handleUint16(const CharSequence& key, uint16_t /*value*/)
+{
+ handleKey(key);
+ size += 1/*typecode*/ + 2/*value*/;
+}
+
+void MapSizeCalculator::handleUint32(const CharSequence& key, uint32_t value)
+{
+ handleKey(key);
+ size += 1;/*typecode*/;
+ if (value > 0) {
+ if (value < 256) size += 1/*UINT_SMALL*/;
+ else size += 4/*UINT*/;
+ }//else UINT_ZERO
+}
+
+void MapSizeCalculator::handleUint64(const CharSequence& key, uint64_t value)
+{
+ handleKey(key);
+ size += 1;/*typecode*/;
+ if (value > 0) {
+ if (value < 256) size += 1/*ULONG_SMALL*/;
+ else size += 8/*ULONG*/;
+ }//else ULONG_ZERO
+}
+
+void MapSizeCalculator::handleInt8(const CharSequence& key, int8_t /*value*/)
+{
+ handleKey(key);
+ size += 1/*typecode*/ + 1/*value*/;
+}
+
+void MapSizeCalculator::handleInt16(const CharSequence& key, int16_t /*value*/)
+{
+ handleKey(key);
+ size += 1/*typecode*/ + 2/*value*/;
+}
+
+void MapSizeCalculator::handleInt32(const CharSequence& key, int32_t /*value*/)
+{
+ handleKey(key);
+ size += 1/*typecode*/ + 4/*value*/;
+}
+
+void MapSizeCalculator::handleInt64(const CharSequence& key, int64_t /*value*/)
+{
+ handleKey(key);
+ size += 1/*typecode*/ + 8/*value*/;
+}
+
+void MapSizeCalculator::handleFloat(const CharSequence& key, float /*value*/)
+{
+ handleKey(key);
+ size += 1/*typecode*/ + 4/*value*/;
+}
+
+void MapSizeCalculator::handleDouble(const CharSequence& key, double /*value*/)
+{
+ handleKey(key);
+ size += 1/*typecode*/ + 8/*value*/;
+}
+
+void MapSizeCalculator::handleString(const CharSequence& key, const CharSequence& value, const CharSequence& /*encoding*/)
+{
+ handleKey(key);
+ size += getEncodedSize(value);
+}
+
+size_t MapSizeCalculator::getSize() const
+{
+ return size;
+}
+
+size_t MapSizeCalculator::getCount() const
+{
+ return count;
+}
+
+size_t MapSizeCalculator::getTotalSizeRequired(const Descriptor* d) const
+{
+ size_t result(size);
+ if (d) result += d->getSize();
+ result += 1/*typecode*/;
+ if (count * 2 > 255 || size > 255) {
+ result += 4/*size*/ + 4/*count*/;
+ } else {
+ result += 1/*size*/ + 1/*count*/;
+ }
+ return result;
+}
+
+
+}} // namespace qpid::amqp
diff --git a/cpp/src/qpid/amqp/MapSizeCalculator.h b/cpp/src/qpid/amqp/MapSizeCalculator.h
new file mode 100644
index 0000000000..35c9ad732d
--- /dev/null
+++ b/cpp/src/qpid/amqp/MapSizeCalculator.h
@@ -0,0 +1,73 @@
+#ifndef QPID_AMQP_MAPSIZECALCULATOR_H
+#define QPID_AMQP_MAPSIZECALCULATOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MapHandler.h"
+#include <cstring>
+
+namespace qpid {
+namespace amqp {
+struct Descriptor;
+
+/**
+ * Utility to calculate the encoded size for map data
+ */
+class MapSizeCalculator : public MapHandler
+{
+ public:
+ MapSizeCalculator();
+ void handleVoid(const CharSequence& key);
+ void handleBool(const CharSequence& key, bool value);
+ void handleUint8(const CharSequence& key, uint8_t value);
+ void handleUint16(const CharSequence& key, uint16_t value);
+ void handleUint32(const CharSequence& key, uint32_t value);
+ void handleUint64(const CharSequence& key, uint64_t value);
+ void handleInt8(const CharSequence& key, int8_t value);
+ void handleInt16(const CharSequence& key, int16_t value);
+ void handleInt32(const CharSequence& key, int32_t value);
+ void handleInt64(const CharSequence& key, int64_t value);
+ void handleFloat(const CharSequence& key, float value);
+ void handleDouble(const CharSequence& key, double value);
+ void handleString(const CharSequence& key, const CharSequence& value, const CharSequence& encoding);
+ /**
+ * @returns the encoded size of the map entries (i.e. does not
+ * include the count field, typecode or any other metadata for the
+ * map as a whole)
+ */
+ size_t getSize() const;
+ size_t getCount() const;
+ /**
+ * @returns the total encoded size for a map containing the
+ * handled values (i.e. including the metadata for the map as a
+ * whole)
+ */
+ size_t getTotalSizeRequired(const Descriptor* d=0) const;
+ private:
+ size_t size;
+ size_t count;
+
+ void handleKey(const CharSequence& key);
+ static size_t getEncodedSize(const CharSequence&);
+};
+}} // namespace qpid::amqp
+
+#endif /*!QPID_AMQP_MAPSIZECALCULATOR_H*/
diff --git a/cpp/src/qpid/amqp/MessageEncoder.cpp b/cpp/src/qpid/amqp/MessageEncoder.cpp
index 852ad29635..3b493d1de7 100644
--- a/cpp/src/qpid/amqp/MessageEncoder.cpp
+++ b/cpp/src/qpid/amqp/MessageEncoder.cpp
@@ -19,8 +19,11 @@
*
*/
#include "qpid/amqp/MessageEncoder.h"
+#include "qpid/amqp/MapEncoder.h"
+#include "qpid/amqp/MapSizeCalculator.h"
#include "qpid/amqp/descriptors.h"
#include "qpid/log/Statement.h"
+#include <assert.h>
namespace qpid {
namespace amqp {
@@ -132,6 +135,17 @@ void MessageEncoder::writeProperties(const Properties& msg)
}
}
+void MessageEncoder::writeApplicationProperties(const ApplicationProperties& properties)
+{
+ MapSizeCalculator calc;
+ properties.handle(calc);
+ size_t required = calc.getTotalSizeRequired(&qpid::amqp::message::APPLICATION_PROPERTIES);
+ assert(required <= getSize() - getPosition());
+ MapEncoder encoder(skip(required), required);
+ encoder.writeMetaData(calc.getSize(), calc.getCount()*2, &qpid::amqp::message::APPLICATION_PROPERTIES);
+ properties.handle(encoder);
+}
+
void MessageEncoder::writeApplicationProperties(const qpid::types::Variant::Map& properties)
{
writeApplicationProperties(properties, !optimise || properties.size()*2 > 255 || getEncodedSizeForElements(properties) > 255);
@@ -206,26 +220,47 @@ void MessageEncoder::writeMap(const qpid::types::Variant::Map& properties, const
size_t MessageEncoder::getEncodedSize(const Header& h, const Properties& p, const qpid::types::Variant::Map& ap, const std::string& d)
{
+ return getEncodedSize(h) + getEncodedSize(p, ap, d);
+}
+
+size_t MessageEncoder::getEncodedSize(const Header& h, const Properties& p, const ApplicationProperties& ap, const std::string& d)
+{
+ return getEncodedSize(h) + getEncodedSize(p) + getEncodedSize(ap) + getEncodedSizeForContent(d);
+}
+
+size_t MessageEncoder::getEncodedSize(const Properties& p, const qpid::types::Variant::Map& ap, const std::string& d)
+{
+ size_t total(getEncodedSize(p));
+ //application-properties:
+ total += 3/*descriptor*/ + getEncodedSize(ap, true);
+ //body:
+ if (d.size()) total += 3/*descriptor*/ + 1/*code*/ + encodedSize(d);
+
+ return total;
+}
+
+size_t MessageEncoder::getEncodedSizeForContent(const std::string& d)
+{
+ if (d.size()) return 3/*descriptor*/ + 1/*code*/ + encodedSize(d);
+ else return 0;
+}
+
+size_t MessageEncoder::getEncodedSize(const Header& h)
+{
//NOTE: this does not take optional optimisation into account,
//i.e. it is a 'worst case' estimate for required buffer space
- size_t total(0);
-
- //header:
- total += 3/*descriptor*/ + 1/*code*/ + 1/*size*/ + 1/*count*/ + 5/*codes for each field*/;
+ size_t total(3/*descriptor*/ + 1/*code*/ + 1/*size*/ + 1/*count*/ + 5/*codes for each field*/);
if (h.getPriority() != 4) total += 1;
if (h.getDeliveryCount()) total += 4;
if (h.hasTtl()) total += 4;
- return total + getEncodedSize(p, ap, d);
+ return total;
}
-size_t MessageEncoder::getEncodedSize(const Properties& p, const qpid::types::Variant::Map& ap, const std::string& d)
+size_t MessageEncoder::getEncodedSize(const Properties& p)
{
//NOTE: this does not take optional optimisation into account,
//i.e. it is a 'worst case' estimate for required buffer space
- size_t total(0);
-
- //properties:
- total += 3/*descriptor*/ + 1/*code*/ + 4/*size*/ + 4/*count*/ + 13/*codes for each field*/;
+ size_t total(3/*descriptor*/ + 1/*code*/ + 4/*size*/ + 4/*count*/ + 13/*codes for each field*/);
if (p.hasMessageId()) total += encodedSize(p.getMessageId());
if (p.hasUserId()) total += encodedSize(p.getUserId());
if (p.hasTo()) total += encodedSize(p.getTo());
@@ -239,16 +274,16 @@ size_t MessageEncoder::getEncodedSize(const Properties& p, const qpid::types::Va
if (p.hasGroupId()) total += encodedSize(p.getGroupId());
if (p.hasGroupSequence()) total += 4;
if (p.hasReplyToGroupId()) total += encodedSize(p.getReplyToGroupId());
-
-
- //application-properties:
- total += 3/*descriptor*/ + getEncodedSize(ap, true);
- //body:
- if (d.size()) total += 3/*descriptor*/ + 1/*code*/ + encodedSize(d);
-
return total;
}
+size_t MessageEncoder::getEncodedSize(const ApplicationProperties& p)
+{
+ MapSizeCalculator calc;
+ p.handle(calc);
+ return calc.getTotalSizeRequired(&qpid::amqp::message::APPLICATION_PROPERTIES);
+}
+
size_t MessageEncoder::getEncodedSizeForElements(const qpid::types::Variant::Map& map)
{
size_t total = 0;
diff --git a/cpp/src/qpid/amqp/MessageEncoder.h b/cpp/src/qpid/amqp/MessageEncoder.h
index 3db0763e8a..1de3d47bb2 100644
--- a/cpp/src/qpid/amqp/MessageEncoder.h
+++ b/cpp/src/qpid/amqp/MessageEncoder.h
@@ -26,7 +26,7 @@
namespace qpid {
namespace amqp {
-
+class MapHandler;
/**
*
*/
@@ -77,23 +77,36 @@ class MessageEncoder : public Encoder
virtual std::string getReplyToGroupId() const = 0;
};
+ class ApplicationProperties
+ {
+ public:
+ virtual ~ApplicationProperties() {}
+ virtual void handle(MapHandler&) const = 0;
+ };
+
MessageEncoder(char* d, size_t s, bool o=false) : Encoder(d, s), optimise(o) {}
void writeHeader(const Header&);
void writeProperties(const Properties&);
+ void writeApplicationProperties(const ApplicationProperties&);
void writeApplicationProperties(const qpid::types::Variant::Map& properties);
void writeApplicationProperties(const qpid::types::Variant::Map& properties, bool useLargeMap);
void writeMap(const qpid::types::Variant::Map& map, const Descriptor*, bool useLargeMap);
- static size_t getEncodedSize(const Header&, const Properties&, const qpid::types::Variant::Map&, const std::string&);
- static size_t getEncodedSize(const Properties&, const qpid::types::Variant::Map&, const std::string&);
+ static size_t getEncodedSize(const Header&);
+ static size_t getEncodedSize(const Properties&);
+ static size_t getEncodedSize(const ApplicationProperties&);
+ static size_t getEncodedSize(const Header&, const Properties&, const ApplicationProperties&, const std::string&);
+
static size_t getEncodedSize(const qpid::types::Variant::Map&, bool useLargeMap);
static size_t getEncodedSize(const qpid::types::Variant::Map&);
-
+ static size_t getEncodedSize(const Header&, const Properties&, const qpid::types::Variant::Map&, const std::string&);
+ static size_t getEncodedSize(const Properties&, const qpid::types::Variant::Map&, const std::string&);
private:
bool optimise;
static size_t getEncodedSizeForElements(const qpid::types::Variant::Map&);
+ static size_t getEncodedSizeForContent(const std::string&);
};
}} // namespace qpid::amqp
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index 7acdffce4d..611978beef 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -21,7 +21,7 @@
#include "qpid/broker/HeadersExchange.h"
#include "qpid/amqp/CharSequence.h"
-#include "qpid/broker/MapHandler.h"
+#include "qpid/amqp/MapHandler.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
@@ -31,6 +31,7 @@
using namespace qpid::broker;
using std::string;
+using qpid::amqp::MapHandler;
using namespace qpid::framing;
using namespace qpid::sys;
@@ -78,22 +79,22 @@ class Matcher : public MapHandler
{
public:
Matcher(const FieldTable& b) : binding(b), matched(0) {}
- void handleBool(const MapHandler::CharSequence& key, bool value) { processUint(std::string(key.data, key.size), value); }
- void handleUint8(const MapHandler::CharSequence& key, uint8_t value) { processUint(std::string(key.data, key.size), value); }
- void handleUint16(const MapHandler::CharSequence& key, uint16_t value) { processUint(std::string(key.data, key.size), value); }
- void handleUint32(const MapHandler::CharSequence& key, uint32_t value) { processUint(std::string(key.data, key.size), value); }
- void handleUint64(const MapHandler::CharSequence& key, uint64_t value) { processUint(std::string(key.data, key.size), value); }
- void handleInt8(const MapHandler::CharSequence& key, int8_t value) { processInt(std::string(key.data, key.size), value); }
- void handleInt16(const MapHandler::CharSequence& key, int16_t value) { processInt(std::string(key.data, key.size), value); }
- void handleInt32(const MapHandler::CharSequence& key, int32_t value) { processInt(std::string(key.data, key.size), value); }
- void handleInt64(const MapHandler::CharSequence& key, int64_t value) { processInt(std::string(key.data, key.size), value); }
- void handleFloat(const MapHandler::CharSequence& key, float value) { processFloat(std::string(key.data, key.size), value); }
- void handleDouble(const MapHandler::CharSequence& key, double value) { processFloat(std::string(key.data, key.size), value); }
- void handleString(const MapHandler::CharSequence& key, const MapHandler::CharSequence& value, const MapHandler::CharSequence& /*encoding*/)
+ void handleBool(const qpid::amqp::CharSequence& key, bool value) { processUint(std::string(key.data, key.size), value); }
+ void handleUint8(const qpid::amqp::CharSequence& key, uint8_t value) { processUint(std::string(key.data, key.size), value); }
+ void handleUint16(const qpid::amqp::CharSequence& key, uint16_t value) { processUint(std::string(key.data, key.size), value); }
+ void handleUint32(const qpid::amqp::CharSequence& key, uint32_t value) { processUint(std::string(key.data, key.size), value); }
+ void handleUint64(const qpid::amqp::CharSequence& key, uint64_t value) { processUint(std::string(key.data, key.size), value); }
+ void handleInt8(const qpid::amqp::CharSequence& key, int8_t value) { processInt(std::string(key.data, key.size), value); }
+ void handleInt16(const qpid::amqp::CharSequence& key, int16_t value) { processInt(std::string(key.data, key.size), value); }
+ void handleInt32(const qpid::amqp::CharSequence& key, int32_t value) { processInt(std::string(key.data, key.size), value); }
+ void handleInt64(const qpid::amqp::CharSequence& key, int64_t value) { processInt(std::string(key.data, key.size), value); }
+ void handleFloat(const qpid::amqp::CharSequence& key, float value) { processFloat(std::string(key.data, key.size), value); }
+ void handleDouble(const qpid::amqp::CharSequence& key, double value) { processFloat(std::string(key.data, key.size), value); }
+ void handleString(const qpid::amqp::CharSequence& key, const qpid::amqp::CharSequence& value, const qpid::amqp::CharSequence& /*encoding*/)
{
processString(std::string(key.data, key.size), std::string(value.data, value.size));
}
- void handleVoid(const MapHandler::CharSequence& key)
+ void handleVoid(const qpid::amqp::CharSequence& key)
{
valueCheckRequired(std::string(key.data, key.size));
}
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 72dbf5ff8c..1b3f3bd827 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -22,7 +22,7 @@
#include "qpid/broker/Message.h"
#include "qpid/amqp/CharSequence.h"
-#include "qpid/broker/MapHandler.h"
+#include "qpid/amqp/MapHandler.h"
#include "qpid/StringUtils.h"
#include "qpid/log/Statement.h"
@@ -35,6 +35,8 @@ using qpid::sys::AbsTime;
using qpid::sys::Duration;
using qpid::sys::TIME_MSEC;
using qpid::sys::FAR_FUTURE;
+using qpid::amqp::CharSequence;
+using qpid::amqp::MapHandler;
using std::string;
namespace qpid {
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 1da0bf7648..64103ecac9 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -36,9 +36,11 @@
#include "qpid/broker/PersistableMessage.h"
namespace qpid {
+namespace amqp {
+class MapHandler;
+}
namespace broker {
class ConnectionToken;
-class MapHandler;
enum MessageState
{
@@ -62,7 +64,7 @@ public:
virtual std::string getAnnotationAsString(const std::string& key) const = 0;
virtual bool getTtl(uint64_t&) const = 0;
virtual std::string getContent() const = 0;
- virtual void processProperties(MapHandler&) const = 0;
+ virtual void processProperties(qpid::amqp::MapHandler&) const = 0;
virtual std::string getUserId() const = 0;
};
@@ -104,7 +106,7 @@ public:
QPID_BROKER_EXTERN uint8_t getPriority() const;
QPID_BROKER_EXTERN std::string getPropertyAsString(const std::string& key) const;
QPID_BROKER_EXTERN qpid::types::Variant getProperty(const std::string& key) const;
- void processProperties(MapHandler&) const;
+ void processProperties(qpid::amqp::MapHandler&) const;
QPID_BROKER_EXTERN uint64_t getContentSize() const;
diff --git a/cpp/src/qpid/broker/Selector.cpp b/cpp/src/qpid/broker/Selector.cpp
index 14f98850e7..129787171d 100644
--- a/cpp/src/qpid/broker/Selector.cpp
+++ b/cpp/src/qpid/broker/Selector.cpp
@@ -23,7 +23,7 @@
#include "qpid/amqp/CharSequence.h"
#include "qpid/broker/Message.h"
-#include "qpid/broker/MapHandler.h"
+#include "qpid/amqp/MapHandler.h"
#include "qpid/broker/SelectorExpression.h"
#include "qpid/broker/SelectorValue.h"
#include "qpid/log/Statement.h"
@@ -41,6 +41,8 @@ namespace broker {
using std::string;
using qpid::sys::unordered_map;
+using qpid::amqp::CharSequence;
+using qpid::amqp::MapHandler;
/**
* Identifier (amqp.) | JMS... | amqp 1.0 equivalent
diff --git a/cpp/src/qpid/broker/amqp/Message.cpp b/cpp/src/qpid/broker/amqp/Message.cpp
index 2c39c517e1..3dcf1c14e6 100644
--- a/cpp/src/qpid/broker/amqp/Message.cpp
+++ b/cpp/src/qpid/broker/amqp/Message.cpp
@@ -23,7 +23,7 @@
#include "qpid/amqp/descriptors.h"
#include "qpid/amqp/Reader.h"
#include "qpid/amqp/MessageEncoder.h"
-#include "qpid/broker/MapHandler.h"
+#include "qpid/amqp/MapHandler.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/Buffer.h"
#include <string.h>
@@ -35,6 +35,7 @@ namespace amqp {
using qpid::amqp::CharSequence;
using qpid::amqp::Constructor;
using qpid::amqp::Descriptor;
+using qpid::amqp::MapHandler;
using qpid::amqp::Reader;
namespace {
diff --git a/cpp/src/qpid/broker/amqp/Message.h b/cpp/src/qpid/broker/amqp/Message.h
index cc3406f72a..aaa6c1b9f4 100644
--- a/cpp/src/qpid/broker/amqp/Message.h
+++ b/cpp/src/qpid/broker/amqp/Message.h
@@ -49,7 +49,7 @@ class Message : public qpid::broker::Message::Encoding, private qpid::amqp::Mess
std::string getAnnotationAsString(const std::string& key) const;
bool getTtl(uint64_t&) const;
std::string getContent() const;
- void processProperties(MapHandler&) const;
+ void processProperties(qpid::amqp::MapHandler&) const;
std::string getUserId() const;
qpid::amqp::MessageId getMessageId() const;
diff --git a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
index a389127b9e..9fb263e3d3 100644
--- a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
+++ b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
@@ -22,7 +22,7 @@
#include "qpid/amqp/CharSequence.h"
#include "qpid/amqp_0_10/Codecs.h"
-#include "qpid/broker/MapHandler.h"
+#include "qpid/amqp/MapHandler.h"
#include "qpid/broker/Message.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/MessageProperties.h"
@@ -309,7 +309,7 @@ bool MessageTransfer::isLastQMFResponse(const qpid::broker::Message& message, co
}
-void MessageTransfer::processProperties(qpid::broker::MapHandler& handler) const
+void MessageTransfer::processProperties(qpid::amqp::MapHandler& handler) const
{
const qpid::framing::MessageProperties* mp = getProperties<qpid::framing::MessageProperties>();
if (mp && mp->hasApplicationHeaders()) {
@@ -317,7 +317,7 @@ void MessageTransfer::processProperties(qpid::broker::MapHandler& handler) const
for (FieldTable::const_iterator i = ft.begin(); i != ft.end(); ++i) {
qpid::types::Variant v;
qpid::amqp_0_10::translate(i->second, v);
- qpid::broker::MapHandler::CharSequence key = {i->first.data(), i->first.size()};
+ qpid::amqp::CharSequence key = {i->first.data(), i->first.size()};
switch (v.getType()) {
case qpid::types::VAR_VOID:
handler.handleVoid(key); break;
@@ -345,8 +345,8 @@ void MessageTransfer::processProperties(qpid::broker::MapHandler& handler) const
handler.handleDouble(key, v); break;
case qpid::types::VAR_STRING: {
std::string s(v);
- qpid::broker::MapHandler::CharSequence value = {s.data(), s.size()};
- qpid::broker::MapHandler::CharSequence encoding = {0, 0};
+ qpid::amqp::CharSequence value = {s.data(), s.size()};
+ qpid::amqp::CharSequence encoding = {0, 0};
handler.handleString(key, value, encoding);
break;
}
diff --git a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
index 8ac9862445..3d6e3a2906 100644
--- a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
+++ b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
@@ -50,7 +50,7 @@ class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::bro
bool getTtl(uint64_t&) const;
bool hasExpiration() const;
std::string getExchangeName() const;
- void processProperties(MapHandler&) const;
+ void processProperties(qpid::amqp::MapHandler&) const;
std::string getUserId() const;
bool requiresAccept() const;
diff --git a/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/cpp/src/qpid/messaging/amqp/SenderContext.cpp
index aeb89d145c..1d9889c447 100644
--- a/cpp/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -22,6 +22,7 @@
#include "qpid/messaging/amqp/EncodedMessage.h"
#include "qpid/messaging/AddressImpl.h"
#include "qpid/amqp/descriptors.h"
+#include "qpid/amqp/MapHandler.h"
#include "qpid/amqp/MessageEncoder.h"
#include "qpid/messaging/exceptions.h"
#include "qpid/messaging/Message.h"
@@ -102,10 +103,14 @@ uint32_t SenderContext::processUnsettled()
return deliveries.size();
}
namespace {
+const std::string X_AMQP("x-amqp-");
+const std::string X_AMQP_FIRST_ACQUIRER("x-amqp-first-acquirer");
+const std::string X_AMQP_DELIVERY_COUNT("x-amqp-delivery-count");
+
class HeaderAdapter : public qpid::amqp::MessageEncoder::Header
{
public:
- HeaderAdapter(const qpid::messaging::MessageImpl& impl) : msg(impl) {}
+ HeaderAdapter(const qpid::messaging::MessageImpl& impl) : msg(impl), headers(msg.getHeaders()) {}
virtual bool isDurable() const
{
return msg.isDurable();
@@ -124,22 +129,42 @@ class HeaderAdapter : public qpid::amqp::MessageEncoder::Header
}
virtual bool isFirstAcquirer() const
{
- return false;
+ qpid::types::Variant::Map::const_iterator i = headers.find(X_AMQP_FIRST_ACQUIRER);
+ if (i != headers.end()) {
+ return i->second;
+ } else {
+ return false;
+ }
}
virtual uint32_t getDeliveryCount() const
{
- return msg.isRedelivered() ? 1 : 0;
+ qpid::types::Variant::Map::const_iterator i = headers.find(X_AMQP_DELIVERY_COUNT);
+ if (i != headers.end()) {
+ return i->second;
+ } else {
+ return msg.isRedelivered() ? 1 : 0;
+ }
}
private:
const qpid::messaging::MessageImpl& msg;
+ const qpid::types::Variant::Map& headers;
};
const std::string EMPTY;
const std::string FORWARD_SLASH("/");
+const std::string X_AMQP_TO("x-amqp-to");
+const std::string X_AMQP_CONTENT_ENCODING("x-amqp-content-encoding");
+const std::string X_AMQP_CREATION_TIME("x-amqp-creation-time");
+const std::string X_AMQP_ABSOLUTE_EXPIRY_TIME("x-amqp-absolute-expiry-time");
+const std::string X_AMQP_GROUP_ID("x-amqp-group-id");
+const std::string X_AMQP_GROUP_SEQUENCE("x-amqp-group-sequence");
+const std::string X_AMQP_REPLY_TO_GROUP_ID("x-amqp-reply-to-group-id");
+const std::string X_AMQP_MESSAGE_ANNOTATIONS("x-amqp-message-annotations");
+const std::string X_AMQP_DELIVERY_ANNOTATIONS("x-amqp-delivery-annotations");
class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties
{
public:
- PropertiesAdapter(const qpid::messaging::MessageImpl& impl, const std::string& s) : msg(impl), subject(s) {}
+ PropertiesAdapter(const qpid::messaging::MessageImpl& impl, const std::string& s) : msg(impl), headers(msg.getHeaders()), subject(s) {}
bool hasMessageId() const
{
return getMessageId().size();
@@ -161,12 +186,12 @@ class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties
bool hasTo() const
{
- return false;//not yet supported
+ return hasHeader(X_AMQP_TO);
}
std::string getTo() const
{
- return EMPTY;//not yet supported
+ return headers.find(X_AMQP_TO)->second;
}
bool hasSubject() const
@@ -216,66 +241,153 @@ class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties
bool hasContentEncoding() const
{
- return false;//not yet supported
+ return hasHeader(X_AMQP_CONTENT_ENCODING);
}
std::string getContentEncoding() const
{
- return EMPTY;//not yet supported
+ return headers.find(X_AMQP_CONTENT_ENCODING)->second;
}
bool hasAbsoluteExpiryTime() const
{
- return false;//not yet supported
+ return hasHeader(X_AMQP_ABSOLUTE_EXPIRY_TIME);
}
int64_t getAbsoluteExpiryTime() const
{
- return 0;//not yet supported
+ return headers.find(X_AMQP_ABSOLUTE_EXPIRY_TIME)->second;
}
bool hasCreationTime() const
{
- return false;//not yet supported
+ return hasHeader(X_AMQP_CREATION_TIME);
}
int64_t getCreationTime() const
{
- return 0;//not yet supported
+ return headers.find(X_AMQP_CREATION_TIME)->second;
}
bool hasGroupId() const
{
- return false;//not yet supported
+ return hasHeader(X_AMQP_GROUP_ID);
}
std::string getGroupId() const
{
- return EMPTY;//not yet supported
+ return headers.find(X_AMQP_GROUP_ID)->second;
}
bool hasGroupSequence() const
{
- return false;//not yet supported
+ return hasHeader(X_AMQP_GROUP_SEQUENCE);
}
uint32_t getGroupSequence() const
{
- return 0;//not yet supported
+ return headers.find(X_AMQP_GROUP_SEQUENCE)->second;
}
bool hasReplyToGroupId() const
{
- return false;//not yet supported
+ return hasHeader(X_AMQP_REPLY_TO_GROUP_ID);
}
std::string getReplyToGroupId() const
{
- return EMPTY;//not yet supported
+ return headers.find(X_AMQP_REPLY_TO_GROUP_ID)->second;
}
private:
const qpid::messaging::MessageImpl& msg;
+ const qpid::types::Variant::Map& headers;
const std::string subject;
+
+ bool hasHeader(const std::string& key) const
+ {
+ return headers.find(key) != headers.end();
+ }
+};
+
+bool startsWith(const std::string& input, const std::string& pattern)
+{
+ if (input.size() < pattern.size()) return false;
+ for (std::string::const_iterator b = pattern.begin(), a = input.begin(); b != pattern.end(); ++b, ++a) {
+ if (*a != *b) return false;
+ }
+ return true;
+}
+class ApplicationPropertiesAdapter : public qpid::amqp::MessageEncoder::ApplicationProperties
+{
+ public:
+ ApplicationPropertiesAdapter(const qpid::types::Variant::Map& h) : headers(h) {}
+ void handle(qpid::amqp::MapHandler& h) const
+ {
+ for (qpid::types::Variant::Map::const_iterator i = headers.begin(); i != headers.end(); ++i) {
+ //strip out values with special keys as they are sent in standard fields
+ if (!startsWith(i->first, X_AMQP)) {
+ qpid::amqp::CharSequence key(convert(i->first));
+ switch (i->second.getType()) {
+ case qpid::types::VAR_VOID:
+ h.handleVoid(key);
+ break;
+ case qpid::types::VAR_BOOL:
+ h.handleBool(key, i->second);
+ break;
+ case qpid::types::VAR_UINT8:
+ h.handleUint8(key, i->second);
+ break;
+ case qpid::types::VAR_UINT16:
+ h.handleUint16(key, i->second);
+ break;
+ case qpid::types::VAR_UINT32:
+ h.handleUint32(key, i->second);
+ break;
+ case qpid::types::VAR_UINT64:
+ h.handleUint64(key, i->second);
+ break;
+ case qpid::types::VAR_INT8:
+ h.handleInt8(key, i->second);
+ break;
+ case qpid::types::VAR_INT16:
+ h.handleInt16(key, i->second);
+ break;
+ case qpid::types::VAR_INT32:
+ h.handleInt32(key, i->second);
+ break;
+ case qpid::types::VAR_INT64:
+ h.handleInt64(key, i->second);
+ break;
+ case qpid::types::VAR_FLOAT:
+ h.handleFloat(key, i->second);
+ break;
+ case qpid::types::VAR_DOUBLE:
+ h.handleDouble(key, i->second);
+ break;
+ case qpid::types::VAR_STRING:
+ h.handleString(key, convert(i->second), convert(i->second.getEncoding()));
+ break;
+ case qpid::types::VAR_UUID:
+ QPID_LOG(warning, "Skipping UUID in application properties; not yet handled correctly.");
+ break;
+ case qpid::types::VAR_MAP:
+ case qpid::types::VAR_LIST:
+ QPID_LOG(warning, "Skipping nested list and map; not allowed in application properties.");
+ break;
+ }
+ }
+ }
+ }
+ private:
+ const qpid::types::Variant::Map& headers;
+
+ static qpid::amqp::CharSequence convert(const std::string& in)
+ {
+ qpid::amqp::CharSequence out;
+ out.data = in.data();
+ out.size = in.size();
+ return out;
+ }
};
bool changedSubject(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address)
@@ -310,8 +422,9 @@ void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, co
} else {
HeaderAdapter header(msg);
PropertiesAdapter properties(msg, address.getSubject());
+ ApplicationPropertiesAdapter applicationProperties(msg.getHeaders());
//compute size:
- encoded.resize(qpid::amqp::MessageEncoder::getEncodedSize(header, properties, msg.getHeaders(), msg.getBytes()));
+ encoded.resize(qpid::amqp::MessageEncoder::getEncodedSize(header, properties, applicationProperties, msg.getBytes()));
QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes")
qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize());
//write header:
@@ -320,7 +433,7 @@ void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, co
//write properties
encoder.writeProperties(properties);
//write application-properties
- encoder.writeApplicationProperties(msg.getHeaders());
+ encoder.writeApplicationProperties(applicationProperties);
//write body
if (msg.getBytes().size()) encoder.writeBinary(msg.getBytes(), &qpid::amqp::message::DATA);//structured content not yet directly supported
if (encoder.getPosition() < encoded.getSize()) {
diff --git a/cpp/src/qpid/xml/XmlExchange.cpp b/cpp/src/qpid/xml/XmlExchange.cpp
index 29c00d9c12..29ab859591 100644
--- a/cpp/src/qpid/xml/XmlExchange.cpp
+++ b/cpp/src/qpid/xml/XmlExchange.cpp
@@ -28,7 +28,7 @@
#include "qpid/log/Statement.h"
#include "qpid/broker/FedOps.h"
-#include "qpid/broker/MapHandler.h"
+#include "qpid/amqp/MapHandler.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/framing/reply_exceptions.h"
@@ -208,26 +208,26 @@ bool XmlExchange::unbindLH(Queue::shared_ptr queue, const std::string& bindingKe
}
namespace {
-class DefineExternals : public MapHandler
+class DefineExternals : public qpid::amqp::MapHandler
{
public:
DefineExternals(DynamicContext* c) : context(c) { assert(context); }
- void handleBool(const MapHandler::CharSequence& key, bool value) { process(std::string(key.data, key.size), (int) value); }
- void handleUint8(const MapHandler::CharSequence& key, uint8_t value) { process(std::string(key.data, key.size), (int) value); }
- void handleUint16(const MapHandler::CharSequence& key, uint16_t value) { process(std::string(key.data, key.size), (int) value); }
- void handleUint32(const MapHandler::CharSequence& key, uint32_t value) { process(std::string(key.data, key.size), (int) value); }
- void handleUint64(const MapHandler::CharSequence& key, uint64_t value) { process(std::string(key.data, key.size), (int) value); }
- void handleInt8(const MapHandler::CharSequence& key, int8_t value) { process(std::string(key.data, key.size), (int) value); }
- void handleInt16(const MapHandler::CharSequence& key, int16_t value) { process(std::string(key.data, key.size), (int) value); }
- void handleInt32(const MapHandler::CharSequence& key, int32_t value) { process(std::string(key.data, key.size), (int) value); }
- void handleInt64(const MapHandler::CharSequence& key, int64_t value) { process(std::string(key.data, key.size), (int) value); }
- void handleFloat(const MapHandler::CharSequence& key, float value) { process(std::string(key.data, key.size), value); }
- void handleDouble(const MapHandler::CharSequence& key, double value) { process(std::string(key.data, key.size), value); }
- void handleString(const MapHandler::CharSequence& key, const MapHandler::CharSequence& value, const MapHandler::CharSequence& /*encoding*/)
+ void handleBool(const qpid::amqp::CharSequence& key, bool value) { process(std::string(key.data, key.size), (int) value); }
+ void handleUint8(const qpid::amqp::CharSequence& key, uint8_t value) { process(std::string(key.data, key.size), (int) value); }
+ void handleUint16(const qpid::amqp::CharSequence& key, uint16_t value) { process(std::string(key.data, key.size), (int) value); }
+ void handleUint32(const qpid::amqp::CharSequence& key, uint32_t value) { process(std::string(key.data, key.size), (int) value); }
+ void handleUint64(const qpid::amqp::CharSequence& key, uint64_t value) { process(std::string(key.data, key.size), (int) value); }
+ void handleInt8(const qpid::amqp::CharSequence& key, int8_t value) { process(std::string(key.data, key.size), (int) value); }
+ void handleInt16(const qpid::amqp::CharSequence& key, int16_t value) { process(std::string(key.data, key.size), (int) value); }
+ void handleInt32(const qpid::amqp::CharSequence& key, int32_t value) { process(std::string(key.data, key.size), (int) value); }
+ void handleInt64(const qpid::amqp::CharSequence& key, int64_t value) { process(std::string(key.data, key.size), (int) value); }
+ void handleFloat(const qpid::amqp::CharSequence& key, float value) { process(std::string(key.data, key.size), value); }
+ void handleDouble(const qpid::amqp::CharSequence& key, double value) { process(std::string(key.data, key.size), value); }
+ void handleString(const qpid::amqp::CharSequence& key, const qpid::amqp::CharSequence& value, const qpid::amqp::CharSequence& /*encoding*/)
{
process(std::string(key.data, key.size), std::string(value.data, value.size));
}
- void handleVoid(const MapHandler::CharSequence&) {}
+ void handleVoid(const qpid::amqp::CharSequence&) {}
private:
void process(const std::string& key, double value)
{