summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/CMakeLists.txt2
-rw-r--r--cpp/src/Makefile.am10
-rw-r--r--cpp/src/amqp.cmake8
-rw-r--r--cpp/src/qpid/amqp/MapReader.cpp289
-rw-r--r--cpp/src/qpid/amqp/MapReader.h104
-rw-r--r--cpp/src/qpid/broker/amqp/DataReader.cpp187
-rw-r--r--cpp/src/qpid/broker/amqp/DataReader.h53
-rw-r--r--cpp/src/qpid/broker/amqp/Filter.cpp145
-rw-r--r--cpp/src/qpid/broker/amqp/Filter.h62
-rw-r--r--cpp/src/qpid/broker/amqp/NodeProperties.cpp154
-rw-r--r--cpp/src/qpid/broker/amqp/NodeProperties.h64
-rw-r--r--cpp/src/qpid/broker/amqp/Session.cpp163
-rw-r--r--cpp/src/qpid/broker/amqp/Session.h10
-rw-r--r--cpp/src/qpid/messaging/amqp/AddressHelper.cpp182
-rw-r--r--cpp/src/qpid/messaging/amqp/AddressHelper.h57
-rw-r--r--cpp/src/qpid/messaging/amqp/ConnectionContext.cpp3
-rw-r--r--cpp/src/qpid/messaging/amqp/ReceiverContext.cpp18
-rw-r--r--cpp/src/qpid/messaging/amqp/SenderContext.cpp14
-rw-r--r--cpp/src/qpid/messaging/amqp/SenderContext.h3
19 files changed, 1407 insertions, 121 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index ae969c942e..579e792b62 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -954,6 +954,8 @@ set (qpidcommon_SOURCES
qpid/amqp/Descriptor.cpp
qpid/amqp/Encoder.h
qpid/amqp/Encoder.cpp
+ qpid/amqp/MapReader.h
+ qpid/amqp/MapReader.cpp
qpid/amqp/MessageEncoder.h
qpid/amqp/MessageEncoder.cpp
qpid/amqp/MessageId.h
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 3782c17c78..05c0ec6302 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -528,6 +528,8 @@ libqpidcommon_la_SOURCES += \
qpid/amqp/Encoder.cpp \
qpid/amqp/ListReader.h \
qpid/amqp/LoggingReader.h \
+ qpid/amqp/MapReader.h \
+ qpid/amqp/MapReader.cpp \
qpid/amqp/MessageEncoder.h \
qpid/amqp/MessageEncoder.cpp \
qpid/amqp/MessageId.h \
@@ -764,6 +766,10 @@ amqp_la_LIBADD = libqpidcommon.la
amqp_la_SOURCES = \
qpid/broker/amqp/Connection.h \
qpid/broker/amqp/Connection.cpp \
+ qpid/broker/amqp/DataReader.h \
+ qpid/broker/amqp/DataReader.cpp \
+ qpid/broker/amqp/Filter.h \
+ qpid/broker/amqp/Filter.cpp \
qpid/broker/amqp/Header.h \
qpid/broker/amqp/Header.cpp \
qpid/broker/amqp/ManagedConnection.h \
@@ -774,6 +780,8 @@ amqp_la_SOURCES = \
qpid/broker/amqp/ManagedOutgoingLink.cpp \
qpid/broker/amqp/Message.h \
qpid/broker/amqp/Message.cpp \
+ qpid/broker/amqp/NodeProperties.h \
+ qpid/broker/amqp/NodeProperties.cpp \
qpid/broker/amqp/Outgoing.h \
qpid/broker/amqp/Outgoing.cpp \
qpid/broker/amqp/ProtocolPlugin.cpp \
@@ -790,6 +798,8 @@ amqp_la_LDFLAGS = $(PLUGINLDFLAGS) $(PROTON_LIBS)
cmoduleexec_LTLIBRARIES += amqpc.la
amqpc_la_LIBADD = libqpidcommon.la
amqpc_la_SOURCES = \
+ qpid/messaging/amqp/AddressHelper.h \
+ qpid/messaging/amqp/AddressHelper.cpp \
qpid/messaging/amqp/ConnectionContext.h \
qpid/messaging/amqp/ConnectionContext.cpp \
qpid/messaging/amqp/ConnectionHandle.h \
diff --git a/cpp/src/amqp.cmake b/cpp/src/amqp.cmake
index c8da3abf47..355e591cf6 100644
--- a/cpp/src/amqp.cmake
+++ b/cpp/src/amqp.cmake
@@ -50,6 +50,10 @@ if (BUILD_AMQP)
set (amqp_SOURCES
qpid/broker/amqp/Connection.h
qpid/broker/amqp/Connection.cpp
+ qpid/broker/amqp/DataReader.h
+ qpid/broker/amqp/DataReader.cpp
+ qpid/broker/amqp/Filter.h
+ qpid/broker/amqp/Filter.cpp
qpid/broker/amqp/Header.h
qpid/broker/amqp/Header.cpp
qpid/broker/amqp/ManagedConnection.h
@@ -60,6 +64,8 @@ if (BUILD_AMQP)
qpid/broker/amqp/ManagedOutgoingLink.cpp
qpid/broker/amqp/Message.h
qpid/broker/amqp/Message.cpp
+ qpid/broker/amqp/NodeProperties.h
+ qpid/broker/amqp/NodeProperties.cpp
qpid/broker/amqp/Outgoing.h
qpid/broker/amqp/Outgoing.cpp
qpid/broker/amqp/ProtocolPlugin.cpp
@@ -82,6 +88,8 @@ if (BUILD_AMQP)
set (amqpc_SOURCES
+ qpid/messaging/amqp/AddressHelper.h
+ qpid/messaging/amqp/AddressHelper.cpp
qpid/messaging/amqp/ConnectionContext.h
qpid/messaging/amqp/ConnectionContext.cpp
qpid/messaging/amqp/ConnectionHandle.h
diff --git a/cpp/src/qpid/amqp/MapReader.cpp b/cpp/src/qpid/amqp/MapReader.cpp
new file mode 100644
index 0000000000..2bace74d34
--- /dev/null
+++ b/cpp/src/qpid/amqp/MapReader.cpp
@@ -0,0 +1,289 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/amqp/MapReader.h"
+#include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace amqp {
+
+void MapReader::onNull(const Descriptor* d)
+{
+ if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
+ if (key) {
+ onNullValue(key, d);
+ key.data = 0; key.size = 0;
+ } else {
+ throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
+ }
+}
+void MapReader::onBoolean(bool v, const Descriptor* d)
+{
+ if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
+ if (key) {
+ onBooleanValue(key, v, d);
+ key.data = 0; key.size = 0;
+ } else {
+ throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
+ }
+}
+
+void MapReader::onUByte(uint8_t v, const Descriptor* d)
+{
+ if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
+ if (key) {
+ onUByteValue(key, v, d);
+ key.data = 0; key.size = 0;
+ } else {
+ throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
+ }
+}
+
+void MapReader::onUShort(uint16_t v, const Descriptor* d)
+{
+ if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
+ if (key) {
+ onUShortValue(key, v, d);
+ key.data = 0; key.size = 0;
+ } else {
+ throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
+ }
+}
+
+void MapReader::onUInt(uint32_t v, const Descriptor* d)
+{
+ if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
+ if (key) {
+ onUIntValue(key, v, d);
+ key.data = 0; key.size = 0;
+ } else {
+ throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
+ }
+}
+
+void MapReader::onULong(uint64_t v, const Descriptor* d)
+{
+ if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
+ if (key) {
+ onULongValue(key, v, d);
+ key.data = 0; key.size = 0;
+ } else {
+ throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
+ }
+}
+
+void MapReader::onByte(int8_t v, const Descriptor* d)
+{
+ if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
+ if (key) {
+ onByteValue(key, v, d);
+ key.data = 0; key.size = 0;
+ } else {
+ throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
+ }
+}
+
+void MapReader::onShort(int16_t v, const Descriptor* d)
+{
+ if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
+ if (key) {
+ onShortValue(key, v, d);
+ key.data = 0; key.size = 0;
+ } else {
+ throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
+ }
+}
+
+void MapReader::onInt(int32_t v, const Descriptor* d)
+{
+ if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
+ if (key) {
+ onIntValue(key, v, d);
+ key.data = 0; key.size = 0;
+ } else {
+ throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
+ }
+}
+
+void MapReader::onLong(int64_t v, const Descriptor* d)
+{
+ if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
+ if (key) {
+ onLongValue(key, v, d);
+ key.data = 0; key.size = 0;
+ } else {
+ throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
+ }
+}
+
+void MapReader::onFloat(float v, const Descriptor* d)
+{
+ if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
+ if (key) {
+ onFloatValue(key, v, d);
+ key.data = 0; key.size = 0;
+ } else {
+ throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
+ }
+}
+
+void MapReader::onDouble(double v, const Descriptor* d)
+{
+ if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
+ if (key) {
+ onDoubleValue(key, v, d);
+ key.data = 0; key.size = 0;
+ } else {
+ throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
+ }
+}
+
+void MapReader::onUuid(const CharSequence& v, const Descriptor* d)
+{
+ if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
+ if (key) {
+ onUuidValue(key, v, d);
+ key.data = 0; key.size = 0;
+ } else {
+ throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
+ }
+}
+
+void MapReader::onTimestamp(int64_t v, const Descriptor* d)
+{
+ if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
+ if (key) {
+ onTimestampValue(key, v, d);
+ key.data = 0; key.size = 0;
+ } else {
+ throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
+ }
+}
+
+void MapReader::onBinary(const CharSequence& v, const Descriptor* d)
+{
+ if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
+ if (key) {
+ onBinaryValue(key, v, d);
+ key.data = 0; key.size = 0;
+ } else {
+ throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
+ }
+}
+
+void MapReader::onString(const CharSequence& v, const Descriptor* d)
+{
+ if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
+ if (key) {
+ onStringValue(key, v, d);
+ key.data = 0; key.size = 0;
+ } else {
+ throw qpid::Exception(QPID_MSG("Expecting symbol as key, got string " << v.str()));
+ }
+}
+
+void MapReader::onSymbol(const CharSequence& v, const Descriptor* d)
+{
+ if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
+ if (key) {
+ onSymbolValue(key, v, d);
+ key.data = 0; key.size = 0;
+ } else {
+ key = v;
+ }
+}
+
+bool MapReader::onStartList(uint32_t count, const CharSequence&, const Descriptor* d)
+{
+ if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
+ if (key) {
+ bool step = onStartListValue(key, count, d);
+ key.data = 0; key.size = 0;
+ return step;
+ } else {
+ throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
+ }
+ return true;
+}
+
+bool MapReader::onStartMap(uint32_t count, const CharSequence&, const Descriptor* d)
+{
+ if (level++) {
+ if (key) {
+ bool step = onStartMapValue(key, count, d);
+ key.data = 0; key.size = 0;
+ return step;
+ } else {
+ throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
+ }
+ }
+ return true;
+}
+
+bool MapReader::onStartArray(uint32_t count, const CharSequence&, const Constructor& c, const Descriptor* d)
+{
+ if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
+ if (key) {
+ bool step = onStartArrayValue(key, count, c, d);
+ key.data = 0; key.size = 0;
+ return step;
+ } else {
+ throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
+ }
+ return true;
+}
+
+void MapReader::onEndList(uint32_t count, const Descriptor* d)
+{
+ if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
+ if (key) {
+ onEndListValue(key, count, d);
+ key.data = 0; key.size = 0;
+ } else {
+ throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
+ }
+}
+
+void MapReader::onEndMap(uint32_t count, const Descriptor* d)
+{
+ if (--level) {
+ onEndMapValue(key, count, d);
+ key.data = 0; key.size = 0;
+ }
+}
+
+void MapReader::onEndArray(uint32_t count, const Descriptor* d)
+{
+ if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
+ if (key) {
+ onEndArrayValue(key, count, d);
+ key.data = 0; key.size = 0;
+ } else {
+ throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
+ }
+}
+
+MapReader::MapReader() : level(0)
+{
+ key.data = 0; key.size = 0;
+}
+
+}} // namespace qpid::amqp
diff --git a/cpp/src/qpid/amqp/MapReader.h b/cpp/src/qpid/amqp/MapReader.h
new file mode 100644
index 0000000000..fe8f65b30c
--- /dev/null
+++ b/cpp/src/qpid/amqp/MapReader.h
@@ -0,0 +1,104 @@
+#ifndef QPID_AMQP_MAPREADER_H
+#define QPID_AMQP_MAPREADER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Reader.h"
+#include "CharSequence.h"
+#include <string>
+
+namespace qpid {
+namespace amqp {
+
+/**
+ * Reading AMQP 1.0 encoded data which is constrained to be a symbol
+ * keyeed map. The keys are assumed never to be described, the values
+ * may be.
+ */
+class MapReader : public Reader
+{
+ public:
+ virtual void onNullValue(const CharSequence& /*key*/, const Descriptor*) {}
+ virtual void onBooleanValue(const CharSequence& /*key*/, bool, const Descriptor*) {}
+ virtual void onUByteValue(const CharSequence& /*key*/, uint8_t, const Descriptor*) {}
+ virtual void onUShortValue(const CharSequence& /*key*/, uint16_t, const Descriptor*) {}
+ virtual void onUIntValue(const CharSequence& /*key*/, uint32_t, const Descriptor*) {}
+ virtual void onULongValue(const CharSequence& /*key*/, uint64_t, const Descriptor*) {}
+ virtual void onByteValue(const CharSequence& /*key*/, int8_t, const Descriptor*) {}
+ virtual void onShortValue(const CharSequence& /*key*/, int16_t, const Descriptor*) {}
+ virtual void onIntValue(const CharSequence& /*key*/, int32_t, const Descriptor*) {}
+ virtual void onLongValue(const CharSequence& /*key*/, int64_t, const Descriptor*) {}
+ virtual void onFloatValue(const CharSequence& /*key*/, float, const Descriptor*) {}
+ virtual void onDoubleValue(const CharSequence& /*key*/, double, const Descriptor*) {}
+ virtual void onUuidValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*) {}
+ virtual void onTimestampValue(const CharSequence& /*key*/, int64_t, const Descriptor*) {}
+
+ virtual void onBinaryValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*) {}
+ virtual void onStringValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*) {}
+ virtual void onSymbolValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*) {}
+
+ /**
+ * @return true to step into elements of the compound value, false
+ * to skip over it
+ */
+ virtual bool onStartListValue(const CharSequence& /*key*/, uint32_t /*count*/, const Descriptor*) { return true; }
+ virtual bool onStartMapValue(const CharSequence& /*key*/, uint32_t /*count*/, const Descriptor*) { return true; }
+ virtual bool onStartArrayValue(const CharSequence& /*key*/, uint32_t /*count*/, const Constructor&, const Descriptor*) { return true; }
+ virtual void onEndListValue(const CharSequence& /*key*/, uint32_t /*count*/, const Descriptor*) {}
+ virtual void onEndMapValue(const CharSequence& /*key*/, uint32_t /*count*/, const Descriptor*) {}
+ virtual void onEndArrayValue(const CharSequence& /*key*/, uint32_t /*count*/, const Descriptor*) {}
+
+
+ //this class implements the Reader interface, thus acting as a transformer into a more map oriented scheme
+ void onNull(const Descriptor*);
+ void onBoolean(bool, const Descriptor*);
+ void onUByte(uint8_t, const Descriptor*);
+ void onUShort(uint16_t, const Descriptor*);
+ void onUInt(uint32_t, const Descriptor*);
+ void onULong(uint64_t, const Descriptor*);
+ void onByte(int8_t, const Descriptor*);
+ void onShort(int16_t, const Descriptor*);
+ void onInt(int32_t, const Descriptor*);
+ void onLong(int64_t, const Descriptor*);
+ void onFloat(float, const Descriptor*);
+ void onDouble(double, const Descriptor*);
+ void onUuid(const CharSequence&, const Descriptor*);
+ void onTimestamp(int64_t, const Descriptor*);
+
+ void onBinary(const CharSequence&, const Descriptor*);
+ void onString(const CharSequence&, const Descriptor*);
+ void onSymbol(const CharSequence&, const Descriptor*);
+
+ bool onStartList(uint32_t /*count*/, const CharSequence&, const Descriptor*);
+ bool onStartMap(uint32_t /*count*/, const CharSequence&, const Descriptor*);
+ bool onStartArray(uint32_t /*count*/, const CharSequence&, const Constructor&, const Descriptor*);
+ void onEndList(uint32_t /*count*/, const Descriptor*);
+ void onEndMap(uint32_t /*count*/, const Descriptor*);
+ void onEndArray(uint32_t /*count*/, const Descriptor*);
+
+ MapReader();
+ private:
+ CharSequence key;
+ size_t level;
+};
+}} // namespace qpid::amqp
+
+#endif /*!QPID_AMQP_MAPREADER_H*/
diff --git a/cpp/src/qpid/broker/amqp/DataReader.cpp b/cpp/src/qpid/broker/amqp/DataReader.cpp
new file mode 100644
index 0000000000..519dd71c9c
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/DataReader.cpp
@@ -0,0 +1,187 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "DataReader.h"
+#include "qpid/amqp/CharSequence.h"
+#include "qpid/amqp/Descriptor.h"
+#include "qpid/log/Statement.h"
+#include <string>
+extern "C" {
+#include <proton/engine.h>
+}
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+namespace {
+qpid::amqp::CharSequence convert(pn_bytes_t in)
+{
+ qpid::amqp::CharSequence out;
+ out.data = in.start;
+ out.size = in.size;
+ return out;
+}
+
+qpid::amqp::CharSequence convert(pn_uuid_t in)
+{
+ qpid::amqp::CharSequence out;
+ out.data = in.bytes;
+ out.size = 16;
+ return out;
+}
+}
+
+DataReader::DataReader(qpid::amqp::Reader& r) : reader(r) {}
+
+void DataReader::read(pn_data_t* data)
+{
+ /*
+ while (pn_data_next(data)) {
+ readOne(data);
+ }
+ */
+ do {
+ readOne(data);
+ } while (pn_data_next(data));
+}
+void DataReader::readOne(pn_data_t* data)
+{
+ qpid::amqp::Descriptor descriptor(0);
+ bool described = pn_data_is_described(data);
+ if (described) {
+ pn_data_enter(data);
+ pn_data_next(data);
+ if (pn_data_type(data) == PN_ULONG) {
+ descriptor = qpid::amqp::Descriptor(pn_data_get_ulong(data));
+ } else if (pn_data_type(data) == PN_SYMBOL) {
+ descriptor = qpid::amqp::Descriptor(convert(pn_data_get_symbol(data)));
+ } else {
+ QPID_LOG(notice, "Ignoring descriptor of type " << pn_data_type(data));
+ }
+ pn_data_next(data);
+ }
+ switch (pn_data_type(data)) {
+ case PN_NULL:
+ reader.onNull(described ? &descriptor : 0);
+ break;
+ case PN_BOOL:
+ reader.onBoolean(pn_data_get_bool(data), described ? &descriptor : 0);
+ break;
+ case PN_UBYTE:
+ reader.onUByte(pn_data_get_ubyte(data), described ? &descriptor : 0);
+ break;
+ case PN_BYTE:
+ reader.onByte(pn_data_get_byte(data), described ? &descriptor : 0);
+ break;
+ case PN_USHORT:
+ reader.onUShort(pn_data_get_ushort(data), described ? &descriptor : 0);
+ break;
+ case PN_SHORT:
+ reader.onShort(pn_data_get_short(data), described ? &descriptor : 0);
+ break;
+ case PN_UINT:
+ reader.onUInt(pn_data_get_uint(data), described ? &descriptor : 0);
+ break;
+ case PN_INT:
+ reader.onInt(pn_data_get_int(data), described ? &descriptor : 0);
+ break;
+ case PN_CHAR:
+ pn_data_get_char(data);
+ break;
+ case PN_ULONG:
+ reader.onULong(pn_data_get_ulong(data), described ? &descriptor : 0);
+ break;
+ case PN_LONG:
+ reader.onLong(pn_data_get_long(data), described ? &descriptor : 0);
+ break;
+ case PN_TIMESTAMP:
+ reader.onTimestamp(pn_data_get_timestamp(data), described ? &descriptor : 0);
+ break;
+ case PN_FLOAT:
+ reader.onFloat(pn_data_get_float(data), described ? &descriptor : 0);
+ break;
+ case PN_DOUBLE:
+ reader.onDouble(pn_data_get_double(data), described ? &descriptor : 0);
+ break;
+ case PN_DECIMAL32:
+ pn_data_get_decimal32(data);
+ break;
+ case PN_DECIMAL64:
+ pn_data_get_decimal64(data);
+ break;
+ case PN_DECIMAL128:
+ pn_data_get_decimal128(data);
+ break;
+ case PN_UUID:
+ reader.onUuid(convert(pn_data_get_uuid(data)), described ? &descriptor : 0);
+ break;
+ case PN_BINARY:
+ reader.onBinary(convert(pn_data_get_binary(data)), described ? &descriptor : 0);
+ break;
+ case PN_STRING:
+ reader.onString(convert(pn_data_get_string(data)), described ? &descriptor : 0);
+ break;
+ case PN_SYMBOL:
+ reader.onSymbol(convert(pn_data_get_symbol(data)), described ? &descriptor : 0);
+ break;
+ case PN_DESCRIBED:
+ break;
+ case PN_ARRAY:
+ readArray(data, described ? &descriptor : 0);
+ break;
+ case PN_LIST:
+ readList(data, described ? &descriptor : 0);
+ break;
+ case PN_MAP:
+ readMap(data, described ? &descriptor : 0);
+ break;
+ }
+ if (described) pn_data_exit(data);
+}
+
+void DataReader::readArray(pn_data_t* /*data*/, const qpid::amqp::Descriptor* /*descriptor*/)
+{
+ //not yet implemented
+}
+
+void DataReader::readList(pn_data_t* data, const qpid::amqp::Descriptor* descriptor)
+{
+ size_t count = pn_data_get_list(data);
+ reader.onStartList(count, qpid::amqp::CharSequence(), descriptor);
+ pn_data_enter(data);
+ for (size_t i = 0; i < count && pn_data_next(data); ++i) {
+ read(data);
+ }
+ pn_data_exit(data);
+ reader.onEndList(count, descriptor);
+}
+
+void DataReader::readMap(pn_data_t* data, const qpid::amqp::Descriptor* descriptor)
+{
+ size_t count = pn_data_get_map(data);
+ reader.onStartMap(count, qpid::amqp::CharSequence(), descriptor);
+ pn_data_enter(data);
+ for (size_t i = 0; i < count && pn_data_next(data); ++i) {
+ read(data);
+ }
+ pn_data_exit(data);
+ reader.onEndMap(count, descriptor);
+}
+}}} // namespace qpid::broker::amqp
diff --git a/cpp/src/qpid/broker/amqp/DataReader.h b/cpp/src/qpid/broker/amqp/DataReader.h
new file mode 100644
index 0000000000..024507e7f2
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/DataReader.h
@@ -0,0 +1,53 @@
+#ifndef QPID_BROKER_AMQP_DATAREADER_H
+#define QPID_BROKER_AMQP_DATAREADER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/amqp/Reader.h"
+
+struct pn_data_t;
+
+namespace qpid {
+namespace amqp {
+struct Descriptor;
+}
+namespace broker {
+namespace amqp {
+
+/**
+ * Allows use of Reader interface to read pn_data_t* data.
+ */
+class DataReader
+{
+ public:
+ DataReader(qpid::amqp::Reader& reader);
+ void read(pn_data_t*);
+ private:
+ qpid::amqp::Reader& reader;
+
+ void readOne(pn_data_t*);
+ void readMap(pn_data_t*, const qpid::amqp::Descriptor*);
+ void readList(pn_data_t*, const qpid::amqp::Descriptor*);
+ void readArray(pn_data_t*, const qpid::amqp::Descriptor*);
+};
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP_DATAREADER_H*/
diff --git a/cpp/src/qpid/broker/amqp/Filter.cpp b/cpp/src/qpid/broker/amqp/Filter.cpp
new file mode 100644
index 0000000000..61e377c72f
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/Filter.cpp
@@ -0,0 +1,145 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/amqp/Filter.h"
+#include "qpid/broker/amqp/DataReader.h"
+#include "qpid/broker/DirectExchange.h"
+#include "qpid/broker/TopicExchange.h"
+#include "qpid/amqp/descriptors.h"
+#include "qpid/log/Statement.h"
+extern "C" {
+#include <proton/engine.h>
+}
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+
+void Filter::read(pn_data_t* data)
+{
+ try {
+ DataReader reader(*this);
+ reader.read(data);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Error parsing filter: " << e.what());
+ }
+}
+
+void Filter::write(pn_data_t* data)
+{
+ pn_data_put_map(data);
+ pn_data_enter(data);
+ subjectFilter.write(data);
+ pn_data_exit(data);
+}
+
+void Filter::onStringValue(const qpid::amqp::CharSequence& key, const qpid::amqp::CharSequence& value, const qpid::amqp::Descriptor* descriptor)
+{
+ StringFilter filter;
+ filter.key = std::string(key.data, key.size);
+ filter.value = std::string(value.data, value.size);
+ if (descriptor) {
+ filter.described = true;
+ filter.descriptor = *descriptor;
+ if (descriptor->match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE)
+ || descriptor->match(qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE)) {
+ setSubjectFilter(filter);
+ } else {
+ QPID_LOG(notice, "Skipping unrecognised string filter with key " << filter.key << " and descriptor " << filter.descriptor);
+ }
+ } else {
+ setSubjectFilter(filter);
+ }
+}
+
+bool Filter::hasSubjectFilter() const
+{
+ return !subjectFilter.value.empty();
+}
+
+
+void Filter::setSubjectFilter(const StringFilter& filter)
+{
+ if (hasSubjectFilter()) {
+ QPID_LOG(notice, "Skipping filter with key " << filter.key << "; subject filter already set");
+ } else {
+ subjectFilter = filter;
+ }
+}
+
+void Filter::bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue)
+{
+ subjectFilter.bind(exchange, queue);
+}
+
+Filter::StringFilter::StringFilter() : described(false), descriptor(0) {}
+namespace {
+pn_bytes_t convert(const std::string& in)
+{
+ pn_bytes_t out;
+ out.start = const_cast<char*>(in.data());
+ out.size = in.size();
+ return out;
+}
+pn_bytes_t convert(const qpid::amqp::CharSequence& in)
+{
+ pn_bytes_t out;
+ out.start = const_cast<char*>(in.data);
+ out.size = in.size;
+ return out;
+}
+}
+void Filter::StringFilter::write(pn_data_t* data)
+{
+ pn_data_put_symbol(data, convert(key));
+ if (described) {
+ pn_data_put_described(data);
+ pn_data_enter(data);
+ switch (descriptor.type) {
+ case qpid::amqp::Descriptor::NUMERIC:
+ pn_data_put_ulong(data, descriptor.value.code);
+ break;
+ case qpid::amqp::Descriptor::SYMBOLIC:
+ pn_data_put_symbol(data, convert(descriptor.value.symbol));
+ break;
+ }
+ }
+ pn_data_put_string(data, convert(value));
+ if (described) pn_data_exit(data);
+}
+
+void Filter::StringFilter::bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue)
+{
+ if (described && exchange->getType() == DirectExchange::typeName
+ && descriptor.match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE)) {
+ QPID_LOG(info, "Using legacy topic filter as a direct matching filter for " << exchange->getName());
+ if (descriptor.type == qpid::amqp::Descriptor::NUMERIC) {
+ descriptor = qpid::amqp::Descriptor(qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE);
+ } else {
+ qpid::amqp::CharSequence symbol;
+ symbol.data = qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL.data();
+ symbol.size = qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL.size();
+ descriptor = qpid::amqp::Descriptor(symbol);
+ }
+ }
+ exchange->bind(queue, value, 0);
+}
+
+}}} // namespace qpid::broker::amqp
diff --git a/cpp/src/qpid/broker/amqp/Filter.h b/cpp/src/qpid/broker/amqp/Filter.h
new file mode 100644
index 0000000000..5e2dee4d6e
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/Filter.h
@@ -0,0 +1,62 @@
+#ifndef QPID_BROKER_AMQP_FILTER_H
+#define QPID_BROKER_AMQP_FILTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/amqp/MapReader.h"
+#include "qpid/amqp/Descriptor.h"
+#include <boost/shared_ptr.hpp>
+
+struct pn_data_t;
+namespace qpid {
+namespace broker {
+class Exchange;
+class Queue;
+namespace amqp {
+
+
+class Filter : qpid::amqp::MapReader
+{
+ public:
+ void read(pn_data_t*);
+ void write(pn_data_t*);
+ bool hasSubjectFilter() const;
+ void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue);
+ private:
+ struct StringFilter
+ {
+ bool described;
+ qpid::amqp::Descriptor descriptor;
+ std::string key;
+ std::string value;
+ StringFilter();
+ void write(pn_data_t*);
+ void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue);
+ };
+
+ void onStringValue(const qpid::amqp::CharSequence& key, const qpid::amqp::CharSequence& value, const qpid::amqp::Descriptor* descriptor);
+ void setSubjectFilter(const StringFilter&);
+
+ StringFilter subjectFilter;
+};
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP_FILTER_H*/
diff --git a/cpp/src/qpid/broker/amqp/NodeProperties.cpp b/cpp/src/qpid/broker/amqp/NodeProperties.cpp
new file mode 100644
index 0000000000..c12b161e8b
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/NodeProperties.cpp
@@ -0,0 +1,154 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/amqp/NodeProperties.h"
+#include "qpid/broker/amqp/DataReader.h"
+#include "qpid/amqp/CharSequence.h"
+#include "qpid/types/Variant.h"
+#include "qpid/broker/QueueSettings.h"
+#include "qpid/log/Statement.h"
+
+using qpid::amqp::CharSequence;
+using qpid::amqp::Descriptor;
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+namespace {
+//distribution modes:
+const std::string MOVE("move");
+const std::string COPY("copy");
+
+const std::string SUPPORTED_DIST_MODES("supported-dist-modes");
+}
+
+NodeProperties::NodeProperties() : queue(true) {}
+
+void NodeProperties::read(pn_data_t* data)
+{
+ DataReader reader(*this);
+ reader.read(data);
+}
+
+void NodeProperties::process(const std::string& key, const qpid::types::Variant& value)
+{
+ QPID_LOG(notice, "Processing node property " << key << " = " << value);
+ if (key == SUPPORTED_DIST_MODES) {
+ if (value == MOVE) queue = true;
+ else if (value == COPY) queue = false;
+ } else {
+ properties[key] = value;
+ }
+}
+
+void NodeProperties::onNullValue(const CharSequence& key, const Descriptor*)
+{
+ process(key.str(), qpid::types::Variant());
+}
+
+void NodeProperties::onBooleanValue(const CharSequence& key, bool value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onUByteValue(const CharSequence& key, uint8_t value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onUShortValue(const CharSequence& key, uint16_t value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onUIntValue(const CharSequence& key, uint32_t value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onULongValue(const CharSequence& key, uint64_t value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onByteValue(const CharSequence& key, int8_t value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onShortValue(const CharSequence& key, int16_t value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onIntValue(const CharSequence& key, int32_t value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onLongValue(const CharSequence& key, int64_t value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onFloatValue(const CharSequence& key, float value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onDoubleValue(const CharSequence& key, double value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onUuidValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+{
+ process(key.str(), value.str());
+}
+
+void NodeProperties::onTimestampValue(const CharSequence& key, int64_t value, const Descriptor*)
+{
+ process(key.str(), value);
+}
+
+void NodeProperties::onStringValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+{
+ process(key.str(), value.str());
+}
+
+void NodeProperties::onSymbolValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+{
+ process(key.str(), value.str());
+}
+
+QueueSettings NodeProperties::getQueueSettings()
+{
+ QueueSettings settings(false/*durable*/, false/*auto-delete*/);
+ qpid::types::Variant::Map unused;
+ settings.populate(properties, unused);
+ return settings;
+}
+
+bool NodeProperties::isQueue() const
+{
+ return queue;
+}
+
+}}} // namespace qpid::broker::amqp
diff --git a/cpp/src/qpid/broker/amqp/NodeProperties.h b/cpp/src/qpid/broker/amqp/NodeProperties.h
new file mode 100644
index 0000000000..2b3a98b6bd
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/NodeProperties.h
@@ -0,0 +1,64 @@
+#ifndef QPID_BROKER_AMQP_NODEPROPERTIES_H
+#define QPID_BROKER_AMQP_NODEPROPERTIES_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/amqp/MapReader.h"
+#include "qpid/types/Variant.h"
+
+struct pn_data_t;
+namespace qpid {
+namespace broker {
+struct QueueSettings;
+namespace amqp {
+
+class NodeProperties : public qpid::amqp::MapReader
+{
+ public:
+ NodeProperties();
+ void read(pn_data_t*);
+ void onNullValue(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*);
+ void onBooleanValue(const qpid::amqp::CharSequence&, bool, const qpid::amqp::Descriptor*);
+ void onUByteValue(const qpid::amqp::CharSequence&, uint8_t, const qpid::amqp::Descriptor*);
+ void onUShortValue(const qpid::amqp::CharSequence&, uint16_t, const qpid::amqp::Descriptor*);
+ void onUIntValue(const qpid::amqp::CharSequence&, uint32_t, const qpid::amqp::Descriptor*);
+ void onULongValue(const qpid::amqp::CharSequence&, uint64_t, const qpid::amqp::Descriptor*);
+ void onByteValue(const qpid::amqp::CharSequence&, int8_t, const qpid::amqp::Descriptor*);
+ void onShortValue(const qpid::amqp::CharSequence&, int16_t, const qpid::amqp::Descriptor*);
+ void onIntValue(const qpid::amqp::CharSequence&, int32_t, const qpid::amqp::Descriptor*);
+ void onLongValue(const qpid::amqp::CharSequence&, int64_t, const qpid::amqp::Descriptor*);
+ void onFloatValue(const qpid::amqp::CharSequence&, float, const qpid::amqp::Descriptor*);
+ void onDoubleValue(const qpid::amqp::CharSequence&, double, const qpid::amqp::Descriptor*);
+ void onUuidValue(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*);
+ void onTimestampValue(const qpid::amqp::CharSequence&, int64_t, const qpid::amqp::Descriptor*);
+ void onStringValue(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*);
+ void onSymbolValue(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*);
+ bool isQueue() const;
+ QueueSettings getQueueSettings();
+ private:
+ bool queue;
+ qpid::types::Variant::Map properties;
+
+ void process(const std::string&, const qpid::types::Variant&);
+};
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP_NODEPROPERTIES_H*/
diff --git a/cpp/src/qpid/broker/amqp/Session.cpp b/cpp/src/qpid/broker/amqp/Session.cpp
index 5a6812edad..f0756bc14d 100644
--- a/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/cpp/src/qpid/broker/amqp/Session.cpp
@@ -35,7 +35,10 @@
#include "qpid/broker/Message.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/TopicExchange.h"
+#include "qpid/broker/amqp/Filter.h"
+#include "qpid/broker/amqp/NodeProperties.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
#include <boost/intrusive_ptr.hpp>
@@ -84,6 +87,30 @@ class Exchange : public Target
Session::Session(pn_session_t* s, qpid::broker::Broker& b, ManagedConnection& c, qpid::sys::OutputControl& o)
: ManagedSession(b, c, (boost::format("%1%") % s).str()), session(s), broker(b), connection(c), out(o), deleted(false) {}
+
+Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus)
+{
+ ResolvedNode node;
+ node.exchange = broker.getExchanges().find(name);
+ node.queue = broker.getQueues().find(name);
+ if (!node.queue && !node.exchange && pn_terminus_is_dynamic(terminus)) {
+ //TODO: handle dynamic creation
+ //is it a queue or an exchange?
+ NodeProperties properties;
+ properties.read(pn_terminus_properties(terminus));
+ if (properties.isQueue()) {
+ node.queue = broker.createQueue(name, properties.getQueueSettings(), this, "", connection.getUserid(), connection.getId()).first;
+ } else {
+ qpid::framing::FieldTable args;
+ node.exchange = broker.createExchange(name, "topic"/*type*/, false/*durable*/, ""/*alternateExchange*/, args, connection.getUserid(), connection.getId()).first;
+ }
+ } else if (node.queue && node.exchange) {
+ QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue");
+ node.exchange.reset();
+ }
+ return node;
+}
+
void Session::attach(pn_link_t* link)
{
if (pn_link_is_sender(link)) {
@@ -96,120 +123,33 @@ void Session::attach(pn_link_t* link)
QPID_LOG(debug, "Received attach request for outgoing link from " << name);
pn_terminus_set_address(pn_link_source(link), name.c_str());
- boost::shared_ptr<qpid::broker::Exchange> exchange = broker.getExchanges().find(name);
- boost::shared_ptr<qpid::broker::Queue> queue = broker.getQueues().find(name);
- if (queue) {
- if (exchange) {
- QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue");
- }
- boost::shared_ptr<Outgoing> q(new Outgoing(broker, queue, link, *this, out, false));
+ ResolvedNode node = resolve(name, source);
+
+ if (node.queue) {
+ boost::shared_ptr<Outgoing> q(new Outgoing(broker, node.queue, link, *this, out, false));
q->init();
senders[link] = q;
- } else if (exchange) {
+ } else if (node.exchange) {
QueueSettings settings(false, true);
//TODO: populate settings from source details when available from engine
- queue = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first;
- pn_data_t* filter = pn_terminus_filter(source);
- pn_data_next(filter);
- if (filter && !pn_data_is_null(filter)) {
- if (pn_data_type(filter) == PN_MAP) {
- pn_data_t* echo = pn_terminus_filter(pn_link_source(link));
- pn_data_put_map(echo);
- pn_data_enter(echo);
- size_t count = pn_data_get_map(filter)/2;
- QPID_LOG(debug, "Got filter map with " << count << " entries");
- pn_data_enter(filter);
- for (size_t i = 0; i < count; i++) {
- pn_bytes_t fname = pn_data_get_symbol(filter);
- pn_data_next(filter);
- bool isDescribed = pn_data_is_described(filter);
- qpid::amqp::Descriptor descriptor(0);
- if (isDescribed) {
- pn_data_enter(filter);
- pn_data_next(filter);
- //TODO: use or at least verify descriptor
- if (pn_data_type(filter) == PN_ULONG) {
- descriptor = qpid::amqp::Descriptor(pn_data_get_ulong(filter));
- } else if (pn_data_type(filter) == PN_SYMBOL) {
- pn_bytes_t d = pn_data_get_symbol(filter);
- qpid::amqp::CharSequence c;
- c.data = d.start;
- c.size = d.size;
- descriptor = qpid::amqp::Descriptor(c);
- } else {
- QPID_LOG(notice, "Ignoring filter " << std::string(fname.start, fname.size) << " with descriptor of type " << pn_data_type(filter));
- continue;
- }
- if (descriptor.match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE)) {
- if (exchange->getType() == qpid::broker::DirectExchange::typeName) {
- QPID_LOG(info, "Interpreting legacy topic filter as direct binding key for " << exchange->getName());
- } else if (exchange->getType() == qpid::broker::FanOutExchange::typeName) {
- QPID_LOG(info, "Ignoring legacy topic filter on fanout exchange " << exchange->getName());
- for (int i = 0; i < 3; ++i) pn_data_next(filter);//move off descriptor, then skip key and value
- continue;
- }
- } else if (descriptor.match(qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE)) {
- if (exchange->getType() == qpid::broker::TopicExchange::typeName) {
- QPID_LOG(info, "Interpreting legacy direct filter as topic binding key for " << exchange->getName());
- } else if (exchange->getType() == qpid::broker::FanOutExchange::typeName) {
- QPID_LOG(info, "Ignoring legacy direct filter on fanout exchange " << exchange->getName());
- for (int i = 0; i < 3; ++i) pn_data_next(filter);//move off descriptor, then skip key and value
- continue;
- }
- } else {
- QPID_LOG(notice, "Ignoring filter with unsupported descriptor " << descriptor);
- for (int i = 0; i < 3; ++i) pn_data_next(filter);//move off descriptor, then skip key and value
- continue;
- }
- pn_data_next(filter);
- } else {
- QPID_LOG(info, "Got undescribed filter of type " << pn_data_type(filter));
- }
- if (pn_data_type(filter) == PN_STRING) {
- pn_bytes_t value = pn_data_get_string(filter);
- pn_data_next(filter);
- exchange->bind(queue, std::string(value.start, value.size), 0);
- pn_data_put_symbol(echo, fname);
- if (isDescribed) {
- pn_data_put_described(echo);
- pn_data_enter(echo);
- pn_bytes_t symbol;
- switch (descriptor.type) {
- case qpid::amqp::Descriptor::NUMERIC:
- pn_data_put_ulong(echo, descriptor.value.code);
- break;
- case qpid::amqp::Descriptor::SYMBOLIC:
- symbol.start = const_cast<char*>(descriptor.value.symbol.data);
- symbol.size = descriptor.value.symbol.size;
- pn_data_put_symbol(echo, symbol);
- break;
- }
- }
- pn_data_put_string(echo, value);
- if (isDescribed) pn_data_exit(echo);
-
- QPID_LOG(debug, "Binding using filter " << std::string(fname.start, fname.size) << ":" << std::string(value.start, value.size));
- } else {
- //TODO: handle headers exchange filters
- QPID_LOG(warning, "Ignoring unsupported filter type with key " << std::string(fname.start, fname.size) << " and type " << pn_data_type(filter));
- }
- }
- pn_data_exit(echo);
- } else {
- QPID_LOG(warning, "Filter should be map, got type: " << pn_data_type(filter));
- }
- } else if (exchange->getType() == FanOutExchange::typeName) {
- exchange->bind(queue, std::string(), 0);
- } else if (exchange->getType() == TopicExchange::typeName) {
- exchange->bind(queue, "#", 0);
+ boost::shared_ptr<qpid::broker::Queue> queue
+ = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first;
+ Filter filter;
+ filter.read(pn_terminus_filter(source));
+ if (filter.hasSubjectFilter()) {
+ filter.bind(node.exchange, queue);
+ filter.write(pn_terminus_filter(pn_link_source(link)));
+ } else if (node.exchange->getType() == FanOutExchange::typeName) {
+ node.exchange->bind(queue, std::string(), 0);
+ } else if (node.exchange->getType() == TopicExchange::typeName) {
+ node.exchange->bind(queue, "#", 0);
} else {
- throw qpid::Exception("Exchange type requires a filter: " + exchange->getType());/*not-supported?*/
+ throw qpid::Exception("Exchange type requires a filter: " + node.exchange->getType());/*not-supported?*/
}
boost::shared_ptr<Outgoing> q(new Outgoing(broker, queue, link, *this, out, true));
senders[link] = q;
q->init();
} else {
- //TODO: handle dynamic creation
pn_terminus_set_type(pn_link_source(link), PN_UNSPECIFIED);
throw qpid::Exception("Node not found: " + name);/*not-found*/
}
@@ -223,22 +163,17 @@ void Session::attach(pn_link_t* link)
QPID_LOG(debug, "Received attach request for incoming link to " << name);
pn_terminus_set_address(pn_link_target(link), name.c_str());
+ ResolvedNode node = resolve(name, target);
- boost::shared_ptr<qpid::broker::Queue> queue = broker.getQueues().find(name);
- boost::shared_ptr<qpid::broker::Exchange> exchange = broker.getExchanges().find(name);
- if (queue) {
- if (exchange) {
- QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue");
- }
- boost::shared_ptr<Target> q(new Queue(queue, link));
+ if (node.queue) {
+ boost::shared_ptr<Target> q(new Queue(node.queue, link));
targets[link] = q;
q->flow();
- } else if (exchange) {
- boost::shared_ptr<Target> e(new Exchange(exchange, link));
+ } else if (node.exchange) {
+ boost::shared_ptr<Target> e(new Exchange(node.exchange, link));
targets[link] = e;
e->flow();
} else {
- //TODO: handle dynamic creation
pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED);
throw qpid::Exception("Node not found: " + name);/*not-found*/
}
diff --git a/cpp/src/qpid/broker/amqp/Session.h b/cpp/src/qpid/broker/amqp/Session.h
index 07302d062b..7dbdaf05fc 100644
--- a/cpp/src/qpid/broker/amqp/Session.h
+++ b/cpp/src/qpid/broker/amqp/Session.h
@@ -32,11 +32,14 @@
struct pn_delivery_t;
struct pn_link_t;
struct pn_session_t;
+struct pn_terminus_t;
namespace qpid {
namespace broker {
class Broker;
+class Exchange;
+class Queue;
namespace amqp {
@@ -71,6 +74,13 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses
std::deque<pn_delivery_t*> completed;
bool deleted;
qpid::sys::Mutex lock;
+ struct ResolvedNode
+ {
+ boost::shared_ptr<qpid::broker::Exchange> exchange;
+ boost::shared_ptr<qpid::broker::Queue> queue;
+ };
+
+ ResolvedNode resolve(const std::string name, pn_terminus_t* terminus);
};
}}} // namespace qpid::broker::amqp
diff --git a/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
new file mode 100644
index 0000000000..359660dce5
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/amqp/AddressHelper.h"
+#include "qpid/messaging/Address.h"
+#include <vector>
+#include <boost/assign.hpp>
+extern "C" {
+#include <proton/engine.h>
+}
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+using qpid::types::Variant;
+
+namespace {
+//policy types
+const std::string CREATE("create");
+const std::string ASSERT("assert");
+const std::string DELETE("delete");
+
+//policy values
+const std::string ALWAYS("always");
+const std::string NEVER("never");
+const std::string RECEIVER("receiver");
+const std::string SENDER("sender");
+
+const std::string NODE("node");
+const std::string LINK("link");
+
+const std::string TYPE("type");
+const std::string TOPIC("topic");
+const std::string QUEUE("queue");
+
+//distribution modes:
+const std::string MOVE("move");
+const std::string COPY("copy");
+
+const std::string SUPPORTED_DIST_MODES("supported-dist-modes");
+
+
+const std::vector<std::string> RECEIVER_MODES = boost::assign::list_of<std::string>(ALWAYS) (RECEIVER);
+const std::vector<std::string> SENDER_MODES = boost::assign::list_of<std::string>(ALWAYS) (SENDER);
+
+pn_bytes_t convert(const std::string& s)
+{
+ pn_bytes_t result;
+ result.start = const_cast<char*>(s.data());
+ result.size = s.size();
+ return result;
+}
+
+bool bind(const Variant::Map& options, const std::string& name, std::string& variable)
+{
+ Variant::Map::const_iterator j = options.find(name);
+ if (j == options.end()) {
+ return false;
+ } else {
+ variable = j->second.asString();
+ return true;
+ }
+}
+
+bool bind(const Variant::Map& options, const std::string& name, Variant::Map& variable)
+{
+ Variant::Map::const_iterator j = options.find(name);
+ if (j == options.end()) {
+ return false;
+ } else {
+ variable = j->second.asMap();
+ return true;
+ }
+}
+
+bool bind(const Address& address, const std::string& name, std::string& variable)
+{
+ return bind(address.getOptions(), name, variable);
+}
+
+bool bind(const Address& address, const std::string& name, Variant::Map& variable)
+{
+ return bind(address.getOptions(), name, variable);
+}
+
+bool in(const std::string& value, const std::vector<std::string>& choices)
+{
+ for (std::vector<std::string>::const_iterator i = choices.begin(); i != choices.end(); ++i) {
+ if (value == *i) return true;
+ }
+ return false;
+}
+}
+
+AddressHelper::AddressHelper(const Address& address)
+{
+ bind(address, CREATE, createPolicy);
+ bind(address, DELETE, deletePolicy);
+ bind(address, ASSERT, assertPolicy);
+
+ bind(address, NODE, node);
+ bind(address, LINK, link);
+}
+
+bool AddressHelper::createEnabled(CheckMode mode) const
+{
+ return enabled(createPolicy, mode);
+}
+bool AddressHelper::deleteEnabled(CheckMode mode) const
+{
+ return enabled(deletePolicy, mode);
+}
+bool AddressHelper::assertEnabled(CheckMode mode) const
+{
+ return enabled(assertPolicy, mode);
+}
+bool AddressHelper::enabled(const std::string& policy, CheckMode mode) const
+{
+ bool result = false;
+ switch (mode) {
+ case FOR_RECEIVER:
+ result = in(policy, RECEIVER_MODES);
+ break;
+ case FOR_SENDER:
+ result = in(policy, SENDER_MODES);
+ break;
+ }
+ return result;
+}
+
+const qpid::types::Variant::Map& AddressHelper::getNodeProperties() const
+{
+ return node;
+}
+const qpid::types::Variant::Map& AddressHelper::getLinkProperties() const
+{
+ return link;
+}
+
+void AddressHelper::setNodeProperties(pn_terminus_t* terminus)
+{
+ pn_terminus_set_dynamic(terminus, true);
+
+ //properties for dynamically created node:
+ pn_data_t* data = pn_terminus_properties(terminus);
+ if (node.size()) {
+ pn_data_put_map(data);
+ pn_data_enter(data);
+ }
+ for (qpid::types::Variant::Map::const_iterator i = node.begin(); i != node.end(); ++i) {
+ if (i->first == TYPE) {
+ pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES));
+ pn_data_put_string(data, convert(i->second == TOPIC ? COPY : MOVE));
+ } else {
+ pn_data_put_symbol(data, convert(i->first));
+ pn_data_put_string(data, convert(i->second.asString()));
+ }
+ }
+ if (node.size()) {
+ pn_data_exit(data);
+ }
+}
+
+}}} // namespace qpid::messaging::amqp
diff --git a/cpp/src/qpid/messaging/amqp/AddressHelper.h b/cpp/src/qpid/messaging/amqp/AddressHelper.h
new file mode 100644
index 0000000000..cd0aa1be9e
--- /dev/null
+++ b/cpp/src/qpid/messaging/amqp/AddressHelper.h
@@ -0,0 +1,57 @@
+#ifndef QPID_MESSAGING_AMQP_ADDRESSHELPER_H
+#define QPID_MESSAGING_AMQP_ADDRESSHELPER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/types/Variant.h"
+
+struct pn_terminus_t;
+
+namespace qpid {
+namespace messaging {
+class Address;
+namespace amqp {
+
+class AddressHelper
+{
+ public:
+ enum CheckMode {FOR_RECEIVER, FOR_SENDER};
+
+ AddressHelper(const Address& address);
+ bool createEnabled(CheckMode mode) const;
+ bool deleteEnabled(CheckMode mode) const;
+ bool assertEnabled(CheckMode mode) const;
+
+ void setNodeProperties(pn_terminus_t*);
+ const qpid::types::Variant::Map& getNodeProperties() const;
+ const qpid::types::Variant::Map& getLinkProperties() const;
+ private:
+ std::string createPolicy;
+ std::string assertPolicy;
+ std::string deletePolicy;
+ qpid::types::Variant::Map node;
+ qpid::types::Variant::Map link;
+
+ bool enabled(const std::string& policy, CheckMode mode) const;
+};
+}}} // namespace qpid::messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_ADDRESSHELPER_H*/
diff --git a/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index 2f1f39cb68..67c7f29448 100644
--- a/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -250,8 +250,7 @@ void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid:
void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
{
- pn_terminus_t* target = pn_link_target((pn_link_t*) lnk->sender);
- pn_terminus_set_address(target, lnk->getTarget().c_str());
+ lnk->configure();
attach(ssn->session, (pn_link_t*) lnk->sender);
if (!pn_link_remote_target((pn_link_t*) lnk->sender)) {
std::string msg("No such target : ");
diff --git a/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
index 0a8f139839..8034df311a 100644
--- a/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
+++ b/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
@@ -18,7 +18,8 @@
* under the License.
*
*/
-#include "ReceiverContext.h"
+#include "qpid/messaging/amqp/ReceiverContext.h"
+#include "qpid/messaging/amqp/AddressHelper.h"
#include "qpid/messaging/Duration.h"
#include "qpid/messaging/Message.h"
#include "qpid/amqp/descriptors.h"
@@ -113,16 +114,23 @@ void ReceiverContext::configure() const
void ReceiverContext::configure(pn_terminus_t* source) const
{
pn_terminus_set_address(source, address.getName().c_str());
+ //dynamic create:
+ AddressHelper helper(address);
+ if (helper.createEnabled(AddressHelper::FOR_RECEIVER)) {
+ helper.setNodeProperties(source);
+ }
+ //filter:
pn_data_t* filter = pn_terminus_filter(source);
pn_data_put_map(filter);
pn_data_enter(filter);
pn_data_put_symbol(filter, convert("subject"));
- pn_data_put_described(filter);
- pn_data_enter(filter);
- pn_data_put_ulong(filter, getFilterDescriptor(address.getSubject()));
+ //TODO: At present inserting described values into the map doesn't seem to work; correct this once resolved
+ //pn_data_put_described(filter);
+ //pn_data_enter(filter);
+ //pn_data_put_ulong(filter, getFilterDescriptor(address.getSubject()));
pn_data_put_string(filter, convert(address.getSubject()));
- pn_data_exit(filter);
+ //pn_data_exit(filter);
pn_data_exit(filter);
}
diff --git a/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/cpp/src/qpid/messaging/amqp/SenderContext.cpp
index 02f7bdec1c..95398fea6f 100644
--- a/cpp/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/messaging/amqp/SenderContext.h"
#include "qpid/messaging/amqp/EncodedMessage.h"
+#include "qpid/messaging/amqp/AddressHelper.h"
#include "qpid/amqp/descriptors.h"
#include "qpid/amqp/MessageEncoder.h"
#include "qpid/messaging/exceptions.h"
@@ -336,4 +337,17 @@ bool SenderContext::Delivery::accepted()
return pn_delivery_remote_state(token) == PN_ACCEPTED;
}
+void SenderContext::configure() const
+{
+ configure(pn_link_target(sender));
+}
+void SenderContext::configure(pn_terminus_t* target) const
+{
+ pn_terminus_set_address(target, address.getName().c_str());
+ //dynamic create:
+ AddressHelper helper(address);
+ if (helper.createEnabled(AddressHelper::FOR_SENDER)) {
+ helper.setNodeProperties(target);
+ }
+}
}}} // namespace qpid::messaging::amqp
diff --git a/cpp/src/qpid/messaging/amqp/SenderContext.h b/cpp/src/qpid/messaging/amqp/SenderContext.h
index bc73d199e7..0202d6aa4b 100644
--- a/cpp/src/qpid/messaging/amqp/SenderContext.h
+++ b/cpp/src/qpid/messaging/amqp/SenderContext.h
@@ -31,6 +31,7 @@
struct pn_delivery_t;
struct pn_link_t;
struct pn_session_t;
+struct pn_terminus_t;
namespace qpid {
namespace messaging {
@@ -67,6 +68,7 @@ class SenderContext
const std::string& getName() const;
const std::string& getTarget() const;
Delivery* send(const qpid::messaging::Message& message);
+ void configure() const;
private:
friend class ConnectionContext;
typedef std::deque<Delivery> Deliveries;
@@ -79,6 +81,7 @@ class SenderContext
uint32_t capacity;
uint32_t processUnsettled();
+ void configure(pn_terminus_t*) const;
};
}}} // namespace qpid::messaging::amqp