diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/CMakeLists.txt | 2 | ||||
-rw-r--r-- | cpp/src/Makefile.am | 10 | ||||
-rw-r--r-- | cpp/src/amqp.cmake | 8 | ||||
-rw-r--r-- | cpp/src/qpid/amqp/MapReader.cpp | 289 | ||||
-rw-r--r-- | cpp/src/qpid/amqp/MapReader.h | 104 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp/DataReader.cpp | 187 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp/DataReader.h | 53 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp/Filter.cpp | 145 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp/Filter.h | 62 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp/NodeProperties.cpp | 154 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp/NodeProperties.h | 64 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp/Session.cpp | 163 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp/Session.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/amqp/AddressHelper.cpp | 182 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/amqp/AddressHelper.h | 57 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/amqp/ReceiverContext.cpp | 18 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/amqp/SenderContext.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/amqp/SenderContext.h | 3 |
19 files changed, 1407 insertions, 121 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index ae969c942e..579e792b62 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -954,6 +954,8 @@ set (qpidcommon_SOURCES qpid/amqp/Descriptor.cpp qpid/amqp/Encoder.h qpid/amqp/Encoder.cpp + qpid/amqp/MapReader.h + qpid/amqp/MapReader.cpp qpid/amqp/MessageEncoder.h qpid/amqp/MessageEncoder.cpp qpid/amqp/MessageId.h diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 3782c17c78..05c0ec6302 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -528,6 +528,8 @@ libqpidcommon_la_SOURCES += \ qpid/amqp/Encoder.cpp \ qpid/amqp/ListReader.h \ qpid/amqp/LoggingReader.h \ + qpid/amqp/MapReader.h \ + qpid/amqp/MapReader.cpp \ qpid/amqp/MessageEncoder.h \ qpid/amqp/MessageEncoder.cpp \ qpid/amqp/MessageId.h \ @@ -764,6 +766,10 @@ amqp_la_LIBADD = libqpidcommon.la amqp_la_SOURCES = \ qpid/broker/amqp/Connection.h \ qpid/broker/amqp/Connection.cpp \ + qpid/broker/amqp/DataReader.h \ + qpid/broker/amqp/DataReader.cpp \ + qpid/broker/amqp/Filter.h \ + qpid/broker/amqp/Filter.cpp \ qpid/broker/amqp/Header.h \ qpid/broker/amqp/Header.cpp \ qpid/broker/amqp/ManagedConnection.h \ @@ -774,6 +780,8 @@ amqp_la_SOURCES = \ qpid/broker/amqp/ManagedOutgoingLink.cpp \ qpid/broker/amqp/Message.h \ qpid/broker/amqp/Message.cpp \ + qpid/broker/amqp/NodeProperties.h \ + qpid/broker/amqp/NodeProperties.cpp \ qpid/broker/amqp/Outgoing.h \ qpid/broker/amqp/Outgoing.cpp \ qpid/broker/amqp/ProtocolPlugin.cpp \ @@ -790,6 +798,8 @@ amqp_la_LDFLAGS = $(PLUGINLDFLAGS) $(PROTON_LIBS) cmoduleexec_LTLIBRARIES += amqpc.la amqpc_la_LIBADD = libqpidcommon.la amqpc_la_SOURCES = \ + qpid/messaging/amqp/AddressHelper.h \ + qpid/messaging/amqp/AddressHelper.cpp \ qpid/messaging/amqp/ConnectionContext.h \ qpid/messaging/amqp/ConnectionContext.cpp \ qpid/messaging/amqp/ConnectionHandle.h \ diff --git a/cpp/src/amqp.cmake b/cpp/src/amqp.cmake index c8da3abf47..355e591cf6 100644 --- a/cpp/src/amqp.cmake +++ b/cpp/src/amqp.cmake @@ -50,6 +50,10 @@ if (BUILD_AMQP) set (amqp_SOURCES qpid/broker/amqp/Connection.h qpid/broker/amqp/Connection.cpp + qpid/broker/amqp/DataReader.h + qpid/broker/amqp/DataReader.cpp + qpid/broker/amqp/Filter.h + qpid/broker/amqp/Filter.cpp qpid/broker/amqp/Header.h qpid/broker/amqp/Header.cpp qpid/broker/amqp/ManagedConnection.h @@ -60,6 +64,8 @@ if (BUILD_AMQP) qpid/broker/amqp/ManagedOutgoingLink.cpp qpid/broker/amqp/Message.h qpid/broker/amqp/Message.cpp + qpid/broker/amqp/NodeProperties.h + qpid/broker/amqp/NodeProperties.cpp qpid/broker/amqp/Outgoing.h qpid/broker/amqp/Outgoing.cpp qpid/broker/amqp/ProtocolPlugin.cpp @@ -82,6 +88,8 @@ if (BUILD_AMQP) set (amqpc_SOURCES + qpid/messaging/amqp/AddressHelper.h + qpid/messaging/amqp/AddressHelper.cpp qpid/messaging/amqp/ConnectionContext.h qpid/messaging/amqp/ConnectionContext.cpp qpid/messaging/amqp/ConnectionHandle.h diff --git a/cpp/src/qpid/amqp/MapReader.cpp b/cpp/src/qpid/amqp/MapReader.cpp new file mode 100644 index 0000000000..2bace74d34 --- /dev/null +++ b/cpp/src/qpid/amqp/MapReader.cpp @@ -0,0 +1,289 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/MapReader.h" +#include "qpid/Exception.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace amqp { + +void MapReader::onNull(const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onNullValue(key, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} +void MapReader::onBoolean(bool v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onBooleanValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onUByte(uint8_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onUByteValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onUShort(uint16_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onUShortValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onUInt(uint32_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onUIntValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onULong(uint64_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onULongValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onByte(int8_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onByteValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onShort(int16_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onShortValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onInt(int32_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onIntValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onLong(int64_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onLongValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onFloat(float v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onFloatValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onDouble(double v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onDoubleValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onUuid(const CharSequence& v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onUuidValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onTimestamp(int64_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onTimestampValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onBinary(const CharSequence& v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onBinaryValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onString(const CharSequence& v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onStringValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key, got string " << v.str())); + } +} + +void MapReader::onSymbol(const CharSequence& v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onSymbolValue(key, v, d); + key.data = 0; key.size = 0; + } else { + key = v; + } +} + +bool MapReader::onStartList(uint32_t count, const CharSequence&, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + bool step = onStartListValue(key, count, d); + key.data = 0; key.size = 0; + return step; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } + return true; +} + +bool MapReader::onStartMap(uint32_t count, const CharSequence&, const Descriptor* d) +{ + if (level++) { + if (key) { + bool step = onStartMapValue(key, count, d); + key.data = 0; key.size = 0; + return step; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } + } + return true; +} + +bool MapReader::onStartArray(uint32_t count, const CharSequence&, const Constructor& c, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + bool step = onStartArrayValue(key, count, c, d); + key.data = 0; key.size = 0; + return step; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } + return true; +} + +void MapReader::onEndList(uint32_t count, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onEndListValue(key, count, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onEndMap(uint32_t count, const Descriptor* d) +{ + if (--level) { + onEndMapValue(key, count, d); + key.data = 0; key.size = 0; + } +} + +void MapReader::onEndArray(uint32_t count, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onEndArrayValue(key, count, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +MapReader::MapReader() : level(0) +{ + key.data = 0; key.size = 0; +} + +}} // namespace qpid::amqp diff --git a/cpp/src/qpid/amqp/MapReader.h b/cpp/src/qpid/amqp/MapReader.h new file mode 100644 index 0000000000..fe8f65b30c --- /dev/null +++ b/cpp/src/qpid/amqp/MapReader.h @@ -0,0 +1,104 @@ +#ifndef QPID_AMQP_MAPREADER_H +#define QPID_AMQP_MAPREADER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Reader.h" +#include "CharSequence.h" +#include <string> + +namespace qpid { +namespace amqp { + +/** + * Reading AMQP 1.0 encoded data which is constrained to be a symbol + * keyeed map. The keys are assumed never to be described, the values + * may be. + */ +class MapReader : public Reader +{ + public: + virtual void onNullValue(const CharSequence& /*key*/, const Descriptor*) {} + virtual void onBooleanValue(const CharSequence& /*key*/, bool, const Descriptor*) {} + virtual void onUByteValue(const CharSequence& /*key*/, uint8_t, const Descriptor*) {} + virtual void onUShortValue(const CharSequence& /*key*/, uint16_t, const Descriptor*) {} + virtual void onUIntValue(const CharSequence& /*key*/, uint32_t, const Descriptor*) {} + virtual void onULongValue(const CharSequence& /*key*/, uint64_t, const Descriptor*) {} + virtual void onByteValue(const CharSequence& /*key*/, int8_t, const Descriptor*) {} + virtual void onShortValue(const CharSequence& /*key*/, int16_t, const Descriptor*) {} + virtual void onIntValue(const CharSequence& /*key*/, int32_t, const Descriptor*) {} + virtual void onLongValue(const CharSequence& /*key*/, int64_t, const Descriptor*) {} + virtual void onFloatValue(const CharSequence& /*key*/, float, const Descriptor*) {} + virtual void onDoubleValue(const CharSequence& /*key*/, double, const Descriptor*) {} + virtual void onUuidValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*) {} + virtual void onTimestampValue(const CharSequence& /*key*/, int64_t, const Descriptor*) {} + + virtual void onBinaryValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*) {} + virtual void onStringValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*) {} + virtual void onSymbolValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*) {} + + /** + * @return true to step into elements of the compound value, false + * to skip over it + */ + virtual bool onStartListValue(const CharSequence& /*key*/, uint32_t /*count*/, const Descriptor*) { return true; } + virtual bool onStartMapValue(const CharSequence& /*key*/, uint32_t /*count*/, const Descriptor*) { return true; } + virtual bool onStartArrayValue(const CharSequence& /*key*/, uint32_t /*count*/, const Constructor&, const Descriptor*) { return true; } + virtual void onEndListValue(const CharSequence& /*key*/, uint32_t /*count*/, const Descriptor*) {} + virtual void onEndMapValue(const CharSequence& /*key*/, uint32_t /*count*/, const Descriptor*) {} + virtual void onEndArrayValue(const CharSequence& /*key*/, uint32_t /*count*/, const Descriptor*) {} + + + //this class implements the Reader interface, thus acting as a transformer into a more map oriented scheme + void onNull(const Descriptor*); + void onBoolean(bool, const Descriptor*); + void onUByte(uint8_t, const Descriptor*); + void onUShort(uint16_t, const Descriptor*); + void onUInt(uint32_t, const Descriptor*); + void onULong(uint64_t, const Descriptor*); + void onByte(int8_t, const Descriptor*); + void onShort(int16_t, const Descriptor*); + void onInt(int32_t, const Descriptor*); + void onLong(int64_t, const Descriptor*); + void onFloat(float, const Descriptor*); + void onDouble(double, const Descriptor*); + void onUuid(const CharSequence&, const Descriptor*); + void onTimestamp(int64_t, const Descriptor*); + + void onBinary(const CharSequence&, const Descriptor*); + void onString(const CharSequence&, const Descriptor*); + void onSymbol(const CharSequence&, const Descriptor*); + + bool onStartList(uint32_t /*count*/, const CharSequence&, const Descriptor*); + bool onStartMap(uint32_t /*count*/, const CharSequence&, const Descriptor*); + bool onStartArray(uint32_t /*count*/, const CharSequence&, const Constructor&, const Descriptor*); + void onEndList(uint32_t /*count*/, const Descriptor*); + void onEndMap(uint32_t /*count*/, const Descriptor*); + void onEndArray(uint32_t /*count*/, const Descriptor*); + + MapReader(); + private: + CharSequence key; + size_t level; +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_MAPREADER_H*/ diff --git a/cpp/src/qpid/broker/amqp/DataReader.cpp b/cpp/src/qpid/broker/amqp/DataReader.cpp new file mode 100644 index 0000000000..519dd71c9c --- /dev/null +++ b/cpp/src/qpid/broker/amqp/DataReader.cpp @@ -0,0 +1,187 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "DataReader.h" +#include "qpid/amqp/CharSequence.h" +#include "qpid/amqp/Descriptor.h" +#include "qpid/log/Statement.h" +#include <string> +extern "C" { +#include <proton/engine.h> +} + +namespace qpid { +namespace broker { +namespace amqp { +namespace { +qpid::amqp::CharSequence convert(pn_bytes_t in) +{ + qpid::amqp::CharSequence out; + out.data = in.start; + out.size = in.size; + return out; +} + +qpid::amqp::CharSequence convert(pn_uuid_t in) +{ + qpid::amqp::CharSequence out; + out.data = in.bytes; + out.size = 16; + return out; +} +} + +DataReader::DataReader(qpid::amqp::Reader& r) : reader(r) {} + +void DataReader::read(pn_data_t* data) +{ + /* + while (pn_data_next(data)) { + readOne(data); + } + */ + do { + readOne(data); + } while (pn_data_next(data)); +} +void DataReader::readOne(pn_data_t* data) +{ + qpid::amqp::Descriptor descriptor(0); + bool described = pn_data_is_described(data); + if (described) { + pn_data_enter(data); + pn_data_next(data); + if (pn_data_type(data) == PN_ULONG) { + descriptor = qpid::amqp::Descriptor(pn_data_get_ulong(data)); + } else if (pn_data_type(data) == PN_SYMBOL) { + descriptor = qpid::amqp::Descriptor(convert(pn_data_get_symbol(data))); + } else { + QPID_LOG(notice, "Ignoring descriptor of type " << pn_data_type(data)); + } + pn_data_next(data); + } + switch (pn_data_type(data)) { + case PN_NULL: + reader.onNull(described ? &descriptor : 0); + break; + case PN_BOOL: + reader.onBoolean(pn_data_get_bool(data), described ? &descriptor : 0); + break; + case PN_UBYTE: + reader.onUByte(pn_data_get_ubyte(data), described ? &descriptor : 0); + break; + case PN_BYTE: + reader.onByte(pn_data_get_byte(data), described ? &descriptor : 0); + break; + case PN_USHORT: + reader.onUShort(pn_data_get_ushort(data), described ? &descriptor : 0); + break; + case PN_SHORT: + reader.onShort(pn_data_get_short(data), described ? &descriptor : 0); + break; + case PN_UINT: + reader.onUInt(pn_data_get_uint(data), described ? &descriptor : 0); + break; + case PN_INT: + reader.onInt(pn_data_get_int(data), described ? &descriptor : 0); + break; + case PN_CHAR: + pn_data_get_char(data); + break; + case PN_ULONG: + reader.onULong(pn_data_get_ulong(data), described ? &descriptor : 0); + break; + case PN_LONG: + reader.onLong(pn_data_get_long(data), described ? &descriptor : 0); + break; + case PN_TIMESTAMP: + reader.onTimestamp(pn_data_get_timestamp(data), described ? &descriptor : 0); + break; + case PN_FLOAT: + reader.onFloat(pn_data_get_float(data), described ? &descriptor : 0); + break; + case PN_DOUBLE: + reader.onDouble(pn_data_get_double(data), described ? &descriptor : 0); + break; + case PN_DECIMAL32: + pn_data_get_decimal32(data); + break; + case PN_DECIMAL64: + pn_data_get_decimal64(data); + break; + case PN_DECIMAL128: + pn_data_get_decimal128(data); + break; + case PN_UUID: + reader.onUuid(convert(pn_data_get_uuid(data)), described ? &descriptor : 0); + break; + case PN_BINARY: + reader.onBinary(convert(pn_data_get_binary(data)), described ? &descriptor : 0); + break; + case PN_STRING: + reader.onString(convert(pn_data_get_string(data)), described ? &descriptor : 0); + break; + case PN_SYMBOL: + reader.onSymbol(convert(pn_data_get_symbol(data)), described ? &descriptor : 0); + break; + case PN_DESCRIBED: + break; + case PN_ARRAY: + readArray(data, described ? &descriptor : 0); + break; + case PN_LIST: + readList(data, described ? &descriptor : 0); + break; + case PN_MAP: + readMap(data, described ? &descriptor : 0); + break; + } + if (described) pn_data_exit(data); +} + +void DataReader::readArray(pn_data_t* /*data*/, const qpid::amqp::Descriptor* /*descriptor*/) +{ + //not yet implemented +} + +void DataReader::readList(pn_data_t* data, const qpid::amqp::Descriptor* descriptor) +{ + size_t count = pn_data_get_list(data); + reader.onStartList(count, qpid::amqp::CharSequence(), descriptor); + pn_data_enter(data); + for (size_t i = 0; i < count && pn_data_next(data); ++i) { + read(data); + } + pn_data_exit(data); + reader.onEndList(count, descriptor); +} + +void DataReader::readMap(pn_data_t* data, const qpid::amqp::Descriptor* descriptor) +{ + size_t count = pn_data_get_map(data); + reader.onStartMap(count, qpid::amqp::CharSequence(), descriptor); + pn_data_enter(data); + for (size_t i = 0; i < count && pn_data_next(data); ++i) { + read(data); + } + pn_data_exit(data); + reader.onEndMap(count, descriptor); +} +}}} // namespace qpid::broker::amqp diff --git a/cpp/src/qpid/broker/amqp/DataReader.h b/cpp/src/qpid/broker/amqp/DataReader.h new file mode 100644 index 0000000000..024507e7f2 --- /dev/null +++ b/cpp/src/qpid/broker/amqp/DataReader.h @@ -0,0 +1,53 @@ +#ifndef QPID_BROKER_AMQP_DATAREADER_H +#define QPID_BROKER_AMQP_DATAREADER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/Reader.h" + +struct pn_data_t; + +namespace qpid { +namespace amqp { +struct Descriptor; +} +namespace broker { +namespace amqp { + +/** + * Allows use of Reader interface to read pn_data_t* data. + */ +class DataReader +{ + public: + DataReader(qpid::amqp::Reader& reader); + void read(pn_data_t*); + private: + qpid::amqp::Reader& reader; + + void readOne(pn_data_t*); + void readMap(pn_data_t*, const qpid::amqp::Descriptor*); + void readList(pn_data_t*, const qpid::amqp::Descriptor*); + void readArray(pn_data_t*, const qpid::amqp::Descriptor*); +}; +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_DATAREADER_H*/ diff --git a/cpp/src/qpid/broker/amqp/Filter.cpp b/cpp/src/qpid/broker/amqp/Filter.cpp new file mode 100644 index 0000000000..61e377c72f --- /dev/null +++ b/cpp/src/qpid/broker/amqp/Filter.cpp @@ -0,0 +1,145 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/amqp/Filter.h" +#include "qpid/broker/amqp/DataReader.h" +#include "qpid/broker/DirectExchange.h" +#include "qpid/broker/TopicExchange.h" +#include "qpid/amqp/descriptors.h" +#include "qpid/log/Statement.h" +extern "C" { +#include <proton/engine.h> +} + +namespace qpid { +namespace broker { +namespace amqp { + +void Filter::read(pn_data_t* data) +{ + try { + DataReader reader(*this); + reader.read(data); + } catch (const std::exception& e) { + QPID_LOG(warning, "Error parsing filter: " << e.what()); + } +} + +void Filter::write(pn_data_t* data) +{ + pn_data_put_map(data); + pn_data_enter(data); + subjectFilter.write(data); + pn_data_exit(data); +} + +void Filter::onStringValue(const qpid::amqp::CharSequence& key, const qpid::amqp::CharSequence& value, const qpid::amqp::Descriptor* descriptor) +{ + StringFilter filter; + filter.key = std::string(key.data, key.size); + filter.value = std::string(value.data, value.size); + if (descriptor) { + filter.described = true; + filter.descriptor = *descriptor; + if (descriptor->match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE) + || descriptor->match(qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE)) { + setSubjectFilter(filter); + } else { + QPID_LOG(notice, "Skipping unrecognised string filter with key " << filter.key << " and descriptor " << filter.descriptor); + } + } else { + setSubjectFilter(filter); + } +} + +bool Filter::hasSubjectFilter() const +{ + return !subjectFilter.value.empty(); +} + + +void Filter::setSubjectFilter(const StringFilter& filter) +{ + if (hasSubjectFilter()) { + QPID_LOG(notice, "Skipping filter with key " << filter.key << "; subject filter already set"); + } else { + subjectFilter = filter; + } +} + +void Filter::bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue) +{ + subjectFilter.bind(exchange, queue); +} + +Filter::StringFilter::StringFilter() : described(false), descriptor(0) {} +namespace { +pn_bytes_t convert(const std::string& in) +{ + pn_bytes_t out; + out.start = const_cast<char*>(in.data()); + out.size = in.size(); + return out; +} +pn_bytes_t convert(const qpid::amqp::CharSequence& in) +{ + pn_bytes_t out; + out.start = const_cast<char*>(in.data); + out.size = in.size; + return out; +} +} +void Filter::StringFilter::write(pn_data_t* data) +{ + pn_data_put_symbol(data, convert(key)); + if (described) { + pn_data_put_described(data); + pn_data_enter(data); + switch (descriptor.type) { + case qpid::amqp::Descriptor::NUMERIC: + pn_data_put_ulong(data, descriptor.value.code); + break; + case qpid::amqp::Descriptor::SYMBOLIC: + pn_data_put_symbol(data, convert(descriptor.value.symbol)); + break; + } + } + pn_data_put_string(data, convert(value)); + if (described) pn_data_exit(data); +} + +void Filter::StringFilter::bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue) +{ + if (described && exchange->getType() == DirectExchange::typeName + && descriptor.match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE)) { + QPID_LOG(info, "Using legacy topic filter as a direct matching filter for " << exchange->getName()); + if (descriptor.type == qpid::amqp::Descriptor::NUMERIC) { + descriptor = qpid::amqp::Descriptor(qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE); + } else { + qpid::amqp::CharSequence symbol; + symbol.data = qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL.data(); + symbol.size = qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL.size(); + descriptor = qpid::amqp::Descriptor(symbol); + } + } + exchange->bind(queue, value, 0); +} + +}}} // namespace qpid::broker::amqp diff --git a/cpp/src/qpid/broker/amqp/Filter.h b/cpp/src/qpid/broker/amqp/Filter.h new file mode 100644 index 0000000000..5e2dee4d6e --- /dev/null +++ b/cpp/src/qpid/broker/amqp/Filter.h @@ -0,0 +1,62 @@ +#ifndef QPID_BROKER_AMQP_FILTER_H +#define QPID_BROKER_AMQP_FILTER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/MapReader.h" +#include "qpid/amqp/Descriptor.h" +#include <boost/shared_ptr.hpp> + +struct pn_data_t; +namespace qpid { +namespace broker { +class Exchange; +class Queue; +namespace amqp { + + +class Filter : qpid::amqp::MapReader +{ + public: + void read(pn_data_t*); + void write(pn_data_t*); + bool hasSubjectFilter() const; + void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue); + private: + struct StringFilter + { + bool described; + qpid::amqp::Descriptor descriptor; + std::string key; + std::string value; + StringFilter(); + void write(pn_data_t*); + void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue); + }; + + void onStringValue(const qpid::amqp::CharSequence& key, const qpid::amqp::CharSequence& value, const qpid::amqp::Descriptor* descriptor); + void setSubjectFilter(const StringFilter&); + + StringFilter subjectFilter; +}; +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_FILTER_H*/ diff --git a/cpp/src/qpid/broker/amqp/NodeProperties.cpp b/cpp/src/qpid/broker/amqp/NodeProperties.cpp new file mode 100644 index 0000000000..c12b161e8b --- /dev/null +++ b/cpp/src/qpid/broker/amqp/NodeProperties.cpp @@ -0,0 +1,154 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/amqp/NodeProperties.h" +#include "qpid/broker/amqp/DataReader.h" +#include "qpid/amqp/CharSequence.h" +#include "qpid/types/Variant.h" +#include "qpid/broker/QueueSettings.h" +#include "qpid/log/Statement.h" + +using qpid::amqp::CharSequence; +using qpid::amqp::Descriptor; + +namespace qpid { +namespace broker { +namespace amqp { +namespace { +//distribution modes: +const std::string MOVE("move"); +const std::string COPY("copy"); + +const std::string SUPPORTED_DIST_MODES("supported-dist-modes"); +} + +NodeProperties::NodeProperties() : queue(true) {} + +void NodeProperties::read(pn_data_t* data) +{ + DataReader reader(*this); + reader.read(data); +} + +void NodeProperties::process(const std::string& key, const qpid::types::Variant& value) +{ + QPID_LOG(notice, "Processing node property " << key << " = " << value); + if (key == SUPPORTED_DIST_MODES) { + if (value == MOVE) queue = true; + else if (value == COPY) queue = false; + } else { + properties[key] = value; + } +} + +void NodeProperties::onNullValue(const CharSequence& key, const Descriptor*) +{ + process(key.str(), qpid::types::Variant()); +} + +void NodeProperties::onBooleanValue(const CharSequence& key, bool value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onUByteValue(const CharSequence& key, uint8_t value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onUShortValue(const CharSequence& key, uint16_t value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onUIntValue(const CharSequence& key, uint32_t value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onULongValue(const CharSequence& key, uint64_t value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onByteValue(const CharSequence& key, int8_t value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onShortValue(const CharSequence& key, int16_t value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onIntValue(const CharSequence& key, int32_t value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onLongValue(const CharSequence& key, int64_t value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onFloatValue(const CharSequence& key, float value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onDoubleValue(const CharSequence& key, double value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onUuidValue(const CharSequence& key, const CharSequence& value, const Descriptor*) +{ + process(key.str(), value.str()); +} + +void NodeProperties::onTimestampValue(const CharSequence& key, int64_t value, const Descriptor*) +{ + process(key.str(), value); +} + +void NodeProperties::onStringValue(const CharSequence& key, const CharSequence& value, const Descriptor*) +{ + process(key.str(), value.str()); +} + +void NodeProperties::onSymbolValue(const CharSequence& key, const CharSequence& value, const Descriptor*) +{ + process(key.str(), value.str()); +} + +QueueSettings NodeProperties::getQueueSettings() +{ + QueueSettings settings(false/*durable*/, false/*auto-delete*/); + qpid::types::Variant::Map unused; + settings.populate(properties, unused); + return settings; +} + +bool NodeProperties::isQueue() const +{ + return queue; +} + +}}} // namespace qpid::broker::amqp diff --git a/cpp/src/qpid/broker/amqp/NodeProperties.h b/cpp/src/qpid/broker/amqp/NodeProperties.h new file mode 100644 index 0000000000..2b3a98b6bd --- /dev/null +++ b/cpp/src/qpid/broker/amqp/NodeProperties.h @@ -0,0 +1,64 @@ +#ifndef QPID_BROKER_AMQP_NODEPROPERTIES_H +#define QPID_BROKER_AMQP_NODEPROPERTIES_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/MapReader.h" +#include "qpid/types/Variant.h" + +struct pn_data_t; +namespace qpid { +namespace broker { +struct QueueSettings; +namespace amqp { + +class NodeProperties : public qpid::amqp::MapReader +{ + public: + NodeProperties(); + void read(pn_data_t*); + void onNullValue(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*); + void onBooleanValue(const qpid::amqp::CharSequence&, bool, const qpid::amqp::Descriptor*); + void onUByteValue(const qpid::amqp::CharSequence&, uint8_t, const qpid::amqp::Descriptor*); + void onUShortValue(const qpid::amqp::CharSequence&, uint16_t, const qpid::amqp::Descriptor*); + void onUIntValue(const qpid::amqp::CharSequence&, uint32_t, const qpid::amqp::Descriptor*); + void onULongValue(const qpid::amqp::CharSequence&, uint64_t, const qpid::amqp::Descriptor*); + void onByteValue(const qpid::amqp::CharSequence&, int8_t, const qpid::amqp::Descriptor*); + void onShortValue(const qpid::amqp::CharSequence&, int16_t, const qpid::amqp::Descriptor*); + void onIntValue(const qpid::amqp::CharSequence&, int32_t, const qpid::amqp::Descriptor*); + void onLongValue(const qpid::amqp::CharSequence&, int64_t, const qpid::amqp::Descriptor*); + void onFloatValue(const qpid::amqp::CharSequence&, float, const qpid::amqp::Descriptor*); + void onDoubleValue(const qpid::amqp::CharSequence&, double, const qpid::amqp::Descriptor*); + void onUuidValue(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*); + void onTimestampValue(const qpid::amqp::CharSequence&, int64_t, const qpid::amqp::Descriptor*); + void onStringValue(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*); + void onSymbolValue(const qpid::amqp::CharSequence&, const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*); + bool isQueue() const; + QueueSettings getQueueSettings(); + private: + bool queue; + qpid::types::Variant::Map properties; + + void process(const std::string&, const qpid::types::Variant&); +}; +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_NODEPROPERTIES_H*/ diff --git a/cpp/src/qpid/broker/amqp/Session.cpp b/cpp/src/qpid/broker/amqp/Session.cpp index 5a6812edad..f0756bc14d 100644 --- a/cpp/src/qpid/broker/amqp/Session.cpp +++ b/cpp/src/qpid/broker/amqp/Session.cpp @@ -35,7 +35,10 @@ #include "qpid/broker/Message.h" #include "qpid/broker/Queue.h" #include "qpid/broker/TopicExchange.h" +#include "qpid/broker/amqp/Filter.h" +#include "qpid/broker/amqp/NodeProperties.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/FieldTable.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" #include <boost/intrusive_ptr.hpp> @@ -84,6 +87,30 @@ class Exchange : public Target Session::Session(pn_session_t* s, qpid::broker::Broker& b, ManagedConnection& c, qpid::sys::OutputControl& o) : ManagedSession(b, c, (boost::format("%1%") % s).str()), session(s), broker(b), connection(c), out(o), deleted(false) {} + +Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus) +{ + ResolvedNode node; + node.exchange = broker.getExchanges().find(name); + node.queue = broker.getQueues().find(name); + if (!node.queue && !node.exchange && pn_terminus_is_dynamic(terminus)) { + //TODO: handle dynamic creation + //is it a queue or an exchange? + NodeProperties properties; + properties.read(pn_terminus_properties(terminus)); + if (properties.isQueue()) { + node.queue = broker.createQueue(name, properties.getQueueSettings(), this, "", connection.getUserid(), connection.getId()).first; + } else { + qpid::framing::FieldTable args; + node.exchange = broker.createExchange(name, "topic"/*type*/, false/*durable*/, ""/*alternateExchange*/, args, connection.getUserid(), connection.getId()).first; + } + } else if (node.queue && node.exchange) { + QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue"); + node.exchange.reset(); + } + return node; +} + void Session::attach(pn_link_t* link) { if (pn_link_is_sender(link)) { @@ -96,120 +123,33 @@ void Session::attach(pn_link_t* link) QPID_LOG(debug, "Received attach request for outgoing link from " << name); pn_terminus_set_address(pn_link_source(link), name.c_str()); - boost::shared_ptr<qpid::broker::Exchange> exchange = broker.getExchanges().find(name); - boost::shared_ptr<qpid::broker::Queue> queue = broker.getQueues().find(name); - if (queue) { - if (exchange) { - QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue"); - } - boost::shared_ptr<Outgoing> q(new Outgoing(broker, queue, link, *this, out, false)); + ResolvedNode node = resolve(name, source); + + if (node.queue) { + boost::shared_ptr<Outgoing> q(new Outgoing(broker, node.queue, link, *this, out, false)); q->init(); senders[link] = q; - } else if (exchange) { + } else if (node.exchange) { QueueSettings settings(false, true); //TODO: populate settings from source details when available from engine - queue = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first; - pn_data_t* filter = pn_terminus_filter(source); - pn_data_next(filter); - if (filter && !pn_data_is_null(filter)) { - if (pn_data_type(filter) == PN_MAP) { - pn_data_t* echo = pn_terminus_filter(pn_link_source(link)); - pn_data_put_map(echo); - pn_data_enter(echo); - size_t count = pn_data_get_map(filter)/2; - QPID_LOG(debug, "Got filter map with " << count << " entries"); - pn_data_enter(filter); - for (size_t i = 0; i < count; i++) { - pn_bytes_t fname = pn_data_get_symbol(filter); - pn_data_next(filter); - bool isDescribed = pn_data_is_described(filter); - qpid::amqp::Descriptor descriptor(0); - if (isDescribed) { - pn_data_enter(filter); - pn_data_next(filter); - //TODO: use or at least verify descriptor - if (pn_data_type(filter) == PN_ULONG) { - descriptor = qpid::amqp::Descriptor(pn_data_get_ulong(filter)); - } else if (pn_data_type(filter) == PN_SYMBOL) { - pn_bytes_t d = pn_data_get_symbol(filter); - qpid::amqp::CharSequence c; - c.data = d.start; - c.size = d.size; - descriptor = qpid::amqp::Descriptor(c); - } else { - QPID_LOG(notice, "Ignoring filter " << std::string(fname.start, fname.size) << " with descriptor of type " << pn_data_type(filter)); - continue; - } - if (descriptor.match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE)) { - if (exchange->getType() == qpid::broker::DirectExchange::typeName) { - QPID_LOG(info, "Interpreting legacy topic filter as direct binding key for " << exchange->getName()); - } else if (exchange->getType() == qpid::broker::FanOutExchange::typeName) { - QPID_LOG(info, "Ignoring legacy topic filter on fanout exchange " << exchange->getName()); - for (int i = 0; i < 3; ++i) pn_data_next(filter);//move off descriptor, then skip key and value - continue; - } - } else if (descriptor.match(qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE)) { - if (exchange->getType() == qpid::broker::TopicExchange::typeName) { - QPID_LOG(info, "Interpreting legacy direct filter as topic binding key for " << exchange->getName()); - } else if (exchange->getType() == qpid::broker::FanOutExchange::typeName) { - QPID_LOG(info, "Ignoring legacy direct filter on fanout exchange " << exchange->getName()); - for (int i = 0; i < 3; ++i) pn_data_next(filter);//move off descriptor, then skip key and value - continue; - } - } else { - QPID_LOG(notice, "Ignoring filter with unsupported descriptor " << descriptor); - for (int i = 0; i < 3; ++i) pn_data_next(filter);//move off descriptor, then skip key and value - continue; - } - pn_data_next(filter); - } else { - QPID_LOG(info, "Got undescribed filter of type " << pn_data_type(filter)); - } - if (pn_data_type(filter) == PN_STRING) { - pn_bytes_t value = pn_data_get_string(filter); - pn_data_next(filter); - exchange->bind(queue, std::string(value.start, value.size), 0); - pn_data_put_symbol(echo, fname); - if (isDescribed) { - pn_data_put_described(echo); - pn_data_enter(echo); - pn_bytes_t symbol; - switch (descriptor.type) { - case qpid::amqp::Descriptor::NUMERIC: - pn_data_put_ulong(echo, descriptor.value.code); - break; - case qpid::amqp::Descriptor::SYMBOLIC: - symbol.start = const_cast<char*>(descriptor.value.symbol.data); - symbol.size = descriptor.value.symbol.size; - pn_data_put_symbol(echo, symbol); - break; - } - } - pn_data_put_string(echo, value); - if (isDescribed) pn_data_exit(echo); - - QPID_LOG(debug, "Binding using filter " << std::string(fname.start, fname.size) << ":" << std::string(value.start, value.size)); - } else { - //TODO: handle headers exchange filters - QPID_LOG(warning, "Ignoring unsupported filter type with key " << std::string(fname.start, fname.size) << " and type " << pn_data_type(filter)); - } - } - pn_data_exit(echo); - } else { - QPID_LOG(warning, "Filter should be map, got type: " << pn_data_type(filter)); - } - } else if (exchange->getType() == FanOutExchange::typeName) { - exchange->bind(queue, std::string(), 0); - } else if (exchange->getType() == TopicExchange::typeName) { - exchange->bind(queue, "#", 0); + boost::shared_ptr<qpid::broker::Queue> queue + = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first; + Filter filter; + filter.read(pn_terminus_filter(source)); + if (filter.hasSubjectFilter()) { + filter.bind(node.exchange, queue); + filter.write(pn_terminus_filter(pn_link_source(link))); + } else if (node.exchange->getType() == FanOutExchange::typeName) { + node.exchange->bind(queue, std::string(), 0); + } else if (node.exchange->getType() == TopicExchange::typeName) { + node.exchange->bind(queue, "#", 0); } else { - throw qpid::Exception("Exchange type requires a filter: " + exchange->getType());/*not-supported?*/ + throw qpid::Exception("Exchange type requires a filter: " + node.exchange->getType());/*not-supported?*/ } boost::shared_ptr<Outgoing> q(new Outgoing(broker, queue, link, *this, out, true)); senders[link] = q; q->init(); } else { - //TODO: handle dynamic creation pn_terminus_set_type(pn_link_source(link), PN_UNSPECIFIED); throw qpid::Exception("Node not found: " + name);/*not-found*/ } @@ -223,22 +163,17 @@ void Session::attach(pn_link_t* link) QPID_LOG(debug, "Received attach request for incoming link to " << name); pn_terminus_set_address(pn_link_target(link), name.c_str()); + ResolvedNode node = resolve(name, target); - boost::shared_ptr<qpid::broker::Queue> queue = broker.getQueues().find(name); - boost::shared_ptr<qpid::broker::Exchange> exchange = broker.getExchanges().find(name); - if (queue) { - if (exchange) { - QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue"); - } - boost::shared_ptr<Target> q(new Queue(queue, link)); + if (node.queue) { + boost::shared_ptr<Target> q(new Queue(node.queue, link)); targets[link] = q; q->flow(); - } else if (exchange) { - boost::shared_ptr<Target> e(new Exchange(exchange, link)); + } else if (node.exchange) { + boost::shared_ptr<Target> e(new Exchange(node.exchange, link)); targets[link] = e; e->flow(); } else { - //TODO: handle dynamic creation pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED); throw qpid::Exception("Node not found: " + name);/*not-found*/ } diff --git a/cpp/src/qpid/broker/amqp/Session.h b/cpp/src/qpid/broker/amqp/Session.h index 07302d062b..7dbdaf05fc 100644 --- a/cpp/src/qpid/broker/amqp/Session.h +++ b/cpp/src/qpid/broker/amqp/Session.h @@ -32,11 +32,14 @@ struct pn_delivery_t; struct pn_link_t; struct pn_session_t; +struct pn_terminus_t; namespace qpid { namespace broker { class Broker; +class Exchange; +class Queue; namespace amqp { @@ -71,6 +74,13 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses std::deque<pn_delivery_t*> completed; bool deleted; qpid::sys::Mutex lock; + struct ResolvedNode + { + boost::shared_ptr<qpid::broker::Exchange> exchange; + boost::shared_ptr<qpid::broker::Queue> queue; + }; + + ResolvedNode resolve(const std::string name, pn_terminus_t* terminus); }; }}} // namespace qpid::broker::amqp diff --git a/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp new file mode 100644 index 0000000000..359660dce5 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -0,0 +1,182 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/amqp/AddressHelper.h" +#include "qpid/messaging/Address.h" +#include <vector> +#include <boost/assign.hpp> +extern "C" { +#include <proton/engine.h> +} + +namespace qpid { +namespace messaging { +namespace amqp { + +using qpid::types::Variant; + +namespace { +//policy types +const std::string CREATE("create"); +const std::string ASSERT("assert"); +const std::string DELETE("delete"); + +//policy values +const std::string ALWAYS("always"); +const std::string NEVER("never"); +const std::string RECEIVER("receiver"); +const std::string SENDER("sender"); + +const std::string NODE("node"); +const std::string LINK("link"); + +const std::string TYPE("type"); +const std::string TOPIC("topic"); +const std::string QUEUE("queue"); + +//distribution modes: +const std::string MOVE("move"); +const std::string COPY("copy"); + +const std::string SUPPORTED_DIST_MODES("supported-dist-modes"); + + +const std::vector<std::string> RECEIVER_MODES = boost::assign::list_of<std::string>(ALWAYS) (RECEIVER); +const std::vector<std::string> SENDER_MODES = boost::assign::list_of<std::string>(ALWAYS) (SENDER); + +pn_bytes_t convert(const std::string& s) +{ + pn_bytes_t result; + result.start = const_cast<char*>(s.data()); + result.size = s.size(); + return result; +} + +bool bind(const Variant::Map& options, const std::string& name, std::string& variable) +{ + Variant::Map::const_iterator j = options.find(name); + if (j == options.end()) { + return false; + } else { + variable = j->second.asString(); + return true; + } +} + +bool bind(const Variant::Map& options, const std::string& name, Variant::Map& variable) +{ + Variant::Map::const_iterator j = options.find(name); + if (j == options.end()) { + return false; + } else { + variable = j->second.asMap(); + return true; + } +} + +bool bind(const Address& address, const std::string& name, std::string& variable) +{ + return bind(address.getOptions(), name, variable); +} + +bool bind(const Address& address, const std::string& name, Variant::Map& variable) +{ + return bind(address.getOptions(), name, variable); +} + +bool in(const std::string& value, const std::vector<std::string>& choices) +{ + for (std::vector<std::string>::const_iterator i = choices.begin(); i != choices.end(); ++i) { + if (value == *i) return true; + } + return false; +} +} + +AddressHelper::AddressHelper(const Address& address) +{ + bind(address, CREATE, createPolicy); + bind(address, DELETE, deletePolicy); + bind(address, ASSERT, assertPolicy); + + bind(address, NODE, node); + bind(address, LINK, link); +} + +bool AddressHelper::createEnabled(CheckMode mode) const +{ + return enabled(createPolicy, mode); +} +bool AddressHelper::deleteEnabled(CheckMode mode) const +{ + return enabled(deletePolicy, mode); +} +bool AddressHelper::assertEnabled(CheckMode mode) const +{ + return enabled(assertPolicy, mode); +} +bool AddressHelper::enabled(const std::string& policy, CheckMode mode) const +{ + bool result = false; + switch (mode) { + case FOR_RECEIVER: + result = in(policy, RECEIVER_MODES); + break; + case FOR_SENDER: + result = in(policy, SENDER_MODES); + break; + } + return result; +} + +const qpid::types::Variant::Map& AddressHelper::getNodeProperties() const +{ + return node; +} +const qpid::types::Variant::Map& AddressHelper::getLinkProperties() const +{ + return link; +} + +void AddressHelper::setNodeProperties(pn_terminus_t* terminus) +{ + pn_terminus_set_dynamic(terminus, true); + + //properties for dynamically created node: + pn_data_t* data = pn_terminus_properties(terminus); + if (node.size()) { + pn_data_put_map(data); + pn_data_enter(data); + } + for (qpid::types::Variant::Map::const_iterator i = node.begin(); i != node.end(); ++i) { + if (i->first == TYPE) { + pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES)); + pn_data_put_string(data, convert(i->second == TOPIC ? COPY : MOVE)); + } else { + pn_data_put_symbol(data, convert(i->first)); + pn_data_put_string(data, convert(i->second.asString())); + } + } + if (node.size()) { + pn_data_exit(data); + } +} + +}}} // namespace qpid::messaging::amqp diff --git a/cpp/src/qpid/messaging/amqp/AddressHelper.h b/cpp/src/qpid/messaging/amqp/AddressHelper.h new file mode 100644 index 0000000000..cd0aa1be9e --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/AddressHelper.h @@ -0,0 +1,57 @@ +#ifndef QPID_MESSAGING_AMQP_ADDRESSHELPER_H +#define QPID_MESSAGING_AMQP_ADDRESSHELPER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/types/Variant.h" + +struct pn_terminus_t; + +namespace qpid { +namespace messaging { +class Address; +namespace amqp { + +class AddressHelper +{ + public: + enum CheckMode {FOR_RECEIVER, FOR_SENDER}; + + AddressHelper(const Address& address); + bool createEnabled(CheckMode mode) const; + bool deleteEnabled(CheckMode mode) const; + bool assertEnabled(CheckMode mode) const; + + void setNodeProperties(pn_terminus_t*); + const qpid::types::Variant::Map& getNodeProperties() const; + const qpid::types::Variant::Map& getLinkProperties() const; + private: + std::string createPolicy; + std::string assertPolicy; + std::string deletePolicy; + qpid::types::Variant::Map node; + qpid::types::Variant::Map link; + + bool enabled(const std::string& policy, CheckMode mode) const; +}; +}}} // namespace qpid::messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_ADDRESSHELPER_H*/ diff --git a/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index 2f1f39cb68..67c7f29448 100644 --- a/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -250,8 +250,7 @@ void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid: void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) { - pn_terminus_t* target = pn_link_target((pn_link_t*) lnk->sender); - pn_terminus_set_address(target, lnk->getTarget().c_str()); + lnk->configure(); attach(ssn->session, (pn_link_t*) lnk->sender); if (!pn_link_remote_target((pn_link_t*) lnk->sender)) { std::string msg("No such target : "); diff --git a/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp index 0a8f139839..8034df311a 100644 --- a/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp +++ b/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -18,7 +18,8 @@ * under the License. * */ -#include "ReceiverContext.h" +#include "qpid/messaging/amqp/ReceiverContext.h" +#include "qpid/messaging/amqp/AddressHelper.h" #include "qpid/messaging/Duration.h" #include "qpid/messaging/Message.h" #include "qpid/amqp/descriptors.h" @@ -113,16 +114,23 @@ void ReceiverContext::configure() const void ReceiverContext::configure(pn_terminus_t* source) const { pn_terminus_set_address(source, address.getName().c_str()); + //dynamic create: + AddressHelper helper(address); + if (helper.createEnabled(AddressHelper::FOR_RECEIVER)) { + helper.setNodeProperties(source); + } + //filter: pn_data_t* filter = pn_terminus_filter(source); pn_data_put_map(filter); pn_data_enter(filter); pn_data_put_symbol(filter, convert("subject")); - pn_data_put_described(filter); - pn_data_enter(filter); - pn_data_put_ulong(filter, getFilterDescriptor(address.getSubject())); + //TODO: At present inserting described values into the map doesn't seem to work; correct this once resolved + //pn_data_put_described(filter); + //pn_data_enter(filter); + //pn_data_put_ulong(filter, getFilterDescriptor(address.getSubject())); pn_data_put_string(filter, convert(address.getSubject())); - pn_data_exit(filter); + //pn_data_exit(filter); pn_data_exit(filter); } diff --git a/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/cpp/src/qpid/messaging/amqp/SenderContext.cpp index 02f7bdec1c..95398fea6f 100644 --- a/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -20,6 +20,7 @@ */ #include "qpid/messaging/amqp/SenderContext.h" #include "qpid/messaging/amqp/EncodedMessage.h" +#include "qpid/messaging/amqp/AddressHelper.h" #include "qpid/amqp/descriptors.h" #include "qpid/amqp/MessageEncoder.h" #include "qpid/messaging/exceptions.h" @@ -336,4 +337,17 @@ bool SenderContext::Delivery::accepted() return pn_delivery_remote_state(token) == PN_ACCEPTED; } +void SenderContext::configure() const +{ + configure(pn_link_target(sender)); +} +void SenderContext::configure(pn_terminus_t* target) const +{ + pn_terminus_set_address(target, address.getName().c_str()); + //dynamic create: + AddressHelper helper(address); + if (helper.createEnabled(AddressHelper::FOR_SENDER)) { + helper.setNodeProperties(target); + } +} }}} // namespace qpid::messaging::amqp diff --git a/cpp/src/qpid/messaging/amqp/SenderContext.h b/cpp/src/qpid/messaging/amqp/SenderContext.h index bc73d199e7..0202d6aa4b 100644 --- a/cpp/src/qpid/messaging/amqp/SenderContext.h +++ b/cpp/src/qpid/messaging/amqp/SenderContext.h @@ -31,6 +31,7 @@ struct pn_delivery_t; struct pn_link_t; struct pn_session_t; +struct pn_terminus_t; namespace qpid { namespace messaging { @@ -67,6 +68,7 @@ class SenderContext const std::string& getName() const; const std::string& getTarget() const; Delivery* send(const qpid::messaging::Message& message); + void configure() const; private: friend class ConnectionContext; typedef std::deque<Delivery> Deliveries; @@ -79,6 +81,7 @@ class SenderContext uint32_t capacity; uint32_t processUnsettled(); + void configure(pn_terminus_t*) const; }; }}} // namespace qpid::messaging::amqp |