diff options
Diffstat (limited to 'qpid/cpp/src/qpid/amqp')
40 files changed, 5572 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/amqp/CharSequence.cpp b/qpid/cpp/src/qpid/amqp/CharSequence.cpp new file mode 100644 index 0000000000..ad5b0ec84c --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/CharSequence.cpp @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "CharSequence.h" + +namespace qpid { +namespace amqp { + +void CharSequence::init() +{ + data = 0; + size = 0; +} + +CharSequence::operator bool() const +{ + return data && size; +} +std::string CharSequence::str() const +{ + return (data && size) ? std::string(data, size) : std::string(); +} + +CharSequence CharSequence::create() +{ + CharSequence c = {0, 0}; + return c; +} + +CharSequence CharSequence::create(const std::string& str) +{ + CharSequence c = {str.data(), str.size()}; + return c; +} + +CharSequence CharSequence::create(const char* data, size_t size) +{ + CharSequence c = {data, size}; + return c; +} + +CharSequence CharSequence::create(const unsigned char* data, size_t size) +{ + CharSequence c = {reinterpret_cast<const char*>(data), size}; + return c; +} + +}} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/CharSequence.h b/qpid/cpp/src/qpid/amqp/CharSequence.h new file mode 100644 index 0000000000..752097c913 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/CharSequence.h @@ -0,0 +1,52 @@ +#ifndef QPID_AMQP_CHARSEQUENCE_H +#define QPID_AMQP_CHARSEQUENCE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <stddef.h> +#include <string> +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace amqp { + +/** + * Simple record of a particular sequence of chars/bytes. The memroy + * referenced is assumed to be owned by some other entity, this is + * merely a pointer into a (segment of) it. + */ +struct CharSequence +{ + const char* data; + size_t size; + + QPID_COMMON_EXTERN operator bool() const; + QPID_COMMON_EXTERN std::string str() const; + QPID_COMMON_EXTERN void init(); + + QPID_COMMON_EXTERN static CharSequence create(); + QPID_COMMON_EXTERN static CharSequence create(const std::string& str); + QPID_COMMON_EXTERN static CharSequence create(const char* data, size_t size); + QPID_COMMON_EXTERN static CharSequence create(const unsigned char* data, size_t size); +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_CHARSEQUENCE_H*/ diff --git a/qpid/cpp/src/qpid/amqp/Codec.h b/qpid/cpp/src/qpid/amqp/Codec.h new file mode 100644 index 0000000000..c91cd0a96b --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/Codec.h @@ -0,0 +1,83 @@ +#ifndef QPID_AMQP_CODEC_H +#define QPID_AMQP_CODEC_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace qpid { +namespace amqp { + +/** + * + */ +class Codec +{ + public: + + + + private: + + struct Constructor + { + uint8_t code; + Descriptor descriptor; + bool isDescribed; + }; + + Constructor readConstructor(Decoder decoder, Reader reader) + { + Constructor result; + result.code = decoder.readCode(); + if (code == DESCRIPTOR) { + result.isDescribed = true; + result.descriptor = decoder.readDescriptor(); + result.code = decoder.readCode(); + } else { + result.isDescribed = false; + } + return result; + } +}; + +Codec::Descriptor Codec::Decoder::readDescriptor() +{ + uint8_t code = decoder.readCode(); + switch(code) { + case SYMBOL8: + return Descriptor(readSequence8()); + case SYMBOL32: + return Descriptor(readSequence32()); + case ULONG: + return Descriptor(readULong()); + case ULONG_SMALL: + return Descriptor((uint64_t) readUByte()); + case ULONG_ZERO: + return Descriptor((uint64_t) 0); + default: + throw qpid::Exception("Expected descriptor of type ulong or symbol; found " << code); + } +} + +Codec::Descriptor::Descriptor(uint64_t id) : value.id(id), type(NUMERIC) {} +Codec::Descriptor::Descriptor(const CharSequence& symbol) : value.symbol(symbol), type(SYMBOLIC) {} +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_CODEC_H*/ diff --git a/qpid/cpp/src/qpid/amqp/Constructor.h b/qpid/cpp/src/qpid/amqp/Constructor.h new file mode 100644 index 0000000000..444e455670 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/Constructor.h @@ -0,0 +1,42 @@ +#ifndef QPID_AMQP_CONSTRUCTOR_H +#define QPID_AMQP_CONSTRUCTOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/Descriptor.h" +namespace qpid { +namespace amqp { + +/** + * Representation of an AMQP 1.0 type 'constructor' (i.e. a type code + * with an optional descriptor) + */ +struct Constructor +{ + uint8_t code; + Descriptor descriptor; + bool isDescribed; + + Constructor(uint8_t c) : code(c), descriptor(0), isDescribed(false) {} +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_CONSTRUCTOR_H*/ diff --git a/qpid/cpp/src/qpid/amqp/DataBuilder.cpp b/qpid/cpp/src/qpid/amqp/DataBuilder.cpp new file mode 100644 index 0000000000..aeb9b9c612 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/DataBuilder.cpp @@ -0,0 +1,196 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "DataBuilder.h" +#include "CharSequence.h" +#include "qpid/log/Statement.h" +#include "qpid/types/encodings.h" + +namespace qpid { +namespace amqp { + +void DataBuilder::onNull(const Descriptor*) +{ + handle(qpid::types::Variant()); +} +void DataBuilder::onBoolean(bool v, const Descriptor*) +{ + handle(v); +} +void DataBuilder::onUByte(uint8_t v, const Descriptor*) +{ + handle(v); +} +void DataBuilder::onUShort(uint16_t v, const Descriptor*) +{ + handle(v); +} +void DataBuilder::onUInt(uint32_t v, const Descriptor*) +{ + handle(v); +} +void DataBuilder::onULong(uint64_t v, const Descriptor*) +{ + handle(v); +} +void DataBuilder::onByte(int8_t v, const Descriptor*) +{ + handle(v); +} +void DataBuilder::onShort(int16_t v, const Descriptor*) +{ + handle(v); +} +void DataBuilder::onInt(int32_t v, const Descriptor*) +{ + handle(v); +} +void DataBuilder::onLong(int64_t v, const Descriptor*) +{ + handle(v); +} +void DataBuilder::onFloat(float v, const Descriptor*) +{ + handle(v); +} +void DataBuilder::onDouble(double v, const Descriptor*) +{ + handle(v); +} +void DataBuilder::onUuid(const CharSequence& v, const Descriptor*) +{ + if (v.size == qpid::types::Uuid::SIZE) { + handle(qpid::types::Uuid(v.data)); + } +} +void DataBuilder::onTimestamp(int64_t v, const Descriptor*) +{ + handle(v); +} + +void DataBuilder::handle(const qpid::types::Variant& v) +{ + switch (nested.top()->getType()) { + case qpid::types::VAR_MAP: + nested.push(&nested.top()->asMap()[v.asString()]); + break; + case qpid::types::VAR_LIST: + nested.top()->asList().push_back(v); + break; + default: + *(nested.top()) = v; + nested.pop(); + break; + } +} + +void DataBuilder::onBinary(const CharSequence& v, const Descriptor*) +{ + onString(std::string(v.data, v.size), qpid::types::encodings::BINARY); +} +void DataBuilder::onString(const CharSequence& v, const Descriptor*) +{ + onString(std::string(v.data, v.size), qpid::types::encodings::UTF8); +} +void DataBuilder::onSymbol(const CharSequence& v, const Descriptor*) +{ + onString(std::string(v.data, v.size), qpid::types::encodings::ASCII); +} + +void DataBuilder::onString(const std::string& value, const std::string& encoding) +{ + switch (nested.top()->getType()) { + case qpid::types::VAR_MAP: + nested.push(&nested.top()->asMap()[value]); + break; + case qpid::types::VAR_LIST: + nested.top()->asList().push_back(qpid::types::Variant(value)); + nested.top()->asList().back().setEncoding(encoding); + break; + default: + qpid::types::Variant& v = *(nested.top()); + v = value; + v.setEncoding(encoding); + nested.pop(); + break; + } +} + +bool DataBuilder::proceed() +{ + return !nested.empty(); +} + +bool DataBuilder::nest(const qpid::types::Variant& n) +{ + switch (nested.top()->getType()) { + case qpid::types::VAR_MAP: + if (nested.size() > 1 || nested.top()->asMap().size() > 0) { + QPID_LOG(error, QPID_MSG("Expecting map key; got " << n << " " << *(nested.top()))); + } + break; + case qpid::types::VAR_LIST: + nested.top()->asList().push_back(n); + nested.push(&nested.top()->asList().back()); + break; + default: + qpid::types::Variant& value = *(nested.top()); + value = n; + nested.pop(); + nested.push(&value); + break; + } + return true; +} + +bool DataBuilder::onStartList(uint32_t, const CharSequence&, const CharSequence&, const Descriptor*) +{ + return nest(qpid::types::Variant::List()); +} +void DataBuilder::onEndList(uint32_t /*count*/, const Descriptor*) +{ + nested.pop(); +} +bool DataBuilder::onStartMap(uint32_t /*count*/, const CharSequence&, const CharSequence&, const Descriptor*) +{ + return nest(qpid::types::Variant::Map()); +} +void DataBuilder::onEndMap(uint32_t /*count*/, const Descriptor*) +{ + nested.pop(); +} +bool DataBuilder::onStartArray(uint32_t count, const CharSequence&, const Constructor&, const Descriptor*) +{ + return onStartList(count, CharSequence::create(), CharSequence::create(), 0); +} +void DataBuilder::onEndArray(uint32_t count, const Descriptor*) +{ + onEndList(count, 0); +} +qpid::types::Variant& DataBuilder::getValue() +{ + return base; +} +DataBuilder::DataBuilder(qpid::types::Variant v) : base(v) +{ + nested.push(&base); +} +DataBuilder::~DataBuilder() {} +}} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/DataBuilder.h b/qpid/cpp/src/qpid/amqp/DataBuilder.h new file mode 100644 index 0000000000..51ee3da5f8 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/DataBuilder.h @@ -0,0 +1,79 @@ +#ifndef QPID_AMQP_DATABUILDER_H +#define QPID_AMQP_DATABUILDER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Reader.h" +#include "qpid/types/Variant.h" +#include "qpid/CommonImportExport.h" +#include <stack> + +namespace qpid { +namespace amqp { + +/** + * Utility to build a Variant based structure (or value) from a data stream + */ +class DataBuilder : public Reader +{ + public: + QPID_COMMON_EXTERN DataBuilder(qpid::types::Variant); + QPID_COMMON_EXTERN virtual ~DataBuilder(); + QPID_COMMON_EXTERN void onNull(const Descriptor*); + QPID_COMMON_EXTERN void onBoolean(bool, const Descriptor*); + QPID_COMMON_EXTERN void onUByte(uint8_t, const Descriptor*); + QPID_COMMON_EXTERN void onUShort(uint16_t, const Descriptor*); + QPID_COMMON_EXTERN void onUInt(uint32_t, const Descriptor*); + QPID_COMMON_EXTERN void onULong(uint64_t, const Descriptor*); + QPID_COMMON_EXTERN void onByte(int8_t, const Descriptor*); + QPID_COMMON_EXTERN void onShort(int16_t, const Descriptor*); + QPID_COMMON_EXTERN void onInt(int32_t, const Descriptor*); + QPID_COMMON_EXTERN void onLong(int64_t, const Descriptor*); + QPID_COMMON_EXTERN void onFloat(float, const Descriptor*); + QPID_COMMON_EXTERN void onDouble(double, const Descriptor*); + QPID_COMMON_EXTERN void onUuid(const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN void onTimestamp(int64_t, const Descriptor*); + + QPID_COMMON_EXTERN void onBinary(const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN void onString(const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN void onSymbol(const CharSequence&, const Descriptor*); + + QPID_COMMON_EXTERN bool onStartList(uint32_t /*count*/, const CharSequence&, const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN bool onStartMap(uint32_t /*count*/, const CharSequence&, const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN bool onStartArray(uint32_t /*count*/, const CharSequence&, const Constructor&, const Descriptor*); + QPID_COMMON_EXTERN void onEndList(uint32_t /*count*/, const Descriptor*); + QPID_COMMON_EXTERN void onEndMap(uint32_t /*count*/, const Descriptor*); + QPID_COMMON_EXTERN void onEndArray(uint32_t /*count*/, const Descriptor*); + + QPID_COMMON_EXTERN bool proceed(); + QPID_COMMON_EXTERN qpid::types::Variant& getValue(); + private: + qpid::types::Variant base; + std::stack<qpid::types::Variant*> nested; + std::string key; + + void handle(const qpid::types::Variant& v); + bool nest(const qpid::types::Variant& v); + void onString(const std::string&, const std::string&); +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_DATABUILDER_H*/ diff --git a/qpid/cpp/src/qpid/amqp/Decoder.cpp b/qpid/cpp/src/qpid/amqp/Decoder.cpp new file mode 100644 index 0000000000..53cd367c25 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/Decoder.cpp @@ -0,0 +1,448 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/Decoder.h" +#include "qpid/amqp/CharSequence.h" +#include "qpid/amqp/Constructor.h" +#include "qpid/amqp/Descriptor.h" +#include "qpid/amqp/MapBuilder.h" +#include "qpid/amqp/Reader.h" +#include "qpid/amqp/typecodes.h" +#include "qpid/types/Uuid.h" +#include "qpid/types/Variant.h" +#include "qpid/log/Statement.h" +#include "qpid/Exception.h" + +namespace qpid { +namespace amqp { + +using namespace qpid::amqp::typecodes; + +Decoder::Decoder(const char* d, size_t s) : start(d), size(s), position(0), current(0) {} + +void Decoder::readMap(qpid::types::Variant::Map& map) +{ + MapBuilder builder; + read(builder); + map = builder.getMap(); +} + +qpid::types::Variant::Map Decoder::readMap() +{ + qpid::types::Variant::Map map; + readMap(map); + return map; +} + +void Decoder::read(Reader& reader) +{ + while (available() && reader.proceed()) { + readOne(reader); + } +} + +void Decoder::readOne(Reader& reader) +{ + const char* temp = start + position; + current = position; + Constructor c = readConstructor(); + if (c.isDescribed) reader.onDescriptor(c.descriptor, temp); + readValue(reader, c.code, c.isDescribed ? &c.descriptor : 0); +} + +void Decoder::readValue(Reader& reader, uint8_t code, const Descriptor* descriptor) +{ + switch(code) { + case NULL_VALUE: + reader.onNull(descriptor); + break; + case BOOLEAN: + reader.onBoolean(readBoolean(), descriptor); + break; + case BOOLEAN_TRUE: + reader.onBoolean(true, descriptor); + break; + case BOOLEAN_FALSE: + reader.onBoolean(false, descriptor); + break; + case UBYTE: + reader.onUByte(readUByte(), descriptor); + break; + case USHORT: + reader.onUShort(readUShort(), descriptor); + break; + case UINT: + reader.onUInt(readUInt(), descriptor); + break; + case UINT_SMALL: + reader.onUInt(readUByte(), descriptor); + break; + case UINT_ZERO: + reader.onUInt(0, descriptor); + break; + case ULONG: + reader.onULong(readULong(), descriptor); + break; + case ULONG_SMALL: + reader.onULong(readUByte(), descriptor); + break; + case ULONG_ZERO: + reader.onULong(0, descriptor); + break; + case BYTE: + reader.onByte(readByte(), descriptor); + break; + case SHORT: + reader.onShort(readShort(), descriptor); + break; + case INT: + reader.onInt(readInt(), descriptor); + break; + case INT_SMALL: + reader.onInt(readByte(), descriptor); + break; + case LONG: + reader.onLong(readLong(), descriptor); + break; + case LONG_SMALL: + reader.onLong(readByte(), descriptor); + break; + case FLOAT: + reader.onFloat(readFloat(), descriptor); + break; + case DOUBLE: + reader.onDouble(readDouble(), descriptor); + break; + case UUID: + reader.onUuid(readRawUuid(), descriptor); + break; + case TIMESTAMP: + reader.onTimestamp(readLong(), descriptor); + break; + + case BINARY8: + reader.onBinary(readSequence8(), descriptor); + break; + case BINARY32: + reader.onBinary(readSequence32(), descriptor); + break; + case STRING8: + reader.onString(readSequence8(), descriptor); + break; + case STRING32: + reader.onString(readSequence32(), descriptor); + break; + case SYMBOL8: + reader.onSymbol(readSequence8(), descriptor); + break; + case SYMBOL32: + reader.onSymbol(readSequence32(), descriptor); + break; + + case LIST0: + reader.onStartList(0, CharSequence::create(), getCurrent(0), descriptor); + reader.onEndList(0, descriptor); + break; + case LIST8: + readList8(reader, descriptor); + break; + case LIST32: + readList32(reader, descriptor); + break; + case MAP8: + readMap8(reader, descriptor); + break; + case MAP32: + readMap32(reader, descriptor); + break; + case ARRAY8: + readArray8(reader, descriptor); + break; + case ARRAY32: + readArray32(reader, descriptor); + break; + default: + break; + } +} + +void Decoder::readList8(Reader& reader, const Descriptor* descriptor) +{ + uint8_t size = readUByte(); + uint8_t count = readUByte(); + readList(reader, size-sizeof(size), count, descriptor); +} + +void Decoder::readList32(Reader& reader, const Descriptor* descriptor) +{ + uint32_t size = readUInt(); + uint32_t count = readUInt(); + readList(reader, size-sizeof(size), count, descriptor); +} + +void Decoder::readMap8(Reader& reader, const Descriptor* descriptor) +{ + uint8_t size = readUByte(); + uint8_t count = readUByte(); + readMap(reader, size-sizeof(size), count, descriptor); +} + +void Decoder::readMap32(Reader& reader, const Descriptor* descriptor) +{ + uint32_t size = readUInt(); + uint32_t count = readUInt(); + readMap(reader, size-sizeof(size), count, descriptor); +} + +void Decoder::readArray8(Reader& reader, const Descriptor* descriptor) +{ + uint8_t size = readUByte(); + uint8_t count = readUByte(); + readArray(reader, size-sizeof(size), count, descriptor); +} + +void Decoder::readArray32(Reader& reader, const Descriptor* descriptor) +{ + uint32_t size = readUInt(); + uint32_t count = readUInt(); + readArray(reader, size-sizeof(size), count, descriptor); +} + +void Decoder::readList(Reader& reader, uint32_t size, uint32_t count, const Descriptor* descriptor) +{ + if (reader.onStartList(count, CharSequence::create(data(), size), getCurrent(size), descriptor)) { + for (uint32_t i = 0; i < count; ++i) { + readOne(reader); + } + reader.onEndList(count, descriptor); + } else { + //skip + advance(size); + } +} +void Decoder::readMap(Reader& reader, uint32_t size, uint32_t count, const Descriptor* descriptor) +{ + if (reader.onStartMap(count, CharSequence::create(data(), size), getCurrent(size), descriptor)) { + for (uint32_t i = 0; i < count; ++i) { + readOne(reader); + } + reader.onEndMap(count, descriptor); + } else { + //skip + advance(size); + } +} + +void Decoder::readArray(Reader& reader, uint32_t size, uint32_t count, const Descriptor* descriptor) +{ + size_t temp = position; + Constructor constructor = readConstructor(); + CharSequence raw = CharSequence::create(data(), size-(position-temp)); + if (reader.onStartArray(count, raw, constructor, descriptor)) { + for (uint32_t i = 0; i < count; ++i) { + readValue(reader, constructor.code, constructor.isDescribed ? &constructor.descriptor : 0); + } + reader.onEndArray(count, descriptor); + } else { + //skip + advance(raw.size); + } +} + + +Constructor Decoder::readConstructor() +{ + Constructor result(readCode()); + if (result.code == DESCRIPTOR) { + result.isDescribed = true; + result.descriptor = readDescriptor(); + result.code = readCode(); + for (Descriptor* d = &result.descriptor; result.code == DESCRIPTOR; result.code = readCode()) { + d = d->nest(readDescriptor()); + } + } else { + result.isDescribed = false; + } + return result; +} + +Descriptor Decoder::readDescriptor() +{ + uint8_t code = readCode(); + switch(code) { + case SYMBOL8: + return Descriptor(readSequence8()); + case SYMBOL32: + return Descriptor(readSequence32()); + case ULONG: + return Descriptor(readULong()); + case ULONG_SMALL: + return Descriptor((uint64_t) readUByte()); + case ULONG_ZERO: + return Descriptor((uint64_t) 0); + default: + throw qpid::Exception(QPID_MSG("Expected descriptor of type ulong or symbol; found " << (int)code)); + } +} + +void Decoder::advance(size_t n) +{ + if (n > available()) throw qpid::Exception(QPID_MSG("Out of Bounds: requested advance of " << n << " at " << position << " but only " << available() << " available")); + position += n; +} + +const char* Decoder::data() +{ + return start + position; +} + +size_t Decoder::available() +{ + return size - position; +} + +uint8_t Decoder::readCode() +{ + return readUByte(); +} + +bool Decoder::readBoolean() +{ + return readUByte(); +} + +uint8_t Decoder::readUByte() +{ + return static_cast<uint8_t>(start[position++]); +} + +uint16_t Decoder::readUShort() +{ + uint16_t hi = (unsigned char) start[position++]; + hi = hi << 8; + hi |= (unsigned char) start[position++]; + return hi; +} + +uint32_t Decoder::readUInt() +{ + uint32_t a = (unsigned char) start[position++]; + uint32_t b = (unsigned char) start[position++]; + uint32_t c = (unsigned char) start[position++]; + uint32_t d = (unsigned char) start[position++]; + a = a << 24; + a |= b << 16; + a |= c << 8; + a |= d; + return a; +} + +uint64_t Decoder::readULong() +{ + uint64_t hi =readUInt(); + uint64_t lo = readUInt(); + hi = hi << 32; + return hi | lo; +} + +int8_t Decoder::readByte() +{ + return (int8_t) readUByte(); +} + +int16_t Decoder::readShort() +{ + return (int16_t) readUShort(); +} + +int32_t Decoder::readInt() +{ + return (int32_t) readUInt(); +} + +int64_t Decoder::readLong() +{ + return (int64_t) readULong(); +} + +float Decoder::readFloat() +{ + union { + uint32_t i; + float f; + } val; + val.i = readUInt(); + return val.f; +} + +double Decoder::readDouble() +{ + union { + uint64_t i; + double f; + } val; + val.i = readULong(); + return val.f; +} + +CharSequence Decoder::readSequence8() +{ + CharSequence s; + s.size = readUByte(); + s.data = start + position; + advance(s.size); + return s; +} + +CharSequence Decoder::readSequence32() +{ + CharSequence s; + s.size = readUInt(); + s.data = start + position; + advance(s.size); + return s; +} + +qpid::types::Uuid Decoder::readUuid() +{ + qpid::types::Uuid uuid(start + position); + advance(16); + return uuid; +} + +CharSequence Decoder::readRawUuid() +{ + CharSequence s; + s.data = start + position; + s.size = 16; + advance(s.size); + return s; +} + +size_t Decoder::getPosition() const { return position; } +size_t Decoder::getSize() const { return size; } +void Decoder::resetSize(size_t s) { size = s; } + +CharSequence Decoder::getCurrent(size_t remaining) const +{ + return CharSequence::create(start + current, (position-current)+remaining); +} + +}} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/Decoder.h b/qpid/cpp/src/qpid/amqp/Decoder.h new file mode 100644 index 0000000000..a78518be2b --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/Decoder.h @@ -0,0 +1,100 @@ +#ifndef QPID_AMQP_DECODER_H +#define QPID_AMQP_DECODER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/IntegerTypes.h" +#include "qpid/CommonImportExport.h" +#include <map> +#include <string> +#include <stddef.h> + +namespace qpid { +namespace types { +class Uuid; +class Variant; +} +namespace amqp { +struct CharSequence; +struct Constructor; +struct Descriptor; +class Reader; + +/** + * Class to assist in decoding an AMQP encoded data-stream. + */ +class Decoder +{ + public: + QPID_COMMON_EXTERN Decoder(const char*, size_t); + + QPID_COMMON_EXTERN size_t available(); + QPID_COMMON_EXTERN uint8_t readCode(); + + QPID_COMMON_EXTERN bool readBoolean(); + QPID_COMMON_EXTERN uint8_t readUByte(); + QPID_COMMON_EXTERN uint16_t readUShort(); + QPID_COMMON_EXTERN uint32_t readUInt(); + QPID_COMMON_EXTERN uint64_t readULong(); + QPID_COMMON_EXTERN int8_t readByte(); + QPID_COMMON_EXTERN int16_t readShort(); + QPID_COMMON_EXTERN int32_t readInt(); + QPID_COMMON_EXTERN int64_t readLong(); + QPID_COMMON_EXTERN float readFloat(); + QPID_COMMON_EXTERN double readDouble(); + QPID_COMMON_EXTERN qpid::types::Uuid readUuid(); + QPID_COMMON_EXTERN CharSequence readSequence8(); + QPID_COMMON_EXTERN CharSequence readSequence32(); + QPID_COMMON_EXTERN Descriptor readDescriptor(); + QPID_COMMON_EXTERN void read(Reader& reader); + + QPID_COMMON_EXTERN void readMap(std::map<std::string, qpid::types::Variant>&); + QPID_COMMON_EXTERN std::map<std::string, qpid::types::Variant> readMap(); + QPID_COMMON_EXTERN void advance(size_t); + QPID_COMMON_EXTERN size_t getPosition() const; + QPID_COMMON_EXTERN void resetSize(size_t size); + QPID_COMMON_EXTERN size_t getSize() const; + + private: + const char* const start; + size_t size; + size_t position; + size_t current; + + void readOne(Reader& reader); + void readValue(Reader& reader, uint8_t code, const Descriptor* descriptor); + void readList(Reader& reader, uint32_t size, uint32_t count, const Descriptor* descriptor); + void readMap(Reader& reader, uint32_t size, uint32_t count, const Descriptor* descriptor); + void readArray(Reader& reader, uint32_t size, uint32_t count, const Descriptor* descriptor); + void readList8(Reader& reader, const Descriptor* descriptor); + void readList32(Reader& reader, const Descriptor* descriptor); + void readMap8(Reader& reader, const Descriptor* descriptor); + void readMap32(Reader& reader, const Descriptor* descriptor); + void readArray8(Reader& reader, const Descriptor* descriptor); + void readArray32(Reader& reader, const Descriptor* descriptor); + CharSequence readRawUuid(); + Constructor readConstructor(); + const char* data(); + CharSequence getCurrent(size_t remaining) const; +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_DECODER_H*/ diff --git a/qpid/cpp/src/qpid/amqp/Descriptor.cpp b/qpid/cpp/src/qpid/amqp/Descriptor.cpp new file mode 100644 index 0000000000..43d388ee76 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/Descriptor.cpp @@ -0,0 +1,148 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Descriptor.h" +#include "descriptors.h" +#include <qpid/framing/reply_exceptions.h> +#include <map> + +namespace qpid { +namespace amqp { + +Descriptor::Descriptor(uint64_t code) : type(NUMERIC) { value.code = code; } + +Descriptor::Descriptor(const CharSequence& symbol) : type(SYMBOLIC) { value.symbol = symbol; } + +bool Descriptor::match(const std::string& symbol, uint64_t code) const +{ + switch (type) { + case SYMBOLIC: + return symbol.compare(0, symbol.size(), value.symbol.data, value.symbol.size) == 0; + case NUMERIC: + return code == value.code; + } + return false; +} + +size_t Descriptor::getSize() const +{ + size_t size = 1/*descriptor indicator*/ + 1/*type code*/; + switch (type) { + case Descriptor::NUMERIC: + if (value.code > 0) size += value.code < 256 ? 1/*encode as byte*/ : 8/*encode as long*/; + //else value will be indicated through ULONG_ZERO typecode + break; + case Descriptor::SYMBOLIC: + size += value.symbol.size < 256? 1/*size field is a byte*/ : 4/*size field is an int*/; + size += value.symbol.size; + break; + } + return size; +} + +Descriptor* Descriptor::nest(const Descriptor& d) +{ + nested = boost::shared_ptr<Descriptor>(new Descriptor(0)); + *nested = d; + return nested.get(); +} + +namespace { + +class DescriptorMap { + typedef std::map<uint64_t, std::string> SymbolMap; + typedef std::map<std::string, uint64_t> CodeMap; + + SymbolMap symbols; + CodeMap codes; + + public: + DescriptorMap() { + symbols[message::HEADER_CODE] = message::HEADER_SYMBOL; + symbols[message::DELIVERY_ANNOTATIONS_CODE] = message::DELIVERY_ANNOTATIONS_SYMBOL; + symbols[message::MESSAGE_ANNOTATIONS_CODE] = message::MESSAGE_ANNOTATIONS_SYMBOL; + symbols[message::PROPERTIES_CODE] = message::PROPERTIES_SYMBOL; + symbols[message::APPLICATION_PROPERTIES_CODE] = message::APPLICATION_PROPERTIES_SYMBOL; + symbols[message::DATA_CODE] = message::DATA_SYMBOL; + symbols[message::AMQP_SEQUENCE_CODE] = message::AMQP_SEQUENCE_SYMBOL; + symbols[message::AMQP_VALUE_CODE] = message::AMQP_VALUE_SYMBOL; + symbols[message::FOOTER_CODE] = message::FOOTER_SYMBOL; + symbols[message::ACCEPTED_CODE] = message::ACCEPTED_SYMBOL; + symbols[sasl::SASL_MECHANISMS_CODE] = sasl::SASL_MECHANISMS_SYMBOL; + symbols[sasl::SASL_INIT_CODE] = sasl::SASL_INIT_SYMBOL; + symbols[sasl::SASL_CHALLENGE_CODE] = sasl::SASL_CHALLENGE_SYMBOL; + symbols[sasl::SASL_RESPONSE_CODE] = sasl::SASL_RESPONSE_SYMBOL; + symbols[sasl::SASL_OUTCOME_CODE] = sasl::SASL_OUTCOME_SYMBOL; + symbols[filters::LEGACY_DIRECT_FILTER_CODE] = filters::LEGACY_DIRECT_FILTER_SYMBOL; + symbols[filters::LEGACY_TOPIC_FILTER_CODE] = filters::LEGACY_TOPIC_FILTER_SYMBOL; + symbols[filters::LEGACY_HEADERS_FILTER_CODE] = filters::LEGACY_HEADERS_FILTER_SYMBOL; + symbols[filters::SELECTOR_FILTER_CODE] = filters::SELECTOR_FILTER_SYMBOL; + symbols[filters::XQUERY_FILTER_CODE] = filters::XQUERY_FILTER_SYMBOL; + symbols[lifetime_policy::DELETE_ON_CLOSE_CODE] = lifetime_policy::DELETE_ON_CLOSE_SYMBOL; + symbols[lifetime_policy::DELETE_ON_NO_LINKS_CODE] = lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL; + symbols[lifetime_policy::DELETE_ON_NO_MESSAGES_CODE] = lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL; + symbols[lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_CODE] = lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL; + symbols[transaction::DECLARE_CODE] = transaction::DECLARE_SYMBOL; + symbols[transaction::DISCHARGE_CODE] = transaction::DISCHARGE_SYMBOL; + symbols[transaction::DECLARED_CODE] = transaction::DECLARED_SYMBOL; + symbols[transaction::TRANSACTIONAL_STATE_CODE] = transaction::TRANSACTIONAL_STATE_SYMBOL; + symbols[0] = "unknown-descriptor"; + + for (SymbolMap::const_iterator i = symbols.begin(); i != symbols.end(); ++i) + codes[i->second] = i->first; + } + + std::string operator[](uint64_t code) const { + SymbolMap::const_iterator i = symbols.find(code); + return (i == symbols.end()) ? "unknown-descriptor" : i->second; + } + + uint64_t operator[](const std::string& symbol) const { + CodeMap::const_iterator i = codes.find(symbol); + return (i == codes.end()) ? 0 : i->second; + } +}; + +DescriptorMap DESCRIPTOR_MAP; +} + +std::string Descriptor::symbol() const { + switch (type) { + case Descriptor::NUMERIC: return DESCRIPTOR_MAP[value.code]; + case Descriptor::SYMBOLIC: return value.symbol.str(); + } + assert(0); + return std::string(); +} + +uint64_t Descriptor::code() const { + switch (type) { + case Descriptor::NUMERIC: return value.code; + case Descriptor::SYMBOLIC: return DESCRIPTOR_MAP[value.symbol.str()]; + } + assert(0); + return 0; +} + +std::ostream& operator<<(std::ostream& os, const Descriptor& d) { + return os << d.symbol() << "(" << "0x" << std::hex << d.code() << ")"; +} + +}} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/Descriptor.h b/qpid/cpp/src/qpid/amqp/Descriptor.h new file mode 100644 index 0000000000..3726114769 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/Descriptor.h @@ -0,0 +1,60 @@ +#ifndef QPID_AMQP_DESCRIPTOR_H +#define QPID_AMQP_DESCRIPTOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/CharSequence.h" +#include "qpid/sys/IntegerTypes.h" +#include <ostream> +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace amqp { + +/** + * Representation of an AMQP 1.0 type descriptor. + */ +struct Descriptor +{ + union { + CharSequence symbol; + uint64_t code; + } value; + enum { + NUMERIC, + SYMBOLIC + } type; + boost::shared_ptr<Descriptor> nested; + + QPID_COMMON_EXTERN Descriptor(uint64_t code); + QPID_COMMON_EXTERN Descriptor(const CharSequence& symbol); + QPID_COMMON_EXTERN bool match(const std::string&, uint64_t) const; + QPID_COMMON_EXTERN size_t getSize() const; + QPID_COMMON_EXTERN Descriptor* nest(const Descriptor& d); + QPID_COMMON_EXTERN std::string symbol() const; + QPID_COMMON_EXTERN uint64_t code() const; +}; + +QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream& os, const Descriptor& d); + +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_DESCRIPTOR_H*/ diff --git a/qpid/cpp/src/qpid/amqp/Encoder.cpp b/qpid/cpp/src/qpid/amqp/Encoder.cpp new file mode 100644 index 0000000000..86b59fb1a2 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/Encoder.cpp @@ -0,0 +1,523 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/Encoder.h" +#include "qpid/amqp/CharSequence.h" +#include "qpid/amqp/Descriptor.h" +#include "qpid/amqp/typecodes.h" +#include "qpid/types/Uuid.h" +#include "qpid/types/Variant.h" +#include "qpid/types/encodings.h" +#include "qpid/log/Statement.h" +#include "qpid/Exception.h" +#include <assert.h> +#include <string.h> + +using namespace qpid::types::encodings; +using qpid::types::Variant; + +namespace qpid { +namespace amqp { + +Encoder::Overflow::Overflow() : Exception("Buffer overflow in encoder!") {} + +Encoder::Encoder(char* d, size_t s) : data(d), size(s), position(0), grow(false) {} + +Encoder::Encoder() : data(0), size(0), position(0), grow(true) {} + +namespace { +template <typename T> size_t encode(char* data, T i); +template <> size_t encode<uint8_t>(char* data, uint8_t i) +{ + *data = i; + return 1; +} +template <> size_t encode<uint16_t>(char* data, uint16_t i) +{ + uint16_t b = i; + size_t position(0); + data[position++] = (uint8_t) (0xFF & (b >> 8)); + data[position++] = (uint8_t) (0xFF & b); + return position; +} +template <> size_t encode<uint32_t>(char* data, uint32_t i) +{ + uint32_t b = i; + size_t position(0); + data[position++] = (uint8_t) (0xFF & (b >> 24)); + data[position++] = (uint8_t) (0xFF & (b >> 16)); + data[position++] = (uint8_t) (0xFF & (b >> 8)); + data[position++] = (uint8_t) (0xFF & b); + return position; +} +template <> size_t encode<uint64_t>(char* data, uint64_t i) +{ + uint32_t hi = i >> 32; + uint32_t lo = i; + size_t r(0); + r += encode(data, hi); + r += encode(data + r, lo); + return r; +} +template<typename T> struct Backfill +{ + T size; + T count; + char* location; +}; + +template<typename T> void end(T count, void* token, char* current) +{ + Backfill<T> b; + b.location = (char*) token; + b.size = (T) (current - b.location) - sizeof(b.size); + b.count = count; + b.location += encode<T>(b.location, b.size); + encode<T>(b.location, b.count); +} +} +char* Encoder::skip(size_t n) +{ + char* current = data + position; + check(n); + position += n; + return current; +} + +void Encoder::write(bool b) +{ + check(sizeof(b)); + position += encode<uint8_t>(data+position, b ? 1u : 0u); +} +void Encoder::write(uint8_t i) +{ + check(sizeof(i)); + position += encode<uint8_t>(data+position, i); +} +void Encoder::write(uint16_t i) +{ + check(sizeof(i)); + position += encode<uint16_t>(data+position, i); +} +void Encoder::write(uint32_t i) +{ + check(sizeof(i)); + position += encode<uint32_t>(data+position, i); +} +void Encoder::write(uint64_t i) +{ + check(sizeof(i)); + position += encode<uint64_t>(data+position, i); +} +void Encoder::write(int8_t i) +{ + check(sizeof(i)); + position += encode(data+position, (uint8_t) i); +} +void Encoder::write(int16_t i) +{ + check(sizeof(i)); + position += encode(data+position, (uint16_t) i); +} +void Encoder::write(int32_t i) +{ + check(sizeof(i)); + position += encode(data+position, (uint32_t) i); +} +void Encoder::write(int64_t i) +{ + check(sizeof(i)); + position += encode(data+position, (uint64_t) i); +} +void Encoder::write(float f) +{ + check(sizeof(f)); + union { + uint32_t i; + float f; + } val; + + val.f = f; + write(val.i); +} +void Encoder::write(double d) +{ + check(sizeof(d)); + union { + uint64_t i; + double d; + } val; + + val.d = d; + write(val.i); +} +void Encoder::write(const qpid::types::Uuid& uuid) +{ + writeBytes((const char*) uuid.data(), uuid.size()); +} + +void Encoder::writeBytes(const char* bytes, size_t count) +{ + check(count); + ::memcpy(data + position, bytes, count); + position += count; +} + +void Encoder::writeCode(uint8_t code) +{ + write(code); +} + +void Encoder::writeNull(const Descriptor* d) +{ + if (d) writeDescriptor(*d); + writeCode(typecodes::NULL_VALUE); +} +void Encoder::writeBoolean(bool b, const Descriptor* d) +{ + if (d) writeDescriptor(*d); + writeCode(b ? typecodes::BOOLEAN_TRUE : typecodes::BOOLEAN_FALSE); +} +void Encoder::writeUByte(uint8_t i, const Descriptor* d) +{ + write(i, typecodes::UBYTE, d); +} + +void Encoder::writeUShort(uint16_t i, const Descriptor* d) +{ + write(i, typecodes::USHORT, d); +} + +void Encoder::writeUInt(uint32_t i, const Descriptor* d) +{ + if (i == 0) { + if (d) writeDescriptor(*d); + writeCode(typecodes::UINT_ZERO); + } else { + if (i < 256) { + write((uint8_t) i, typecodes::UINT_SMALL, d); + } else { + write(i, typecodes::UINT, d); + } + } +} + +void Encoder::writeULong(uint64_t i, const Descriptor* d) +{ + if (i == 0) { + if (d) writeDescriptor(*d); + writeCode(typecodes::ULONG_ZERO); + } else { + if (i < 256) { + write((uint8_t) i, typecodes::ULONG_SMALL, d); + } else { + write(i, typecodes::ULONG, d); + } + } +} + +void Encoder::writeByte(int8_t i, const Descriptor* d) +{ + write((uint8_t) i, typecodes::LONG, d); +} + +void Encoder::writeShort(int16_t i, const Descriptor* d) +{ + write((uint16_t) i, typecodes::SHORT, d); +} + +void Encoder::writeInt(int32_t i, const Descriptor* d) +{ + write((uint32_t) i, typecodes::INT, d); +} + +void Encoder::writeLong(int64_t i, const Descriptor* d) +{ + write((uint64_t) i, typecodes::LONG, d); +} + +void Encoder::writeTimestamp(int64_t t, const Descriptor* d) +{ + write((uint64_t) t, typecodes::TIMESTAMP, d); +} + + +void Encoder::writeFloat(float f, const Descriptor* d) +{ + write(f, typecodes::FLOAT, d); +} + +void Encoder::writeDouble(double f, const Descriptor* d) +{ + write(f, typecodes::DOUBLE, d); +} + +void Encoder::writeUuid(const qpid::types::Uuid& uuid, const Descriptor* d) +{ + write(uuid, typecodes::UUID, d); +} + +void Encoder::write(const CharSequence& v, std::pair<uint8_t, uint8_t> codes, const Descriptor* d) +{ + if (d) writeDescriptor(*d); + if (v.size < 256) { + writeCode(codes.first); + write((uint8_t) v.size); + } else { + writeCode(codes.second); + write((uint32_t) v.size); + } + writeBytes(v.data, v.size); +} + +void Encoder::write(const std::string& v, std::pair<uint8_t, uint8_t> codes, const Descriptor* d) +{ + if (d) writeDescriptor(*d); + if (v.size() < 256) { + writeCode(codes.first); + write((uint8_t) v.size()); + } else { + writeCode(codes.second); + write((uint32_t) v.size()); + } + writeBytes(v.data(), v.size()); +} + +void Encoder::writeSymbol(const CharSequence& v, const Descriptor* d) +{ + write(v, typecodes::SYMBOL, d); +} + +void Encoder::writeSymbol(const std::string& v, const Descriptor* d) +{ + write(v, typecodes::SYMBOL, d); +} + +void Encoder::writeString(const CharSequence& v, const Descriptor* d) +{ + write(v, typecodes::STRING, d); +} + +void Encoder::writeString(const std::string& v, const Descriptor* d) +{ + write(v, typecodes::STRING, d); +} + +void Encoder::writeBinary(const CharSequence& v, const Descriptor* d) +{ + write(v, typecodes::BINARY, d); +} + +void Encoder::writeBinary(const std::string& v, const Descriptor* d) +{ + write(v, typecodes::BINARY, d); +} + +void* Encoder::startList8(const Descriptor* d) +{ + return start<uint8_t>(typecodes::LIST8, d); +} + +void* Encoder::startList32(const Descriptor* d) +{ + return start<uint32_t>(typecodes::LIST32, d); +} + +void Encoder::endList8(uint8_t count, void* token) +{ + end<uint8_t>(count, token, data+position); +} + +void Encoder::endList32(uint32_t count, void* token) +{ + end<uint32_t>(count, token, data+position); +} + +void* Encoder::startMap8(const Descriptor* d) +{ + return start<uint8_t>(typecodes::MAP8, d); +} + +void* Encoder::startMap32(const Descriptor* d) +{ + return start<uint32_t>(typecodes::MAP32, d); +} + +void Encoder::endMap8(uint8_t count, void* token) +{ + end<uint8_t>(count, token, data+position); +} + +void Encoder::endMap32(uint32_t count, void* token) +{ + end<uint32_t>(count, token, data+position); +} + +void* Encoder::startArray8(const Constructor& c, const Descriptor* d) +{ + return startArray<uint8_t>(typecodes::ARRAY8, d, c); +} + +void* Encoder::startArray32(const Constructor& c, const Descriptor* d) +{ + return startArray<uint8_t>(typecodes::ARRAY32, d, c); +} + +void Encoder::endArray8(size_t count, void* token) +{ + end<uint8_t>(count, token, data+position); +} + +void Encoder::endArray32(size_t count, void* token) +{ + end<uint32_t>(count, token, data+position); +} + +void Encoder::writeMap(const std::map<std::string, qpid::types::Variant>& value, const Descriptor* d, bool large) +{ + void* token = large ? startMap32(d) : startMap8(d); + for (qpid::types::Variant::Map::const_iterator i = value.begin(); i != value.end(); ++i) { + writeString(i->first); + writeValue(i->second); + } + if (large) endMap32(value.size()*2, token); + else endMap8(value.size()*2, token); +} + +void Encoder::writeList(const std::list<qpid::types::Variant>& value, const Descriptor* d, bool large) +{ + void* token = large ? startList32(d) : startList8(d); + for (qpid::types::Variant::List::const_iterator i = value.begin(); i != value.end(); ++i) { + writeValue(*i); + } + if (large) endList32(value.size(), token); + else endList8(value.size(), token); +} + +void Encoder::writeValue(const qpid::types::Variant& value, const Descriptor* d) +{ + if (d) { + writeDescriptor(*d); // Write this descriptor before any in the value. + d = 0; + } + // Write any descriptors attached to the value. + const Variant::List& descriptors = value.getDescriptors(); + for (Variant::List::const_iterator i = descriptors.begin(); i != descriptors.end(); ++i) { + if (i->getType() == types::VAR_STRING) + writeDescriptor(Descriptor(CharSequence::create(i->asString()))); + else + writeDescriptor(Descriptor(i->asUint64())); + } + switch (value.getType()) { + case qpid::types::VAR_VOID: + writeNull(d); + break; + case qpid::types::VAR_BOOL: + writeBoolean(value.asBool(), d); + break; + case qpid::types::VAR_UINT8: + writeUByte(value.asUint8(), d); + break; + case qpid::types::VAR_UINT16: + writeUShort(value.asUint16(), d); + break; + case qpid::types::VAR_UINT32: + writeUInt(value.asUint32(), d); + break; + case qpid::types::VAR_UINT64: + writeULong(value.asUint64(), d); + break; + case qpid::types::VAR_INT8: + writeByte(value.asInt8(), d); + break; + case qpid::types::VAR_INT16: + writeShort(value.asInt16(), d); + break; + case qpid::types::VAR_INT32: + writeInt(value.asInt32(), d); + break; + case qpid::types::VAR_INT64: + writeLong(value.asInt64(), d); + break; + case qpid::types::VAR_FLOAT: + writeFloat(value.asFloat(), d); + break; + case qpid::types::VAR_DOUBLE: + writeDouble(value.asDouble(), d); + break; + case qpid::types::VAR_STRING: + if (value.getEncoding() == UTF8) { + writeString(value.getString(), d); + } else if (value.getEncoding() == ASCII) { + writeSymbol(value.getString(), d); + } else { + writeBinary(value.getString(), d); + } + break; + case qpid::types::VAR_MAP: + writeMap(value.asMap(), d); + break; + case qpid::types::VAR_LIST: + writeList(value.asList(), d); + break; + case qpid::types::VAR_UUID: + writeUuid(value.asUuid(), d); + break; + } + +} + +void Encoder::writeDescriptor(const Descriptor& d) +{ + writeCode(typecodes::DESCRIPTOR); + switch (d.type) { + case Descriptor::NUMERIC: + writeULong(d.value.code, 0); + break; + case Descriptor::SYMBOLIC: + writeSymbol(d.value.symbol, 0); + break; + } +} + +void Encoder::check(size_t s) +{ + if (position + s > size) { + if (grow) { + buffer.resize(buffer.size() + s); + data = const_cast<char*>(buffer.data()); + size = buffer.size(); + } + else { + QPID_LOG(notice, "Buffer overflow for write of size " << s + << " to buffer of size " << size << " at position " << position); + assert(false); + throw Overflow(); + } + } +} + +size_t Encoder::getPosition() { return position; } +size_t Encoder::getSize() const { return size; } +char* Encoder::getData() { return data + position; } +std::string Encoder::getBuffer() { return buffer; } +void Encoder::resetPosition(size_t p) { assert(p <= size); position = p; } + +}} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/Encoder.h b/qpid/cpp/src/qpid/amqp/Encoder.h new file mode 100644 index 0000000000..8729f29b94 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/Encoder.h @@ -0,0 +1,181 @@ +#ifndef QPID_AMQP_ENCODER_H +#define QPID_AMQP_ENCODER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/IntegerTypes.h" +#include "qpid/amqp/Constructor.h" +#include "qpid/Exception.h" +#include <list> +#include <map> +#include <stddef.h> +#include <string> + +namespace qpid { +namespace types { +class Uuid; +class Variant; +} +namespace amqp { +struct CharSequence; +struct Descriptor; + +/** + * Class to help create AMQP encoded data. + */ +class Encoder +{ + public: + struct Overflow : public Exception { Overflow(); }; + + /** Create an encoder that writes into the buffer at data up to size bytes. + * Write operations throw Overflow if encoding exceeds size bytes. + */ + QPID_COMMON_EXTERN Encoder(char* data, size_t size); + + /** Create an encoder that manages its own buffer. Buffer grows to accomodate + * all encoded data. Call getBuffer() to get the buffer. + */ + QPID_COMMON_EXTERN Encoder(); + + void writeCode(uint8_t); + + void write(bool); + void write(uint8_t); + void write(uint16_t); + void write(uint32_t); + void write(uint64_t); + void write(int8_t); + void write(int16_t); + void write(int32_t); + void write(int64_t); + void write(float); + void write(double); + void write(const qpid::types::Uuid&); + + void writeNull(const Descriptor* d=0); + void writeBoolean(bool, const Descriptor* d=0); + void writeUByte(uint8_t, const Descriptor* d=0); + void writeUShort(uint16_t, const Descriptor* d=0); + void writeUInt(uint32_t, const Descriptor* d=0); + void writeULong(uint64_t, const Descriptor* d=0); + void writeByte(int8_t, const Descriptor* d=0); + void writeShort(int16_t, const Descriptor* d=0); + void writeInt(int32_t, const Descriptor* d=0); + void writeLong(int64_t, const Descriptor* d=0); + void writeFloat(float, const Descriptor* d=0); + void writeDouble(double, const Descriptor* d=0); + void writeUuid(const qpid::types::Uuid&, const Descriptor* d=0); + void writeTimestamp(int64_t, const Descriptor* d=0); + + void writeSymbol(const CharSequence&, const Descriptor* d=0); + void writeSymbol(const std::string&, const Descriptor* d=0); + void writeString(const CharSequence&, const Descriptor* d=0); + void writeString(const std::string&, const Descriptor* d=0); + void writeBinary(const CharSequence&, const Descriptor* d=0); + QPID_COMMON_EXTERN void writeBinary(const std::string&, const Descriptor* d=0); + + void* startList8(const Descriptor* d=0); + void* startList32(const Descriptor* d=0); + void endList8(uint8_t count, void*); + void endList32(uint32_t count, void*); + + void* startMap8(const Descriptor* d=0); + void* startMap32(const Descriptor* d=0); + void endMap8(uint8_t count, void*); + void endMap32(uint32_t count, void*); + + void* startArray8(const Constructor&, const Descriptor* d=0); + void* startArray32(const Constructor&, const Descriptor* d=0); + void endArray8(size_t count, void*); + void endArray32(size_t count, void*); + + QPID_COMMON_EXTERN void writeValue(const qpid::types::Variant&, const Descriptor* d=0); + QPID_COMMON_EXTERN void writeMap(const std::map<std::string, qpid::types::Variant>& value, const Descriptor* d=0, bool large=true); + QPID_COMMON_EXTERN void writeList(const std::list<qpid::types::Variant>& value, const Descriptor* d=0, bool large=true); + + void writeDescriptor(const Descriptor&); + QPID_COMMON_EXTERN size_t getPosition(); + void resetPosition(size_t p); + char* skip(size_t); + void writeBytes(const char* bytes, size_t count); + virtual ~Encoder() {} + + /** Return the total size of the buffer. */ + size_t getSize() const; + + /** Return the growable buffer. */ + std::string getBuffer(); + + /** Return the unused portion of the buffer. */ + char* getData(); + + private: + char* data; + size_t size; + size_t position; + bool grow; + std::string buffer; + + void write(const CharSequence& v, std::pair<uint8_t, uint8_t> codes, const Descriptor* d); + void write(const std::string& v, std::pair<uint8_t, uint8_t> codes, const Descriptor* d); + void check(size_t); + + template<typename T> void write(T value, uint8_t code, const Descriptor* d) + { + if (d) writeDescriptor(*d); + writeCode(code); + write(value); + } + + template<typename T> void write(T value, std::pair<uint8_t, uint8_t> codes, const Descriptor* d) + { + if (value < 256) { + write((uint8_t) value, codes.first, d); + } else { + write(value, codes.second, d); + } + } + + template<typename T> void* start(uint8_t code, const Descriptor* d) + { + if (d) writeDescriptor(*d); + writeCode(code); + //skip size and count, will backfill on end + return skip(sizeof(T)/*size*/ + sizeof(T)/*count*/); + } + + template<typename T> void* startArray(uint8_t code, const Descriptor* d, const Constructor& c) + { + void* token = start<T>(code, d); + if (c.isDescribed) { + writeDescriptor(c.descriptor); + } + check(1); + writeCode(c.code); + return token; + } + +}; + +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_ENCODER_H*/ diff --git a/qpid/cpp/src/qpid/amqp/ListBuilder.cpp b/qpid/cpp/src/qpid/amqp/ListBuilder.cpp new file mode 100644 index 0000000000..f2ca8e8805 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/ListBuilder.cpp @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ListBuilder.h" + +namespace qpid { +namespace amqp { + +ListBuilder::ListBuilder() : DataBuilder(qpid::types::Variant::List()) {} + +qpid::types::Variant::List& ListBuilder::getList() +{ + return getValue().asList(); +} + +}} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/ListBuilder.h b/qpid/cpp/src/qpid/amqp/ListBuilder.h new file mode 100644 index 0000000000..825f384f56 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/ListBuilder.h @@ -0,0 +1,41 @@ +#ifndef QPID_AMQP_LISTBUILDER_H +#define QPID_AMQP_LISTBUILDER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "DataBuilder.h" +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace amqp { + +/** + * Utility to build a Variant::List from a data stream + */ +class ListBuilder : public DataBuilder +{ + public: + QPID_COMMON_EXTERN ListBuilder(); + QPID_COMMON_EXTERN qpid::types::Variant::List& getList(); +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_LISTBUILDER_H*/ diff --git a/qpid/cpp/src/qpid/amqp/ListReader.h b/qpid/cpp/src/qpid/amqp/ListReader.h new file mode 100644 index 0000000000..fafe2a1f9c --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/ListReader.h @@ -0,0 +1,103 @@ +#ifndef QPID_AMQP_LISTREADER_H +#define QPID_AMQP_LISTREADER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Reader.h" + +namespace qpid { +namespace amqp { + +/** + * Utility to assist in reading AMQP encoded lists + */ +class ListReader : public Reader +{ + public: + ListReader() : index(0), level(0) {} + virtual ~ListReader() {} + virtual void onNull(const Descriptor* descriptor) { getReader().onNull(descriptor); } + virtual void onBoolean(bool v, const Descriptor* descriptor) { getReader().onBoolean(v, descriptor); } + virtual void onUByte(uint8_t v, const Descriptor* descriptor) { getReader().onUByte(v, descriptor); } + virtual void onUShort(uint16_t v, const Descriptor* descriptor) { getReader().onUShort(v, descriptor); } + virtual void onUInt(uint32_t v, const Descriptor* descriptor) { getReader().onUInt(v, descriptor); } + virtual void onULong(uint64_t v, const Descriptor* descriptor) { getReader().onULong(v, descriptor); } + virtual void onByte(int8_t v, const Descriptor* descriptor) { getReader().onByte(v, descriptor); } + virtual void onShort(int16_t v, const Descriptor* descriptor) { getReader().onShort(v, descriptor); } + virtual void onInt(int32_t v, const Descriptor* descriptor) { getReader().onInt(v, descriptor); } + virtual void onLong(int64_t v, const Descriptor* descriptor) { getReader().onLong(v, descriptor); } + virtual void onFloat(float v, const Descriptor* descriptor) { getReader().onFloat(v, descriptor); } + virtual void onDouble(double v, const Descriptor* descriptor) { getReader().onDouble(v, descriptor); } + virtual void onUuid(const CharSequence& v, const Descriptor* descriptor) { getReader().onUuid(v, descriptor); } + virtual void onTimestamp(int64_t v, const Descriptor* descriptor) { getReader().onTimestamp(v, descriptor); } + + virtual void onBinary(const CharSequence& v, const Descriptor* descriptor) { getReader().onBinary(v, descriptor); } + virtual void onString(const CharSequence& v, const Descriptor* descriptor) { getReader().onString(v, descriptor); } + virtual void onSymbol(const CharSequence& v, const Descriptor* descriptor) { getReader().onSymbol(v, descriptor); } + + virtual bool onStartList(uint32_t count, const CharSequence& elements, const CharSequence& all, const Descriptor* descriptor) + { + ++level; + getReader().onStartList(count, elements, all, descriptor); + return false; + } + virtual void onEndList(uint32_t count, const Descriptor* descriptor) + { + --level; + getReader().onEndList(count, descriptor); + } + virtual bool onStartMap(uint32_t count, const CharSequence& elements, const CharSequence& all, const Descriptor* descriptor) + { + ++level; + getReader().onStartMap(count, elements, all, descriptor); + return false; + } + virtual void onEndMap(uint32_t count, const Descriptor* descriptor) + { + --level; + getReader().onEndList(count, descriptor); + } + virtual bool onStartArray(uint32_t count, const CharSequence& v, const Constructor& c, const Descriptor* descriptor) + { + ++level; + getReader().onStartArray(count, v, c, descriptor); + return false; + } + virtual void onEndArray(uint32_t count, const Descriptor* descriptor) + { + --level; + getReader().onEndList(count, descriptor); + } + private: + size_t index; + size_t level; + Reader& getReader() + { + Reader& r = getReader(index); + if (level == 0) ++index; + return r; + } + protected: + virtual Reader& getReader(size_t i) = 0; +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_LISTREADER_H*/ diff --git a/qpid/cpp/src/qpid/amqp/LoggingReader.h b/qpid/cpp/src/qpid/amqp/LoggingReader.h new file mode 100644 index 0000000000..ed5cab1cbd --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/LoggingReader.h @@ -0,0 +1,64 @@ +#ifndef QPID_AMQP_LOGGINGREADER_H +#define QPID_AMQP_LOGGINGREADER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Reader.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace amqp { + +class LoggingReader : public Reader +{ + public: + virtual ~LoggingReader() {} + virtual void onNull(const Descriptor*) { if (!ignoreNull()) QPID_LOG(warning, prefix() << "null" << suffix()); } + virtual void onBoolean(bool, const Descriptor*) { QPID_LOG(warning, prefix() << "boolean" << suffix()); } + virtual void onUByte(uint8_t, const Descriptor*) { QPID_LOG(warning, prefix() << "ubyte" << suffix()); } + virtual void onUShort(uint16_t, const Descriptor*) { QPID_LOG(warning, prefix() << "ushort" << suffix()); } + virtual void onUInt(uint32_t, const Descriptor*) { QPID_LOG(warning, prefix() << "uint" << suffix()); } + virtual void onULong(uint64_t, const Descriptor*) { QPID_LOG(warning, prefix() << "ulong" << suffix()); } + virtual void onByte(int8_t, const Descriptor*) { QPID_LOG(warning, prefix() << "byte" << suffix()); } + virtual void onShort(int16_t, const Descriptor*) { QPID_LOG(warning, prefix() << "short" << suffix()); } + virtual void onInt(int32_t, const Descriptor*) { QPID_LOG(warning, prefix() << "int" << suffix()); } + virtual void onLong(int64_t, const Descriptor*) { QPID_LOG(warning, prefix() << "long" << suffix()); } + virtual void onFloat(float, const Descriptor*) { QPID_LOG(warning, prefix() << "float" << suffix()); } + virtual void onDouble(double, const Descriptor*) { QPID_LOG(warning, prefix() << "double" << suffix()); } + virtual void onUuid(const CharSequence&, const Descriptor*) { QPID_LOG(warning, prefix() << "uuid" << suffix()); } + virtual void onTimestamp(int64_t, const Descriptor*) { QPID_LOG(warning, prefix() << "timestamp" << suffix()); } + + virtual void onBinary(const CharSequence&, const Descriptor*) { QPID_LOG(warning, prefix() << "binary" << suffix()); } + virtual void onString(const CharSequence&, const Descriptor*) { QPID_LOG(warning, prefix() << "string" << suffix()); } + virtual void onSymbol(const CharSequence&, const Descriptor*) { QPID_LOG(warning, prefix() << "symbol" << suffix()); } + + virtual bool onStartList(uint32_t, const CharSequence&, const Descriptor*) { QPID_LOG(warning, prefix() << "list" << suffix()); return recursive; } + virtual bool onStartMap(uint32_t, const CharSequence&, const Descriptor*) { QPID_LOG(warning, prefix() << "map" << suffix()); return recursive; } + virtual bool onStartArray(uint32_t, const CharSequence&, const Constructor&, const Descriptor*) { QPID_LOG(warning, prefix() << "array" << suffix()); return recursive; } + protected: + virtual bool recursive() { return true; } + virtual bool ignoreNull() { return true; } + virtual std::string prefix() = 0; + virtual std::string suffix() = 0; +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_LOGGINGREADER_H*/ diff --git a/qpid/cpp/src/qpid/amqp/MapBuilder.cpp b/qpid/cpp/src/qpid/amqp/MapBuilder.cpp new file mode 100644 index 0000000000..ce8eea038e --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/MapBuilder.cpp @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "MapBuilder.h" + +namespace qpid { +namespace amqp { +MapBuilder::MapBuilder() : DataBuilder(qpid::types::Variant::Map()) {} +qpid::types::Variant::Map MapBuilder::getMap() +{ + return getValue().asMap(); +} +}} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/MapBuilder.h b/qpid/cpp/src/qpid/amqp/MapBuilder.h new file mode 100644 index 0000000000..fd94ae04af --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/MapBuilder.h @@ -0,0 +1,41 @@ +#ifndef QPID_AMQP_MAPBUILDER_H +#define QPID_AMQP_MAPBUILDER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "DataBuilder.h" +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace amqp { + +/** + * Utility to build a Variant::Map from a data stream + */ +class MapBuilder : public DataBuilder +{ + public: + QPID_COMMON_EXTERN MapBuilder(); + QPID_COMMON_EXTERN qpid::types::Variant::Map getMap(); +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_MAPBUILDER_H*/ diff --git a/qpid/cpp/src/qpid/amqp/MapEncoder.cpp b/qpid/cpp/src/qpid/amqp/MapEncoder.cpp new file mode 100644 index 0000000000..cf8ef4ecb5 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/MapEncoder.cpp @@ -0,0 +1,142 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "MapEncoder.h" +#include "CharSequence.h" +#include "qpid/amqp/typecodes.h" +#include <string.h> + +namespace qpid { +namespace amqp { + +MapEncoder::MapEncoder(char* data, size_t size) : Encoder(data, size) {} + +void MapEncoder::handleVoid(const CharSequence& key) +{ + writeString(key); + writeNull(); +} + +void MapEncoder::handleBool(const CharSequence& key, bool value) +{ + writeString(key); + writeBoolean(value); +} + +void MapEncoder::handleUint8(const CharSequence& key, uint8_t value) +{ + writeString(key); + writeUByte(value); +} + +void MapEncoder::handleUint16(const CharSequence& key, uint16_t value) +{ + writeString(key); + writeUShort(value); +} + +void MapEncoder::handleUint32(const CharSequence& key, uint32_t value) +{ + writeString(key); + writeUInt(value); +} + +void MapEncoder::handleUint64(const CharSequence& key, uint64_t value) +{ + writeString(key); + writeULong(value); +} + +void MapEncoder::handleInt8(const CharSequence& key, int8_t value) +{ + writeString(key); + writeByte(value); +} + +void MapEncoder::handleInt16(const CharSequence& key, int16_t value) +{ + writeString(key); + writeShort(value); +} + +void MapEncoder::handleInt32(const CharSequence& key, int32_t value) +{ + writeString(key); + writeInt(value); +} + +void MapEncoder::handleInt64(const CharSequence& key, int64_t value) +{ + writeString(key); + writeLong(value); +} + +void MapEncoder::handleFloat(const CharSequence& key, float value) +{ + writeString(key); + writeFloat(value); +} + +void MapEncoder::handleDouble(const CharSequence& key, double value) +{ + writeString(key); + writeDouble(value); +} + +namespace { +const std::string BINARY("binary"); +} + +void MapEncoder::handleString(const CharSequence& key, const CharSequence& value, const CharSequence& encoding) +{ + writeString(key); + if (encoding.size == BINARY.size() && ::strncmp(encoding.data, BINARY.data(), encoding.size)) { + writeBinary(value); + } else { + writeString(value); + } +} + +void MapEncoder::writeMetaData(size_t size, size_t count, const Descriptor* d) +{ + if (count > 255 || size > 255) { + writeMap32MetaData((uint32_t) size, (uint32_t) count, d); + } else { + writeMap8MetaData((uint8_t) size, (uint8_t) count, d); + } +} + +void MapEncoder::writeMap8MetaData(uint8_t size, uint8_t count, const Descriptor* d) +{ + if (d) writeDescriptor(*d); + writeCode(typecodes::MAP8); + write((uint8_t) (size+1)/*size includes count field*/); + write(count); +} + +void MapEncoder::writeMap32MetaData(uint32_t size, uint32_t count, const Descriptor* d) +{ + if (d) writeDescriptor(*d); + writeCode(typecodes::MAP32); + write((uint32_t) (size+4)/*size includes count field*/); + write(count); +} + +}} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/MapEncoder.h b/qpid/cpp/src/qpid/amqp/MapEncoder.h new file mode 100644 index 0000000000..1481f9125a --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/MapEncoder.h @@ -0,0 +1,58 @@ +#ifndef QPID_AMQP_MAPENCODER_H +#define QPID_AMQP_MAPENCODER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "MapHandler.h" +#include "Encoder.h" + +namespace qpid { +namespace amqp { +struct Descriptor; + +/** + * Encode map like data + */ +class MapEncoder : public MapHandler, private Encoder +{ + public: + MapEncoder(char* data, size_t size); + void handleVoid(const CharSequence& key); + void handleBool(const CharSequence& key, bool value); + void handleUint8(const CharSequence& key, uint8_t value); + void handleUint16(const CharSequence& key, uint16_t value); + void handleUint32(const CharSequence& key, uint32_t value); + void handleUint64(const CharSequence& key, uint64_t value); + void handleInt8(const CharSequence& key, int8_t value); + void handleInt16(const CharSequence& key, int16_t value); + void handleInt32(const CharSequence& key, int32_t value); + void handleInt64(const CharSequence& key, int64_t value); + void handleFloat(const CharSequence& key, float value); + void handleDouble(const CharSequence& key, double value); + void handleString(const CharSequence& key, const CharSequence& value, const CharSequence& encoding); + void writeMetaData(size_t size, size_t count, const Descriptor* descriptor=0); + void writeMap8MetaData(uint8_t size, uint8_t count, const Descriptor* d=0); + void writeMap32MetaData(uint32_t size, uint32_t count, const Descriptor* d=0); + private: +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_MAPENCODER_H*/ diff --git a/qpid/cpp/src/qpid/amqp/MapHandler.h b/qpid/cpp/src/qpid/amqp/MapHandler.h new file mode 100644 index 0000000000..14994ccac7 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/MapHandler.h @@ -0,0 +1,53 @@ +#ifndef QPID_AMQP_MAPHANDLER_H +#define QPID_AMQP_MAPHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/IntegerTypes.h" + +namespace qpid { +namespace amqp { +struct CharSequence; +/** + * Interface for processing entries in some map-like object + */ +class MapHandler +{ + public: + virtual ~MapHandler() {} + virtual void handleVoid(const CharSequence& key) = 0; + virtual void handleBool(const CharSequence& key, bool value) = 0; + virtual void handleUint8(const CharSequence& key, uint8_t value) = 0; + virtual void handleUint16(const CharSequence& key, uint16_t value) = 0; + virtual void handleUint32(const CharSequence& key, uint32_t value) = 0; + virtual void handleUint64(const CharSequence& key, uint64_t value) = 0; + virtual void handleInt8(const CharSequence& key, int8_t value) = 0; + virtual void handleInt16(const CharSequence& key, int16_t value) = 0; + virtual void handleInt32(const CharSequence& key, int32_t value) = 0; + virtual void handleInt64(const CharSequence& key, int64_t value) = 0; + virtual void handleFloat(const CharSequence& key, float value) = 0; + virtual void handleDouble(const CharSequence& key, double value) = 0; + virtual void handleString(const CharSequence& key, const CharSequence& value, const CharSequence& encoding) = 0; + private: +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_MAPHANDLER_H*/ diff --git a/qpid/cpp/src/qpid/amqp/MapReader.cpp b/qpid/cpp/src/qpid/amqp/MapReader.cpp new file mode 100644 index 0000000000..b6c31849f0 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/MapReader.cpp @@ -0,0 +1,309 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/MapReader.h" +#include "qpid/Exception.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace amqp { + +void MapReader::onNull(const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onNullValue(key, d); + clearKey(); + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} +void MapReader::onBoolean(bool v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onBooleanValue(key, v, d); + clearKey(); + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onUByte(uint8_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onUByteValue(key, v, d); + clearKey(); + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onUShort(uint16_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onUShortValue(key, v, d); + clearKey(); + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onUInt(uint32_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onUIntValue(key, v, d); + clearKey(); + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onULong(uint64_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onULongValue(key, v, d); + clearKey(); + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onByte(int8_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onByteValue(key, v, d); + clearKey(); + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onShort(int16_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onShortValue(key, v, d); + clearKey(); + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onInt(int32_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onIntValue(key, v, d); + clearKey(); + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onLong(int64_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onLongValue(key, v, d); + clearKey(); + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onFloat(float v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onFloatValue(key, v, d); + clearKey(); + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onDouble(double v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onDoubleValue(key, v, d); + clearKey(); + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onUuid(const CharSequence& v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onUuidValue(key, v, d); + clearKey(); + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onTimestamp(int64_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onTimestampValue(key, v, d); + clearKey(); + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onBinary(const CharSequence& v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onBinaryValue(key, v, d); + clearKey(); + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onString(const CharSequence& v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onStringValue(key, v, d); + clearKey(); + } else { + if (keyType & STRING_KEY) { + key = v; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key, got string " << v.str())); + } + } +} + +void MapReader::onSymbol(const CharSequence& v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onSymbolValue(key, v, d); + clearKey(); + } else { + if (keyType & SYMBOL_KEY) { + key = v; + } else { + throw qpid::Exception(QPID_MSG("Expecting string as key, got symbol " << v.str())); + } + } +} + +bool MapReader::onStartList(uint32_t count, const CharSequence&, const CharSequence&, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + bool step = onStartListValue(key, count, d); + clearKey(); + return step; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } + return true; +} + +bool MapReader::onStartMap(uint32_t count, const CharSequence&, const CharSequence&, const Descriptor* d) +{ + if (level++) { + if (key) { + bool step = onStartMapValue(key, count, d); + clearKey(); + return step; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } + } + return true; +} + +bool MapReader::onStartArray(uint32_t count, const CharSequence&, const Constructor& c, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + bool step = onStartArrayValue(key, count, c, d); + clearKey(); + return step; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } + return true; +} + +void MapReader::onEndList(uint32_t count, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onEndListValue(key, count, d); + clearKey(); + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onEndMap(uint32_t count, const Descriptor* d) +{ + if (--level) { + onEndMapValue(key, count, d); + clearKey(); + } +} + +void MapReader::onEndArray(uint32_t count, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onEndArrayValue(key, count, d); + clearKey(); + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +MapReader::MapReader() : level(0), keyType(SYMBOL_KEY) +{ + clearKey(); +} + +void MapReader::setAllowedKeyType(int t) +{ + keyType = t; +} + +void MapReader::clearKey() +{ + key.data = 0; key.size = 0; +} + +const int MapReader::SYMBOL_KEY(1); +const int MapReader::STRING_KEY(2); +}} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/MapReader.h b/qpid/cpp/src/qpid/amqp/MapReader.h new file mode 100644 index 0000000000..875f919d63 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/MapReader.h @@ -0,0 +1,110 @@ +#ifndef QPID_AMQP_MAPREADER_H +#define QPID_AMQP_MAPREADER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Reader.h" +#include "CharSequence.h" +#include <string> + +namespace qpid { +namespace amqp { + +/** + * Reading AMQP 1.0 encoded data which is constrained to be a symbol + * keyeed map. The keys are assumed never to be described, the values + * may be. + */ +class MapReader : public Reader +{ + public: + virtual void onNullValue(const CharSequence& /*key*/, const Descriptor*) {} + virtual void onBooleanValue(const CharSequence& /*key*/, bool, const Descriptor*) {} + virtual void onUByteValue(const CharSequence& /*key*/, uint8_t, const Descriptor*) {} + virtual void onUShortValue(const CharSequence& /*key*/, uint16_t, const Descriptor*) {} + virtual void onUIntValue(const CharSequence& /*key*/, uint32_t, const Descriptor*) {} + virtual void onULongValue(const CharSequence& /*key*/, uint64_t, const Descriptor*) {} + virtual void onByteValue(const CharSequence& /*key*/, int8_t, const Descriptor*) {} + virtual void onShortValue(const CharSequence& /*key*/, int16_t, const Descriptor*) {} + virtual void onIntValue(const CharSequence& /*key*/, int32_t, const Descriptor*) {} + virtual void onLongValue(const CharSequence& /*key*/, int64_t, const Descriptor*) {} + virtual void onFloatValue(const CharSequence& /*key*/, float, const Descriptor*) {} + virtual void onDoubleValue(const CharSequence& /*key*/, double, const Descriptor*) {} + virtual void onUuidValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*) {} + virtual void onTimestampValue(const CharSequence& /*key*/, int64_t, const Descriptor*) {} + + virtual void onBinaryValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*) {} + virtual void onStringValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*) {} + virtual void onSymbolValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*) {} + + /** + * @return true to step into elements of the compound value, false + * to skip over it + */ + virtual bool onStartListValue(const CharSequence& /*key*/, uint32_t /*count*/, const Descriptor*) { return true; } + virtual bool onStartMapValue(const CharSequence& /*key*/, uint32_t /*count*/, const Descriptor*) { return true; } + virtual bool onStartArrayValue(const CharSequence& /*key*/, uint32_t /*count*/, const Constructor&, const Descriptor*) { return true; } + virtual void onEndListValue(const CharSequence& /*key*/, uint32_t /*count*/, const Descriptor*) {} + virtual void onEndMapValue(const CharSequence& /*key*/, uint32_t /*count*/, const Descriptor*) {} + virtual void onEndArrayValue(const CharSequence& /*key*/, uint32_t /*count*/, const Descriptor*) {} + + + //this class implements the Reader interface, thus acting as a transformer into a more map oriented scheme + QPID_COMMON_EXTERN void onNull(const Descriptor*); + QPID_COMMON_EXTERN void onBoolean(bool, const Descriptor*); + QPID_COMMON_EXTERN void onUByte(uint8_t, const Descriptor*); + QPID_COMMON_EXTERN void onUShort(uint16_t, const Descriptor*); + QPID_COMMON_EXTERN void onUInt(uint32_t, const Descriptor*); + QPID_COMMON_EXTERN void onULong(uint64_t, const Descriptor*); + QPID_COMMON_EXTERN void onByte(int8_t, const Descriptor*); + QPID_COMMON_EXTERN void onShort(int16_t, const Descriptor*); + QPID_COMMON_EXTERN void onInt(int32_t, const Descriptor*); + QPID_COMMON_EXTERN void onLong(int64_t, const Descriptor*); + QPID_COMMON_EXTERN void onFloat(float, const Descriptor*); + QPID_COMMON_EXTERN void onDouble(double, const Descriptor*); + QPID_COMMON_EXTERN void onUuid(const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN void onTimestamp(int64_t, const Descriptor*); + + QPID_COMMON_EXTERN void onBinary(const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN void onString(const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN void onSymbol(const CharSequence&, const Descriptor*); + + QPID_COMMON_EXTERN bool onStartList(uint32_t /*count*/, const CharSequence&, const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN bool onStartMap(uint32_t /*count*/, const CharSequence&, const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN bool onStartArray(uint32_t /*count*/, const CharSequence&, const Constructor&, const Descriptor*); + QPID_COMMON_EXTERN void onEndList(uint32_t /*count*/, const Descriptor*); + QPID_COMMON_EXTERN void onEndMap(uint32_t /*count*/, const Descriptor*); + QPID_COMMON_EXTERN void onEndArray(uint32_t /*count*/, const Descriptor*); + + QPID_COMMON_EXTERN MapReader(); + QPID_COMMON_EXTERN static const int SYMBOL_KEY; + QPID_COMMON_EXTERN static const int STRING_KEY; + QPID_COMMON_EXTERN void setAllowedKeyType(int); + private: + CharSequence key; + size_t level; + int keyType; + + void clearKey(); +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_MAPREADER_H*/ diff --git a/qpid/cpp/src/qpid/amqp/MapSizeCalculator.cpp b/qpid/cpp/src/qpid/amqp/MapSizeCalculator.cpp new file mode 100644 index 0000000000..2da152108f --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/MapSizeCalculator.cpp @@ -0,0 +1,151 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "MapSizeCalculator.h" +#include "CharSequence.h" +#include "Descriptor.h" + +namespace qpid { +namespace amqp { + +MapSizeCalculator::MapSizeCalculator() : size(0), count(0) {} + +void MapSizeCalculator::handleKey(const CharSequence& key) +{ + ++count; + size += getEncodedSize(key); +} + +size_t MapSizeCalculator::getEncodedSize(const CharSequence& s) +{ + return 1/*typecode*/ + (s.size < 256 ? 1 : 4)/*size field*/ + s.size; +} + +void MapSizeCalculator::handleVoid(const CharSequence& key) +{ + handleKey(key); + size += 1;/*typecode*/; +} + +void MapSizeCalculator::handleBool(const CharSequence& key, bool /*value*/) +{ + handleKey(key); + size += 1;/*typecode*/; +} + +void MapSizeCalculator::handleUint8(const CharSequence& key, uint8_t /*value*/) +{ + handleKey(key); + size += 1/*typecode*/ + 1/*value*/; +} + +void MapSizeCalculator::handleUint16(const CharSequence& key, uint16_t /*value*/) +{ + handleKey(key); + size += 1/*typecode*/ + 2/*value*/; +} + +void MapSizeCalculator::handleUint32(const CharSequence& key, uint32_t value) +{ + handleKey(key); + size += 1;/*typecode*/; + if (value > 0) { + if (value < 256) size += 1/*UINT_SMALL*/; + else size += 4/*UINT*/; + }//else UINT_ZERO +} + +void MapSizeCalculator::handleUint64(const CharSequence& key, uint64_t value) +{ + handleKey(key); + size += 1;/*typecode*/; + if (value > 0) { + if (value < 256) size += 1/*ULONG_SMALL*/; + else size += 8/*ULONG*/; + }//else ULONG_ZERO +} + +void MapSizeCalculator::handleInt8(const CharSequence& key, int8_t /*value*/) +{ + handleKey(key); + size += 1/*typecode*/ + 1/*value*/; +} + +void MapSizeCalculator::handleInt16(const CharSequence& key, int16_t /*value*/) +{ + handleKey(key); + size += 1/*typecode*/ + 2/*value*/; +} + +void MapSizeCalculator::handleInt32(const CharSequence& key, int32_t /*value*/) +{ + handleKey(key); + size += 1/*typecode*/ + 4/*value*/; +} + +void MapSizeCalculator::handleInt64(const CharSequence& key, int64_t /*value*/) +{ + handleKey(key); + size += 1/*typecode*/ + 8/*value*/; +} + +void MapSizeCalculator::handleFloat(const CharSequence& key, float /*value*/) +{ + handleKey(key); + size += 1/*typecode*/ + 4/*value*/; +} + +void MapSizeCalculator::handleDouble(const CharSequence& key, double /*value*/) +{ + handleKey(key); + size += 1/*typecode*/ + 8/*value*/; +} + +void MapSizeCalculator::handleString(const CharSequence& key, const CharSequence& value, const CharSequence& /*encoding*/) +{ + handleKey(key); + size += getEncodedSize(value); +} + +size_t MapSizeCalculator::getSize() const +{ + return size; +} + +size_t MapSizeCalculator::getCount() const +{ + return count; +} + +size_t MapSizeCalculator::getTotalSizeRequired(const Descriptor* d) const +{ + size_t result(size); + if (d) result += d->getSize(); + result += 1/*typecode*/; + if (count * 2 > 255 || size > 255) { + result += 4/*size*/ + 4/*count*/; + } else { + result += 1/*size*/ + 1/*count*/; + } + return result; +} + + +}} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/MapSizeCalculator.h b/qpid/cpp/src/qpid/amqp/MapSizeCalculator.h new file mode 100644 index 0000000000..35c9ad732d --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/MapSizeCalculator.h @@ -0,0 +1,73 @@ +#ifndef QPID_AMQP_MAPSIZECALCULATOR_H +#define QPID_AMQP_MAPSIZECALCULATOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "MapHandler.h" +#include <cstring> + +namespace qpid { +namespace amqp { +struct Descriptor; + +/** + * Utility to calculate the encoded size for map data + */ +class MapSizeCalculator : public MapHandler +{ + public: + MapSizeCalculator(); + void handleVoid(const CharSequence& key); + void handleBool(const CharSequence& key, bool value); + void handleUint8(const CharSequence& key, uint8_t value); + void handleUint16(const CharSequence& key, uint16_t value); + void handleUint32(const CharSequence& key, uint32_t value); + void handleUint64(const CharSequence& key, uint64_t value); + void handleInt8(const CharSequence& key, int8_t value); + void handleInt16(const CharSequence& key, int16_t value); + void handleInt32(const CharSequence& key, int32_t value); + void handleInt64(const CharSequence& key, int64_t value); + void handleFloat(const CharSequence& key, float value); + void handleDouble(const CharSequence& key, double value); + void handleString(const CharSequence& key, const CharSequence& value, const CharSequence& encoding); + /** + * @returns the encoded size of the map entries (i.e. does not + * include the count field, typecode or any other metadata for the + * map as a whole) + */ + size_t getSize() const; + size_t getCount() const; + /** + * @returns the total encoded size for a map containing the + * handled values (i.e. including the metadata for the map as a + * whole) + */ + size_t getTotalSizeRequired(const Descriptor* d=0) const; + private: + size_t size; + size_t count; + + void handleKey(const CharSequence& key); + static size_t getEncodedSize(const CharSequence&); +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_MAPSIZECALCULATOR_H*/ diff --git a/qpid/cpp/src/qpid/amqp/MessageEncoder.cpp b/qpid/cpp/src/qpid/amqp/MessageEncoder.cpp new file mode 100644 index 0000000000..71e7e75111 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/MessageEncoder.cpp @@ -0,0 +1,312 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/MessageEncoder.h" +#include "qpid/amqp/MapEncoder.h" +#include "qpid/amqp/MapSizeCalculator.h" +#include "qpid/amqp/descriptors.h" +#include "qpid/log/Statement.h" +#include <assert.h> + +namespace qpid { +namespace amqp { + +namespace { +size_t optimisable(const MessageEncoder::Header& msg) +{ + if (msg.getDeliveryCount()) return 5; + else if (msg.isFirstAcquirer()) return 4; + else if (msg.hasTtl()) return 3; + else if (msg.getPriority() != 4) return 2; + else if (msg.isDurable()) return 1; + else return 0; +} + +size_t optimisable(const MessageEncoder::Properties& msg) +{ + if (msg.hasReplyToGroupId()) return 13; + else if (msg.hasGroupSequence()) return 12; + else if (msg.hasGroupId()) return 11; + else if (msg.hasCreationTime()) return 10; + else if (msg.hasAbsoluteExpiryTime()) return 9; + else if (msg.hasContentEncoding()) return 8; + else if (msg.hasContentType()) return 7; + else if (msg.hasCorrelationId()) return 6; + else if (msg.hasReplyTo()) return 5; + else if (msg.hasSubject()) return 4; + else if (msg.hasTo()) return 3; + else if (msg.hasUserId()) return 2; + else if (msg.hasMessageId()) return 1; + else return 0; +} +size_t encodedSize(const std::string& s) +{ + size_t total = s.size(); + if (total > 255) total += 4; + else total += 1; + return total; +} +const std::string BINARY("binary"); +} + +void MessageEncoder::writeHeader(const Header& msg) +{ + size_t fields(optimise ? optimisable(msg) : 5); + if (fields) { + void* token = startList8(&qpid::amqp::message::HEADER); + writeBoolean(msg.isDurable()); + if (fields > 1) writeUByte(msg.getPriority()); + + if (msg.getTtl()) writeUInt(msg.getTtl()); + else if (fields > 2) writeNull(); + + if (msg.isFirstAcquirer()) writeBoolean(true); + else if (fields > 3) writeNull(); + + if (msg.getDeliveryCount()) writeUInt(msg.getDeliveryCount()); + else if (fields > 4) writeNull(); + endList8(fields, token); + } +} + + +void MessageEncoder::writeProperties(const Properties& msg) +{ + size_t fields(optimise ? optimisable(msg) : 13); + if (fields) { + void* token = startList32(&qpid::amqp::message::PROPERTIES); + if (msg.hasMessageId()) writeString(msg.getMessageId()); + else writeNull(); + + if (msg.hasUserId()) writeBinary(msg.getUserId()); + else if (fields > 1) writeNull(); + + if (msg.hasTo()) writeString(msg.getTo()); + else if (fields > 2) writeNull(); + + if (msg.hasSubject()) writeString(msg.getSubject()); + else if (fields > 3) writeNull(); + + if (msg.hasReplyTo()) writeString(msg.getReplyTo()); + else if (fields > 4) writeNull(); + + if (msg.hasCorrelationId()) writeString(msg.getCorrelationId()); + else if (fields > 5) writeNull(); + + if (msg.hasContentType()) writeSymbol(msg.getContentType()); + else if (fields > 6) writeNull(); + + if (msg.hasContentEncoding()) writeSymbol(msg.getContentEncoding()); + else if (fields > 7) writeNull(); + + if (msg.hasAbsoluteExpiryTime()) writeTimestamp(msg.getAbsoluteExpiryTime()); + else if (fields > 8) writeNull(); + + if (msg.hasCreationTime()) writeTimestamp(msg.getCreationTime()); + else if (fields > 9) writeNull(); + + if (msg.hasGroupId()) writeString(msg.getGroupId()); + else if (fields > 10) writeNull(); + + if (msg.hasGroupSequence()) writeUInt(msg.getGroupSequence()); + else if (fields > 11) writeNull(); + + if (msg.hasReplyToGroupId()) writeString(msg.getReplyToGroupId()); + else if (fields > 12) writeNull(); + + endList32(fields, token); + } +} + +void MessageEncoder::writeApplicationProperties(const ApplicationProperties& properties) +{ + MapSizeCalculator calc; + properties.handle(calc); + size_t required = calc.getTotalSizeRequired(&qpid::amqp::message::APPLICATION_PROPERTIES); + assert(required <= getSize() - getPosition()); + MapEncoder encoder(skip(required), required); + encoder.writeMetaData(calc.getSize(), calc.getCount()*2, &qpid::amqp::message::APPLICATION_PROPERTIES); + properties.handle(encoder); +} + +void MessageEncoder::writeApplicationProperties(const qpid::types::Variant::Map& properties) +{ + writeApplicationProperties(properties, !optimise || properties.size()*2 > 255 || getEncodedSizeForElements(properties) > 255); +} + +void MessageEncoder::writeApplicationProperties(const qpid::types::Variant::Map& properties, bool large) +{ + writeMap(properties, &qpid::amqp::message::APPLICATION_PROPERTIES, large); +} + +size_t MessageEncoder::getEncodedSize(const Header& h, const Properties& p, const qpid::types::Variant::Map& ap, const std::string& d) +{ + return getEncodedSize(h) + getEncodedSize(p, ap, d); +} + +size_t MessageEncoder::getEncodedSize(const Header& h, const Properties& p, const ApplicationProperties& ap, const std::string& d) +{ + return getEncodedSize(h) + getEncodedSize(p) + getEncodedSize(ap) + getEncodedSizeForContent(d); +} + +size_t MessageEncoder::getEncodedSize(const Properties& p, const qpid::types::Variant::Map& ap, const std::string& d) +{ + size_t total(getEncodedSize(p)); + //application-properties: + total += 3/*descriptor*/ + getEncodedSize(ap, true); + //body: + if (d.size()) total += 3/*descriptor*/ + 1/*code*/ + encodedSize(d); + + return total; +} + +size_t MessageEncoder::getEncodedSizeForContent(const std::string& d) +{ + if (d.size()) return 3/*descriptor*/ + 1/*code*/ + encodedSize(d); + else return 0; +} + +size_t MessageEncoder::getEncodedSize(const Header& h) +{ + //NOTE: this does not take optional optimisation into account, + //i.e. it is a 'worst case' estimate for required buffer space + size_t total(3/*descriptor*/ + 1/*code*/ + 1/*size*/ + 1/*count*/ + 5/*codes for each field*/); + if (h.getPriority() != 4) total += 1; + if (h.getDeliveryCount()) total += 4; + if (h.hasTtl()) total += 4; + return total; +} + +size_t MessageEncoder::getEncodedSize(const Properties& p) +{ + //NOTE: this does not take optional optimisation into account, + //i.e. it is a 'worst case' estimate for required buffer space + size_t total(3/*descriptor*/ + 1/*code*/ + 4/*size*/ + 4/*count*/ + 13/*codes for each field*/); + if (p.hasMessageId()) total += encodedSize(p.getMessageId()); + if (p.hasUserId()) total += encodedSize(p.getUserId()); + if (p.hasTo()) total += encodedSize(p.getTo()); + if (p.hasSubject()) total += encodedSize(p.getSubject()); + if (p.hasReplyTo()) total += encodedSize(p.getReplyTo()); + if (p.hasCorrelationId()) total += encodedSize(p.getCorrelationId()); + if (p.hasContentType()) total += encodedSize(p.getContentType()); + if (p.hasContentEncoding()) total += encodedSize(p.getContentEncoding()); + if (p.hasAbsoluteExpiryTime()) total += 8; + if (p.hasCreationTime()) total += 8; + if (p.hasGroupId()) total += encodedSize(p.getGroupId()); + if (p.hasGroupSequence()) total += 4; + if (p.hasReplyToGroupId()) total += encodedSize(p.getReplyToGroupId()); + return total; +} + +size_t MessageEncoder::getEncodedSize(const ApplicationProperties& p) +{ + MapSizeCalculator calc; + p.handle(calc); + return calc.getTotalSizeRequired(&qpid::amqp::message::APPLICATION_PROPERTIES); +} + +size_t MessageEncoder::getEncodedSizeForElements(const qpid::types::Variant::Map& map) +{ + size_t total = 0; + for (qpid::types::Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) { + total += 1/*code*/ + encodedSize(i->first) + getEncodedSizeForValue(i->second); + } + return total; +} + +size_t MessageEncoder::getEncodedSizeForValue(const qpid::types::Variant& value) +{ + size_t total = 0; + switch (value.getType()) { + case qpid::types::VAR_MAP: + total += getEncodedSize(value.asMap(), true); + break; + case qpid::types::VAR_LIST: + total += getEncodedSize(value.asList(), true); + break; + + case qpid::types::VAR_VOID: + case qpid::types::VAR_BOOL: + total += 1; + break; + + case qpid::types::VAR_UINT8: + case qpid::types::VAR_INT8: + total += 2; + break; + + case qpid::types::VAR_UINT16: + case qpid::types::VAR_INT16: + total += 3; + break; + + case qpid::types::VAR_UINT32: + case qpid::types::VAR_INT32: + case qpid::types::VAR_FLOAT: + total += 5; + break; + + case qpid::types::VAR_UINT64: + case qpid::types::VAR_INT64: + case qpid::types::VAR_DOUBLE: + total += 9; + break; + + case qpid::types::VAR_UUID: + total += 17; + break; + + case qpid::types::VAR_STRING: + total += 1/*code*/ + encodedSize(value.getString()); + break; + } + return total; +} + + +size_t MessageEncoder::getEncodedSize(const qpid::types::Variant::Map& map, bool alwaysUseLargeMap) +{ + size_t total = getEncodedSizeForElements(map); + + //its not just the count that determines whether we can use a small map, but the aggregate size: + if (alwaysUseLargeMap || map.size()*2 > 255 || total > 255) total += 4/*size*/ + 4/*count*/; + else total += 1/*size*/ + 1/*count*/; + + total += 1 /*code for map itself*/; + + return total; +} + +size_t MessageEncoder::getEncodedSize(const qpid::types::Variant::List& list, bool alwaysUseLargeList) +{ + size_t total(0); + for (qpid::types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { + total += getEncodedSizeForValue(*i); + } + + //its not just the count that determines whether we can use a small list, but the aggregate size: + if (alwaysUseLargeList || list.size()*2 > 255 || total > 255) total += 4/*size*/ + 4/*count*/; + else total += 1/*size*/ + 1/*count*/; + + total += 1 /*code for list itself*/; + + return total; +} +}} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/MessageEncoder.h b/qpid/cpp/src/qpid/amqp/MessageEncoder.h new file mode 100644 index 0000000000..05e1714004 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/MessageEncoder.h @@ -0,0 +1,117 @@ +#ifndef QPID_AMQP_MESSAGEENCODER_H +#define QPID_AMQP_MESSAGEENCODER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/Encoder.h" +#include "qpid/types/Variant.h" + +namespace qpid { +namespace amqp { +class MapHandler; +/** + * + */ +class MessageEncoder : public Encoder +{ + public: + class Header + { + public: + virtual ~Header() {} + virtual bool isDurable() const = 0; + virtual uint8_t getPriority() const = 0; + QPID_COMMON_EXTERN virtual bool hasTtl() const = 0; + virtual uint32_t getTtl() const = 0; + virtual bool isFirstAcquirer() const = 0; + virtual uint32_t getDeliveryCount() const = 0; + }; + + class Properties + { + public: + virtual ~Properties() {} + virtual bool hasMessageId() const = 0; + virtual std::string getMessageId() const = 0; + virtual bool hasUserId() const = 0; + virtual std::string getUserId() const = 0; + virtual bool hasTo() const = 0; + virtual std::string getTo() const = 0; + virtual bool hasSubject() const = 0; + virtual std::string getSubject() const = 0; + virtual bool hasReplyTo() const = 0; + virtual std::string getReplyTo() const = 0; + virtual bool hasCorrelationId() const = 0; + virtual std::string getCorrelationId() const = 0; + virtual bool hasContentType() const = 0; + virtual std::string getContentType() const = 0; + virtual bool hasContentEncoding() const = 0; + virtual std::string getContentEncoding() const = 0; + virtual bool hasAbsoluteExpiryTime() const = 0; + virtual int64_t getAbsoluteExpiryTime() const = 0; + virtual bool hasCreationTime() const = 0; + virtual int64_t getCreationTime() const = 0; + virtual bool hasGroupId() const = 0; + virtual std::string getGroupId() const = 0; + virtual bool hasGroupSequence() const = 0; + virtual uint32_t getGroupSequence() const = 0; + virtual bool hasReplyToGroupId() const = 0; + virtual std::string getReplyToGroupId() const = 0; + }; + + class ApplicationProperties + { + public: + virtual ~ApplicationProperties() {} + virtual void handle(MapHandler&) const = 0; + }; + + QPID_COMMON_EXTERN MessageEncoder(char* d, size_t s) : Encoder(d, s), optimise(true) {} + QPID_COMMON_EXTERN void writeHeader(const Header&); + QPID_COMMON_EXTERN void writeProperties(const Properties&); + QPID_COMMON_EXTERN void writeApplicationProperties(const ApplicationProperties&); + QPID_COMMON_EXTERN void writeApplicationProperties(const qpid::types::Variant::Map& properties); + QPID_COMMON_EXTERN void writeApplicationProperties(const qpid::types::Variant::Map& properties, bool useLargeMap); + + QPID_COMMON_EXTERN static size_t getEncodedSize(const Header&); + QPID_COMMON_EXTERN static size_t getEncodedSize(const Properties&); + QPID_COMMON_EXTERN static size_t getEncodedSize(const ApplicationProperties&); + + QPID_COMMON_EXTERN static size_t getEncodedSize(const qpid::types::Variant::List&, bool useLargeList); + QPID_COMMON_EXTERN static size_t getEncodedSize(const qpid::types::Variant::Map&, bool useLargeMap); + + QPID_COMMON_EXTERN static size_t getEncodedSizeForValue(const qpid::types::Variant& value); + QPID_COMMON_EXTERN static size_t getEncodedSizeForContent(const std::string&); + + //used in translating 0-10 content to 1.0, to determine buffer space needed + QPID_COMMON_EXTERN static size_t getEncodedSize(const Properties&, const qpid::types::Variant::Map&, const std::string&); + + private: + bool optimise; + + static size_t getEncodedSize(const Header&, const Properties&, const ApplicationProperties&, const std::string&); + static size_t getEncodedSize(const Header&, const Properties&, const qpid::types::Variant::Map&, const std::string&); + + static size_t getEncodedSizeForElements(const qpid::types::Variant::Map&); +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_MESSAGEENCODER_H*/ diff --git a/qpid/cpp/src/qpid/amqp/MessageId.cpp b/qpid/cpp/src/qpid/amqp/MessageId.cpp new file mode 100644 index 0000000000..69ec4ce7f5 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/MessageId.cpp @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/MessageId.h" +#include <assert.h> +#include <boost/lexical_cast.hpp> + +namespace qpid { +namespace amqp { + +MessageId::MessageId() : type(NONE) +{ +} +void MessageId::assign(std::string& s) const +{ + switch (type) { + case NONE: + s = std::string(); + break; + case BYTES: + if (value.bytes) s.assign(value.bytes.data, value.bytes.size); + break; + case UUID: + s = qpid::types::Uuid(value.bytes).str(); + break; + case ULONG: + s = boost::lexical_cast<std::string>(value.ulong); + break; + } +} + +MessageId::operator bool() const +{ + return type!=NONE; +} + +std::string MessageId::str() const +{ + std::string s; + assign(s); + return s; +} + +void MessageId::set(qpid::amqp::CharSequence bytes, qpid::types::VariantType t) +{ + switch (t) { + case qpid::types::VAR_STRING: + type = BYTES; + break; + case qpid::types::VAR_UUID: + type = UUID; + assert(bytes.size == 16); + break; + default: + assert(false); + } + value.bytes = bytes; +} +void MessageId::set(uint64_t ulong) +{ + type = ULONG; + value.ulong = ulong; +} + +}} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/MessageId.h b/qpid/cpp/src/qpid/amqp/MessageId.h new file mode 100644 index 0000000000..4505469148 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/MessageId.h @@ -0,0 +1,57 @@ +#ifndef QPID_AMQP_MESSAGEID_H +#define QPID_AMQP_MESSAGEID_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/CharSequence.h" +#include "qpid/types/Variant.h" +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace amqp { + +struct MessageId +{ + union + { + qpid::amqp::CharSequence bytes; + uint64_t ulong; + } value; + enum + { + NONE, + BYTES, + UUID, + ULONG + } type; + + QPID_COMMON_EXTERN MessageId(); + QPID_COMMON_EXTERN operator bool() const; + QPID_COMMON_EXTERN std::string str() const; + QPID_COMMON_EXTERN void assign(std::string&) const; + QPID_COMMON_EXTERN void set(qpid::amqp::CharSequence bytes, qpid::types::VariantType t); + QPID_COMMON_EXTERN void set(uint64_t ulong); + +}; + +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_MESSAGEID_H*/ diff --git a/qpid/cpp/src/qpid/amqp/MessageReader.cpp b/qpid/cpp/src/qpid/amqp/MessageReader.cpp new file mode 100644 index 0000000000..ab90472067 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/MessageReader.cpp @@ -0,0 +1,685 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/MessageReader.h" +#include "qpid/amqp/Descriptor.h" +#include "qpid/amqp/descriptors.h" +#include "qpid/amqp/typecodes.h" +#include "qpid/types/Uuid.h" +#include "qpid/types/Variant.h" +#include "qpid/log/Statement.h" + +using namespace qpid::amqp::message; + +namespace qpid { +namespace amqp { +namespace { + +//header fields: +const size_t DURABLE(0); +const size_t PRIORITY(1); +const size_t TTL(2); +const size_t FIRST_ACQUIRER(3); +const size_t DELIVERY_COUNT(4); + +//properties fields: +const size_t MESSAGE_ID(0); +const size_t USER_ID(1); +const size_t TO(2); +const size_t SUBJECT(3); +const size_t REPLY_TO(4); +const size_t CORRELATION_ID(5); +const size_t CONTENT_TYPE(6); +const size_t CONTENT_ENCODING(7); +const size_t ABSOLUTE_EXPIRY_TIME(8); +const size_t CREATION_TIME(9); +const size_t GROUP_ID(10); +const size_t GROUP_SEQUENCE(11); +const size_t REPLY_TO_GROUP_ID(12); + + +const Descriptor* nested(const Descriptor* d) +{ + if (d && d->nested) return d->nested.get(); + else return 0; +} +} + +MessageReader::HeaderReader::HeaderReader(MessageReader& p) : parent(p), index(0) {} +void MessageReader::HeaderReader::onBoolean(bool v, const Descriptor*) // durable, first-acquirer +{ + if (index == DURABLE) { + parent.onDurable(v); + } else if (index == FIRST_ACQUIRER) { + parent.onFirstAcquirer(v); + } else { + QPID_LOG(warning, "Unexpected message format, got boolean at index " << index << " of headers"); + } + ++index; +} +void MessageReader::HeaderReader::onUByte(uint8_t v, const Descriptor*) // priority +{ + if (index == PRIORITY) { + parent.onPriority(v); + } else { + QPID_LOG(warning, "Unexpected message format, got ubyte at index " << index << " of headers"); + } + ++index; +} +void MessageReader::HeaderReader::onUInt(uint32_t v, const Descriptor*) // ttl, delivery-count +{ + if (index == TTL) { + parent.onTtl(v); + } else if (index == DELIVERY_COUNT) { + parent.onDeliveryCount(v); + } else { + QPID_LOG(warning, "Unexpected message format, got uint at index " << index << " of headers"); + } + ++index; +} +void MessageReader::HeaderReader::onNull(const Descriptor*) +{ + ++index; +} + +MessageReader::PropertiesReader::PropertiesReader(MessageReader& p) : parent(p), index(0) {} +void MessageReader::PropertiesReader::onUuid(const CharSequence& v, const Descriptor*) // message-id, correlation-id +{ + if (index == MESSAGE_ID) { + parent.onMessageId(v, qpid::types::VAR_UUID); + } else if (index == CORRELATION_ID) { + parent.onCorrelationId(v, qpid::types::VAR_UUID); + } else { + QPID_LOG(warning, "Unexpected message format, got uuid at index " << index << " of properties"); + } + ++index; +} +void MessageReader::PropertiesReader::onULong(uint64_t v, const Descriptor*) // message-id, correlation-id +{ + if (index == MESSAGE_ID) { + parent.onMessageId(v); + } else if (index == CORRELATION_ID) { + parent.onCorrelationId(v); + } else { + QPID_LOG(warning, "Unexpected message format, got long at index " << index << " of properties"); + } + ++index; +} +void MessageReader::PropertiesReader::onBinary(const CharSequence& v, const Descriptor*) // message-id, correlation-id, user-id +{ + if (index == MESSAGE_ID) { + parent.onMessageId(v, qpid::types::VAR_STRING); + } else if (index == CORRELATION_ID) { + parent.onCorrelationId(v, qpid::types::VAR_STRING); + } else if (index == USER_ID) { + parent.onUserId(v); + } else { + QPID_LOG(warning, "Unexpected message format, got binary at index " << index << " of properties"); + } + ++index; +} +void MessageReader::PropertiesReader::onString(const CharSequence& v, const Descriptor*) // message-id, correlation-id, group-id, reply-to-group-id, subject, to, reply-to +{ + if (index == MESSAGE_ID) { + parent.onMessageId(v, qpid::types::VAR_STRING); + } else if (index == CORRELATION_ID) { + parent.onCorrelationId(v, qpid::types::VAR_STRING); + } else if (index == GROUP_ID) { + parent.onGroupId(v); + } else if (index == REPLY_TO_GROUP_ID) { + parent.onReplyToGroupId(v); + } else if (index == SUBJECT) { + parent.onSubject(v); + } else if (index == TO) { + parent.onTo(v); + } else if (index == REPLY_TO) { + parent.onReplyTo(v); + } else { + QPID_LOG(warning, "Unexpected message format, got string at index " << index << " of properties"); + } + ++index; +} +void MessageReader::PropertiesReader::onSymbol(const CharSequence& v, const Descriptor*) // content-type, content-encoding +{ + if (index == CONTENT_TYPE) { + parent.onContentType(v); + } else if (index == CONTENT_ENCODING) { + parent.onContentEncoding(v); + } else { + QPID_LOG(warning, "Unexpected message format, got symbol at index " << index << " of properties"); + } + ++index; +} +void MessageReader::PropertiesReader::onTimestamp(int64_t v, const Descriptor*) // absolute-expiry-time, creation-time +{ + if (index == ABSOLUTE_EXPIRY_TIME) { + parent.onAbsoluteExpiryTime(v); + } else if (index == CREATION_TIME) { + parent.onCreationTime(v); + } else { + QPID_LOG(warning, "Unexpected message format, got timestamp at index " << index << " of properties"); + } + ++index; +} +void MessageReader::PropertiesReader::onUInt(uint32_t v, const Descriptor*) // group-sequence +{ + if (index == GROUP_SEQUENCE) { + parent.onGroupSequence(v); + } else { + QPID_LOG(warning, "Unexpected message format, got uint at index " << index << " of properties"); + } + ++index; +} +void MessageReader::PropertiesReader::onNull(const Descriptor*) +{ + ++index; +} +void MessageReader::PropertiesReader::onBoolean(bool, const Descriptor*) +{ + QPID_LOG(info, "skipping message property at index " << index << " unexpected type (boolean)"); + ++index; +} +void MessageReader::PropertiesReader::onUByte(uint8_t, const Descriptor*) +{ + QPID_LOG(info, "skipping message property at index " << index << " unexpected type (ubyte)"); + ++index; +} +void MessageReader::PropertiesReader::onUShort(uint16_t, const Descriptor*) +{ + QPID_LOG(info, "skipping message property at index " << index << " unexpected type (ushort)"); + ++index; +} +void MessageReader::PropertiesReader::onByte(int8_t, const Descriptor*) +{ + QPID_LOG(info, "skipping message property at index " << index << " unexpected type (byte)"); + ++index; +} +void MessageReader::PropertiesReader::onShort(int16_t, const Descriptor*) +{ + QPID_LOG(info, "skipping message property at index " << index << " unexpected type (short)"); + ++index; +} +void MessageReader::PropertiesReader::onInt(int32_t, const Descriptor*) +{ + QPID_LOG(info, "skipping message property at index " << index << " unexpected type (int)"); + ++index; +} +void MessageReader::PropertiesReader::onLong(int64_t, const Descriptor*) +{ + QPID_LOG(info, "skipping message property at index " << index << " unexpected type (long)"); + ++index; +} +void MessageReader::PropertiesReader::onFloat(float, const Descriptor*) +{ + QPID_LOG(info, "skipping message property at index " << index << " unexpected type (float)"); + ++index; +} +void MessageReader::PropertiesReader::onDouble(double, const Descriptor*) +{ + QPID_LOG(info, "skipping message property at index " << index << " unexpected type (double)"); + ++index; +} +bool MessageReader::PropertiesReader::onStartList(uint32_t /*count*/, const CharSequence& /*elements*/, const CharSequence& /*complete*/, const Descriptor*) +{ + QPID_LOG(info, "skipping message property at index " << index << " unexpected type (list)"); + ++index; + return false; +} +bool MessageReader::PropertiesReader::onStartMap(uint32_t /*count*/, const CharSequence& /*elements*/, const CharSequence& /*complete*/, const Descriptor*) +{ + QPID_LOG(info, "skipping message property at index " << index << " unexpected type (map)"); + ++index; + return false; +} +bool MessageReader::PropertiesReader::onStartArray(uint32_t /*count*/, const CharSequence&, const Constructor&, const Descriptor*) +{ + QPID_LOG(info, "skipping message property at index " << index << " unexpected type (array)"); + ++index; + return false; +} + + +//header, properties, amqp-sequence, amqp-value +bool MessageReader::onStartList(uint32_t count, const CharSequence& elements, const CharSequence& raw, const Descriptor* descriptor) +{ + if (delegate) { + return delegate->onStartList(count, elements, raw, descriptor); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got no descriptor for list."); + return false; + } else if (descriptor->match(HEADER_SYMBOL, HEADER_CODE)) { + delegate = &headerReader; + return true; + } else if (descriptor->match(PROPERTIES_SYMBOL, PROPERTIES_CODE)) { + delegate = &propertiesReader; + return true; + } else if (descriptor->match(AMQP_SEQUENCE_SYMBOL, AMQP_SEQUENCE_CODE)) { + onAmqpSequence(raw); + return false; + } else if (descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + onAmqpValue(elements, qpid::amqp::typecodes::LIST_NAME, nested(descriptor)); + return false; + } else { + QPID_LOG(warning, "Unexpected described list: " << *descriptor); + return false; + } + } +} +void MessageReader::onEndList(uint32_t count, const Descriptor* descriptor) +{ + if (delegate) { + if (descriptor && (descriptor->match(HEADER_SYMBOL, HEADER_CODE) || descriptor->match(PROPERTIES_SYMBOL, PROPERTIES_CODE))) { + delegate = 0; + } else { + delegate->onEndList(count, descriptor); + } + } +} + +//delivery-annotations, message-annotations, application-properties, amqp-value +bool MessageReader::onStartMap(uint32_t count, const CharSequence& elements, const CharSequence& raw, const Descriptor* descriptor) +{ + if (delegate) { + return delegate->onStartMap(count, elements, raw, descriptor); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got no descriptor for map."); + return false; + } else if (descriptor->match(DELIVERY_ANNOTATIONS_SYMBOL, DELIVERY_ANNOTATIONS_CODE)) { + onDeliveryAnnotations(elements, raw); + return false; + } else if (descriptor->match(MESSAGE_ANNOTATIONS_SYMBOL, MESSAGE_ANNOTATIONS_CODE)) { + onMessageAnnotations(elements, raw); + return false; + } else if (descriptor->match(FOOTER_SYMBOL, FOOTER_CODE)) { + onFooter(elements, raw); + return false; + } else if (descriptor->match(APPLICATION_PROPERTIES_SYMBOL, APPLICATION_PROPERTIES_CODE)) { + onApplicationProperties(elements, raw); + return false; + } else if (descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + onAmqpValue(elements, qpid::amqp::typecodes::MAP_NAME, nested(descriptor)); + return false; + } else { + QPID_LOG(warning, "Unexpected described map: " << *descriptor); + return false; + } + } +} + +void MessageReader::onEndMap(uint32_t count, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onEndMap(count, descriptor); + } +} + +//data, amqp-value +void MessageReader::onBinary(const CharSequence& bytes, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onBinary(bytes, descriptor); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got binary value with no descriptor."); + } else if (descriptor->match(DATA_SYMBOL, DATA_CODE)) { + onData(bytes); + } else if (descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + onAmqpValue(bytes, qpid::amqp::typecodes::BINARY_NAME, nested(descriptor)); + } else { + QPID_LOG(warning, "Unexpected binary value with descriptor: " << *descriptor); + } + } + +} + +//amqp-value +void MessageReader::onNull(const Descriptor* descriptor) +{ + if (delegate) { + delegate->onNull(descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant v; + onAmqpValue(v, nested(descriptor)); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got null value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected null value with descriptor: " << *descriptor); + } + } + } +} +void MessageReader::onString(const CharSequence& v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onString(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + onAmqpValue(v, qpid::amqp::typecodes::STRING_NAME, nested(descriptor)); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got string value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected string value with descriptor: " << *descriptor); + } + } + } +} +void MessageReader::onSymbol(const CharSequence& v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onSymbol(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + onAmqpValue(v, qpid::amqp::typecodes::SYMBOL_NAME, nested(descriptor)); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got symbol value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected symbol value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onBoolean(bool v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onBoolean(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onAmqpValue(body, nested(descriptor)); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got boolean value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected boolean value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onUByte(uint8_t v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onUByte(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onAmqpValue(body, nested(descriptor)); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got ubyte value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected ubyte value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onUShort(uint16_t v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onUShort(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onAmqpValue(body, nested(descriptor)); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got ushort value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected ushort value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onUInt(uint32_t v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onUInt(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onAmqpValue(body, nested(descriptor)); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got uint value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected uint value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onULong(uint64_t v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onULong(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onAmqpValue(body, nested(descriptor)); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got ulong value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected ulong value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onByte(int8_t v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onByte(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onAmqpValue(body, nested(descriptor)); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got byte value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected byte value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onShort(int16_t v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onShort(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onAmqpValue(body, nested(descriptor)); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got short value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected short value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onInt(int32_t v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onInt(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onAmqpValue(body, nested(descriptor)); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got int value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected int value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onLong(int64_t v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onLong(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onAmqpValue(body, nested(descriptor)); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got long value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected long value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onFloat(float v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onFloat(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onAmqpValue(body, nested(descriptor)); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got float value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected float value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onDouble(double v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onDouble(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onAmqpValue(body, nested(descriptor)); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got double value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected double value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onUuid(const CharSequence& v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onUuid(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + onAmqpValue(v, qpid::amqp::typecodes::UUID_NAME, nested(descriptor)); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got uuid value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected uuid value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onTimestamp(int64_t v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onTimestamp(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onAmqpValue(body, nested(descriptor)); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got timestamp value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected timestamp value with descriptor: " << *descriptor); + } + } + } +} + +bool MessageReader::onStartArray(uint32_t count, const CharSequence& raw, const Constructor& constructor, const Descriptor* descriptor) +{ + if (delegate) { + return delegate->onStartArray(count, raw, constructor, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + //TODO: might be better to decode this here + onAmqpValue(raw, qpid::amqp::typecodes::ARRAY_NAME, nested(descriptor)); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got array with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected array with descriptor: " << *descriptor); + } + } + return false; + } +} + +void MessageReader::onEndArray(uint32_t v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onEndArray(v, descriptor); + } +} + +MessageReader::MessageReader() : headerReader(*this), propertiesReader(*this), delegate(0) +{ + bare.init(); +} + +void MessageReader::onDescriptor(const Descriptor& descriptor, const char* position) +{ + if (bare.data) { + if (descriptor.match(FOOTER_SYMBOL, FOOTER_CODE)) { + bare.size = position - bare.data; + } + } else { + if (descriptor.match(PROPERTIES_SYMBOL, PROPERTIES_CODE) || descriptor.match(APPLICATION_PROPERTIES_SYMBOL, APPLICATION_PROPERTIES_CODE) + || descriptor.match(AMQP_SEQUENCE_SYMBOL, AMQP_SEQUENCE_CODE) || descriptor.match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE) || descriptor.match(DATA_SYMBOL, DATA_CODE)) { + bare.data = position; + } + } +} + +CharSequence MessageReader::getBareMessage() const { return bare; } + +}} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/MessageReader.h b/qpid/cpp/src/qpid/amqp/MessageReader.h new file mode 100644 index 0000000000..2fb588546f --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/MessageReader.h @@ -0,0 +1,161 @@ +#ifndef QPID_AMQP_MESSAGEREADER_H +#define QPID_AMQP_MESSAGEREADER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/amqp/CharSequence.h" +#include "qpid/amqp/Reader.h" +#include "qpid/types/Variant.h" +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace amqp { + +/** + * Reader for an AMQP 1.0 message + */ +class MessageReader : public Reader +{ + public: + QPID_COMMON_EXTERN MessageReader(); + + //header, properties, amqp-sequence, amqp-value + QPID_COMMON_EXTERN bool onStartList(uint32_t, const CharSequence&, const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN void onEndList(uint32_t, const Descriptor*); + + //delivery-annotations, message-annotations, application-headers, amqp-value + QPID_COMMON_EXTERN bool onStartMap(uint32_t, const CharSequence&, const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN void onEndMap(uint32_t, const Descriptor*); + + //data, amqp-value + QPID_COMMON_EXTERN void onBinary(const CharSequence&, const Descriptor*); + + //amqp-value + QPID_COMMON_EXTERN void onNull(const Descriptor*); + QPID_COMMON_EXTERN void onString(const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN void onSymbol(const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN void onBoolean(bool, const Descriptor*); + QPID_COMMON_EXTERN void onUByte(uint8_t, const Descriptor*); + QPID_COMMON_EXTERN void onUShort(uint16_t, const Descriptor*); + QPID_COMMON_EXTERN void onUInt(uint32_t, const Descriptor*); + QPID_COMMON_EXTERN void onULong(uint64_t, const Descriptor*); + QPID_COMMON_EXTERN void onByte(int8_t, const Descriptor*); + QPID_COMMON_EXTERN void onShort(int16_t, const Descriptor*); + QPID_COMMON_EXTERN void onInt(int32_t, const Descriptor*); + QPID_COMMON_EXTERN void onLong(int64_t, const Descriptor*); + QPID_COMMON_EXTERN void onFloat(float, const Descriptor*); + QPID_COMMON_EXTERN void onDouble(double, const Descriptor*); + QPID_COMMON_EXTERN void onUuid(const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN void onTimestamp(int64_t, const Descriptor*); + QPID_COMMON_EXTERN bool onStartArray(uint32_t, const CharSequence&, const Constructor&, const Descriptor*); + QPID_COMMON_EXTERN void onEndArray(uint32_t, const Descriptor*); + QPID_COMMON_EXTERN void onDescriptor(const Descriptor&, const char*); + + //header: + virtual void onDurable(bool) = 0; + virtual void onPriority(uint8_t) = 0; + virtual void onTtl(uint32_t) = 0; + virtual void onFirstAcquirer(bool) = 0; + virtual void onDeliveryCount(uint32_t) = 0; + + //properties: + virtual void onMessageId(uint64_t) = 0; + virtual void onMessageId(const CharSequence&, qpid::types::VariantType) = 0; + virtual void onUserId(const CharSequence&) = 0; + virtual void onTo(const CharSequence&) = 0; + virtual void onSubject(const CharSequence&) = 0; + virtual void onReplyTo(const CharSequence&) = 0; + virtual void onCorrelationId(uint64_t) = 0; + virtual void onCorrelationId(const CharSequence&, qpid::types::VariantType) = 0; + virtual void onContentType(const CharSequence&) = 0; + virtual void onContentEncoding(const CharSequence&) = 0; + virtual void onAbsoluteExpiryTime(int64_t) = 0; + virtual void onCreationTime(int64_t) = 0; + virtual void onGroupId(const CharSequence&) = 0; + virtual void onGroupSequence(uint32_t) = 0; + virtual void onReplyToGroupId(const CharSequence&) = 0; + + virtual void onApplicationProperties(const CharSequence& /*values*/, const CharSequence& /*full*/) = 0; + virtual void onDeliveryAnnotations(const CharSequence& /*values*/, const CharSequence& /*full*/) = 0; + virtual void onMessageAnnotations(const CharSequence& /*values*/, const CharSequence& /*full*/) = 0; + + virtual void onData(const CharSequence&) = 0; + virtual void onAmqpSequence(const CharSequence&) = 0; + virtual void onAmqpValue(const CharSequence&, const std::string& type, const Descriptor*) = 0; + virtual void onAmqpValue(const qpid::types::Variant&, const Descriptor*) = 0; + + virtual void onFooter(const CharSequence& /*values*/, const CharSequence& /*full*/) = 0; + + QPID_COMMON_EXTERN CharSequence getBareMessage() const; + + private: + + class HeaderReader : public Reader + { + public: + HeaderReader(MessageReader&); + void onBoolean(bool v, const Descriptor*); // durable, first-acquirer + void onUByte(uint8_t v, const Descriptor*); // priority + void onUInt(uint32_t v, const Descriptor*); // ttl, delivery-count + void onNull(const Descriptor*); + private: + MessageReader& parent; + size_t index; + }; + class PropertiesReader : public Reader + { + public: + PropertiesReader(MessageReader&); + void onUuid(const CharSequence& v, const Descriptor*); // message-id, correlation-id + void onULong(uint64_t v, const Descriptor*); // message-id, correlation-id + void onBinary(const CharSequence& v, const Descriptor*); // message-id, correlation-id, user-id + void onString(const CharSequence& v, const Descriptor*); // message-id, correlation-id, group-id, reply-to-group-id, subject, to, reply-to + void onSymbol(const CharSequence& v, const Descriptor*); // content-type, content-encoding + void onTimestamp(int64_t v, const Descriptor*); // absolute-expiry-time, creation-time + void onUInt(uint32_t v, const Descriptor*); // group-sequence + void onNull(const Descriptor*); + + void onBoolean(bool, const Descriptor*); + void onUByte(uint8_t, const Descriptor*); + void onUShort(uint16_t, const Descriptor*); + void onByte(int8_t, const Descriptor*); + void onShort(int16_t, const Descriptor*); + void onInt(int32_t, const Descriptor*); + void onLong(int64_t, const Descriptor*); + void onFloat(float, const Descriptor*); + void onDouble(double, const Descriptor*); + bool onStartList(uint32_t /*count*/, const CharSequence& /*elements*/, const CharSequence& /*complete*/, const Descriptor*); + bool onStartMap(uint32_t /*count*/, const CharSequence& /*elements*/, const CharSequence& /*complete*/, const Descriptor*); + bool onStartArray(uint32_t /*count*/, const CharSequence&, const Constructor&, const Descriptor*); + + private: + MessageReader& parent; + size_t index; + }; + HeaderReader headerReader; + PropertiesReader propertiesReader; + Reader* delegate; + CharSequence bare; +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_MESSAGEREADER_H*/ diff --git a/qpid/cpp/src/qpid/amqp/Reader.h b/qpid/cpp/src/qpid/amqp/Reader.h new file mode 100644 index 0000000000..32f33dc33f --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/Reader.h @@ -0,0 +1,80 @@ +#ifndef QPID_AMQP_READER_H +#define QPID_AMQP_READER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/IntegerTypes.h" +#include <stddef.h> + +namespace qpid { +namespace amqp { +struct CharSequence; +struct Constructor; +struct Descriptor; + +/** + * Allows an event-driven, callback-based approach to processing an + * AMQP encoded data stream. By sublassing and implementing the + * methods of interest, readers can be constructed for different + * contexts. + */ +class Reader +{ + public: + virtual ~Reader() {} + virtual void onNull(const Descriptor*) {} + virtual void onBoolean(bool, const Descriptor*) {} + virtual void onUByte(uint8_t, const Descriptor*) {} + virtual void onUShort(uint16_t, const Descriptor*) {} + virtual void onUInt(uint32_t, const Descriptor*) {} + virtual void onULong(uint64_t, const Descriptor*) {} + virtual void onByte(int8_t, const Descriptor*) {} + virtual void onShort(int16_t, const Descriptor*) {} + virtual void onInt(int32_t, const Descriptor*) {} + virtual void onLong(int64_t, const Descriptor*) {} + virtual void onFloat(float, const Descriptor*) {} + virtual void onDouble(double, const Descriptor*) {} + virtual void onUuid(const CharSequence&, const Descriptor*) {} + virtual void onTimestamp(int64_t, const Descriptor*) {} + + virtual void onBinary(const CharSequence&, const Descriptor*) {} + virtual void onString(const CharSequence&, const Descriptor*) {} + virtual void onSymbol(const CharSequence&, const Descriptor*) {} + + /** + * @return true to get elements of the compound value, false + * to skip over it + */ + virtual bool onStartList(uint32_t /*count*/, const CharSequence& /*elements*/, const CharSequence& /*complete*/, const Descriptor*) { return true; } + virtual bool onStartMap(uint32_t /*count*/, const CharSequence& /*elements*/, const CharSequence& /*complete*/, const Descriptor*) { return true; } + virtual bool onStartArray(uint32_t /*count*/, const CharSequence&, const Constructor&, const Descriptor*) { return true; } + virtual void onEndList(uint32_t /*count*/, const Descriptor*) {} + virtual void onEndMap(uint32_t /*count*/, const Descriptor*) {} + virtual void onEndArray(uint32_t /*count*/, const Descriptor*) {} + + virtual void onDescriptor(const Descriptor&, const char*) {} + + virtual bool proceed() { return true; } + private: +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_READER_H*/ diff --git a/qpid/cpp/src/qpid/amqp/Sasl.cpp b/qpid/cpp/src/qpid/amqp/Sasl.cpp new file mode 100644 index 0000000000..a7c2eea35b --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/Sasl.cpp @@ -0,0 +1,141 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/Sasl.h" +#include "qpid/amqp/Decoder.h" +#include "qpid/amqp/Descriptor.h" +#include "qpid/amqp/Encoder.h" +#include "qpid/log/Statement.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/framing/ProtocolInitiation.h" +#include <string.h> + +namespace qpid { +namespace amqp { + +Sasl::Sasl(const std::string& i) : id(i), buffer(2*512/*AMQP 1.0's MAX_MIN_FRAME_SIZE - is this enough though?*/), encoder(&buffer[0], buffer.size()) {} +Sasl::~Sasl() {} + +void* Sasl::startFrame() +{ + //write sasl frame header, leaving 4 bytes for total size + char* start = encoder.skip(4); + encoder.write((uint8_t) 0x02);//data offset + encoder.write((uint8_t) 0x01);//frame type + encoder.write((uint16_t) 0x0000);//ignored + return start; +} + +void Sasl::endFrame(void* frame) +{ + //now backfill the frame size + char* start = (char*) frame; + char* current = &buffer[encoder.getPosition()]; + uint32_t frameSize = current - start; + Encoder backfill(start, 4); + backfill.write(frameSize); + QPID_LOG(trace, "Completed encoding of frame of " << frameSize << " bytes"); +} + + +std::size_t Sasl::read(const char* data, size_t available) +{ + size_t consumed = 0; + while (!stopReading() && available - consumed > 4/*framesize*/) { + Decoder decoder(data+consumed, available-consumed); + //read frame-header + uint32_t frameSize = decoder.readUInt(); + if (frameSize > decoder.getSize()) break;//don't have all the data for this frame yet + + QPID_LOG(trace, "Reading SASL frame of size " << frameSize); + decoder.resetSize(frameSize); + uint8_t dataOffset = decoder.readUByte(); + uint8_t frameType = decoder.readUByte(); + if (frameType != 0x01) { + QPID_LOG(error, "Expected SASL frame; got type " << frameType); + } + uint16_t ignored = decoder.readUShort(); + if (ignored) { + QPID_LOG(info, "Got non null bytes at end of SASL frame header"); + } + + //body is at offset 4*dataOffset from the start + size_t skip = dataOffset*4 - 8; + if (skip) { + QPID_LOG(info, "Offset for sasl frame was not as expected"); + decoder.advance(skip); + } + decoder.read(*this); + consumed += decoder.getPosition(); + } + return consumed; +} + +std::size_t Sasl::write(char* data, size_t size) +{ + size_t available = encoder.getPosition(); + if (available) { + size_t encoded = available > size ? size : available; + ::memcpy(data, &buffer[0], encoded); + size_t remainder = encoder.getPosition() - encoded; + if (remainder) { + //shuffle + ::memcpy(&buffer[0], &buffer[size], remainder); + } + encoder.resetPosition(remainder); + return encoded; + } else { + return 0; + } +} + +std::size_t Sasl::readProtocolHeader(const char* buffer, std::size_t size) +{ + framing::ProtocolInitiation pi(qpid::framing::ProtocolVersion(1,0,qpid::framing::ProtocolVersion::SASL)); + if (size >= pi.encodedSize()) { + qpid::framing::Buffer out(const_cast<char*>(buffer), size); + pi.decode(out); + QPID_LOG_CAT(debug, protocol, id << " read protocol header: " << pi); + return pi.encodedSize(); + } else { + return 0; + } +} +std::size_t Sasl::writeProtocolHeader(char* buffer, std::size_t size) +{ + framing::ProtocolInitiation pi(qpid::framing::ProtocolVersion(1,0,qpid::framing::ProtocolVersion::SASL)); + if (size >= pi.encodedSize()) { + QPID_LOG_CAT(debug, protocol, id << " writing protocol header: " << pi); + qpid::framing::Buffer out(buffer, size); + pi.encode(out); + return pi.encodedSize(); + } else { + QPID_LOG_CAT(warning, protocol, id << " insufficient buffer for protocol header: " << size) + return 0; + } +} + +bool Sasl::stopReading() +{ + return false; +} + +}} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/Sasl.h b/qpid/cpp/src/qpid/amqp/Sasl.h new file mode 100644 index 0000000000..24a8de7dc4 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/Sasl.h @@ -0,0 +1,55 @@ +#ifndef QPID_AMQP_SASL_H +#define QPID_AMQP_SASL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/Encoder.h" +#include "qpid/amqp/Reader.h" +#include <string> +#include <vector> + +namespace qpid { +namespace amqp { + +/** + * Base for SASL client and server utilities + */ +class Sasl : protected Reader +{ + public: + QPID_COMMON_EXTERN Sasl(const std::string& id); + QPID_COMMON_EXTERN virtual ~Sasl(); + QPID_COMMON_EXTERN std::size_t read(const char* data, size_t available); + QPID_COMMON_EXTERN std::size_t write(char* data, size_t available); + QPID_COMMON_EXTERN std::size_t readProtocolHeader(const char* buffer, std::size_t size); + QPID_COMMON_EXTERN std::size_t writeProtocolHeader(char* buffer, std::size_t size); + protected: + const std::string id; + std::vector<char> buffer; + Encoder encoder; + + void* startFrame(); + void endFrame(void*); + QPID_COMMON_EXTERN virtual bool stopReading(); +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_SASL_H*/ diff --git a/qpid/cpp/src/qpid/amqp/SaslClient.cpp b/qpid/cpp/src/qpid/amqp/SaslClient.cpp new file mode 100644 index 0000000000..d8a38750c5 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/SaslClient.cpp @@ -0,0 +1,154 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/SaslClient.h" +#include "qpid/amqp/Decoder.h" +#include "qpid/amqp/Descriptor.h" +#include "qpid/amqp/descriptors.h" +#include "qpid/amqp/Encoder.h" +#include "qpid/log/Statement.h" + +using namespace qpid::amqp::sasl; + +namespace qpid { +namespace amqp { + +SaslClient::SaslClient(const std::string& id) : Sasl(id) {} +SaslClient::~SaslClient() {} +void SaslClient::init(const std::string& mechanism, const std::string* response, const std::string* hostname) +{ + void* frame = startFrame(); + + void* token = encoder.startList32(&SASL_INIT); + encoder.writeSymbol(mechanism); + if (response) encoder.writeBinary(*response); + else encoder.writeNull(); + if (hostname) encoder.writeString(*hostname); + else encoder.writeNull(); + encoder.endList32(3, token); + + endFrame(frame); + QPID_LOG_CAT(debug, protocol, id << " Sent SASL-INIT(" << mechanism << ", " << (response ? *response : "null") << ", " << (hostname ? *hostname : "null") << ")"); +} +void SaslClient::response(const std::string* r) +{ + void* frame = startFrame(); + + void* token = encoder.startList32(&SASL_RESPONSE); + if (r) encoder.writeBinary(*r); + else encoder.writeNull(); + encoder.endList32(1, token); + + endFrame(frame); + QPID_LOG_CAT(debug, protocol, id << " Sent SASL-RESPONSE(" << (r ? *r : "null") << ")"); +} + + +namespace { +const std::string SPACE(" "); +class SaslMechanismsReader : public Reader +{ + public: + SaslMechanismsReader(SaslClient& c) : client(c), expected(0) {} + void onSymbol(const CharSequence& mechanism, const Descriptor*) + { + if (expected) { + mechanisms << mechanism.str() << SPACE; + } else { + client.mechanisms(mechanism.str()); + } + } + bool onStartArray(uint32_t count, const CharSequence&, const Constructor&, const Descriptor*) + { + expected = count; + return true; + } + void onEndArray(uint32_t, const Descriptor*) + { + client.mechanisms(mechanisms.str()); + } + private: + SaslClient& client; + uint32_t expected; + std::stringstream mechanisms; +}; +class SaslChallengeReader : public Reader +{ + public: + SaslChallengeReader(SaslClient& c) : client(c) {} + void onNull(const Descriptor*) { client.challenge(); } + void onBinary(const CharSequence& c, const Descriptor*) { client.challenge(c.str()); } + private: + SaslClient& client; +}; +class SaslOutcomeReader : public Reader +{ + public: + SaslOutcomeReader(SaslClient& c, bool e) : client(c), expectExtraData(e) {} + void onUByte(uint8_t c, const Descriptor*) + { + if (expectExtraData) code = c; + else client.outcome(c); + } + void onBinary(const CharSequence& extra, const Descriptor*) { client.outcome(code, extra.str()); } + void onNull(const Descriptor*) { client.outcome(code); } + private: + SaslClient& client; + bool expectExtraData; + uint8_t code; +}; +} + +bool SaslClient::onStartList(uint32_t count, const CharSequence& arguments, const CharSequence& /*full raw data*/, const Descriptor* descriptor) +{ + if (!descriptor) { + QPID_LOG(error, "Expected described type in SASL negotiation but got no descriptor"); + } else if (descriptor->match(SASL_MECHANISMS_SYMBOL, SASL_MECHANISMS_CODE)) { + QPID_LOG(trace, "Reading SASL-MECHANISMS"); + Decoder decoder(arguments.data, arguments.size); + if (count != 1) QPID_LOG(error, "Invalid SASL-MECHANISMS frame; exactly one field expected, got " << count); + SaslMechanismsReader reader(*this); + decoder.read(reader); + } else if (descriptor->match(SASL_CHALLENGE_SYMBOL, SASL_CHALLENGE_CODE)) { + QPID_LOG(trace, "Reading SASL-CHALLENGE"); + Decoder decoder(arguments.data, arguments.size); + if (count != 1) QPID_LOG(error, "Invalid SASL-CHALLENGE frame; exactly one field expected, got " << count); + SaslChallengeReader reader(*this); + decoder.read(reader); + } else if (descriptor->match(SASL_OUTCOME_SYMBOL, SASL_OUTCOME_CODE)) { + QPID_LOG(trace, "Reading SASL-OUTCOME"); + Decoder decoder(arguments.data, arguments.size); + if (count == 1) { + SaslOutcomeReader reader(*this, false); + decoder.read(reader); + } else if (count == 2) { + SaslOutcomeReader reader(*this, true); + decoder.read(reader); + } else { + QPID_LOG(error, "Invalid SASL-OUTCOME frame; got " << count << " fields"); + } + } else { + QPID_LOG(error, "Unexpected descriptor in SASL negotiation: " << *descriptor); + } + return false; +} + + +}} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/SaslClient.h b/qpid/cpp/src/qpid/amqp/SaslClient.h new file mode 100644 index 0000000000..d22887de1a --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/SaslClient.h @@ -0,0 +1,55 @@ +#ifndef QPID_AMQP_SASLCLIENT_H +#define QPID_AMQP_SASLCLIENT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <qpid/CommonImportExport.h> +#include "qpid/amqp/Sasl.h" + +namespace qpid { +namespace amqp { + +/** + * Utility for decoding and encoding SASL frames by the peer acting as + * the SASL client. + */ +class SaslClient : public Sasl +{ + public: + QPID_COMMON_EXTERN SaslClient(const std::string& id); + QPID_COMMON_EXTERN virtual ~SaslClient(); + QPID_COMMON_EXTERN virtual void mechanisms(const std::string&) = 0; + QPID_COMMON_EXTERN virtual void challenge(const std::string&) = 0; + QPID_COMMON_EXTERN virtual void challenge() = 0; //null != empty string + QPID_COMMON_EXTERN virtual void outcome(uint8_t result, const std::string&) = 0; + QPID_COMMON_EXTERN virtual void outcome(uint8_t result) = 0; + + QPID_COMMON_EXTERN void init(const std::string& mechanism, const std::string* response, const std::string* hostname); + QPID_COMMON_EXTERN void response(const std::string*); + + private: + QPID_COMMON_EXTERN bool onStartList(uint32_t count, const CharSequence& arguments, const CharSequence&, const Descriptor* descriptor); + +}; + +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_SASLCLIENT_H*/ diff --git a/qpid/cpp/src/qpid/amqp/SaslServer.cpp b/qpid/cpp/src/qpid/amqp/SaslServer.cpp new file mode 100644 index 0000000000..250858bda0 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/SaslServer.cpp @@ -0,0 +1,183 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/SaslServer.h" +#include "qpid/amqp/Constructor.h" +#include "qpid/amqp/Decoder.h" +#include "qpid/amqp/Descriptor.h" +#include "qpid/amqp/descriptors.h" +#include "qpid/amqp/Encoder.h" +#include "qpid/amqp/typecodes.h" +#include "qpid/log/Statement.h" +#include "qpid/StringUtils.h" +#include <limits> +#include <vector> + +using namespace qpid::amqp::sasl; +using namespace qpid::amqp::typecodes; + +namespace qpid { +namespace amqp { +namespace { +const std::string SPACE(" "); +const std::string NULL_("NULL"); +} + +SaslServer::SaslServer(const std::string& id) : Sasl(id) {} +SaslServer::~SaslServer() {} + +void SaslServer::mechanisms(const std::string& mechanisms) +{ + void* frameToken = startFrame(); + + std::vector<std::string> parts = split(mechanisms, SPACE); + void* listToken = encoder.startList32(&SASL_MECHANISMS); + if (parts.size() > 1) { + void* arrayToken = encoder.startArray8(Constructor(SYMBOL8)); + for (std::vector<std::string>::const_iterator i = parts.begin();i != parts.end(); ++i) { + uint8_t size = i->size() > std::numeric_limits<uint8_t>::max() ? std::numeric_limits<uint8_t>::max() : i->size(); + encoder.write(size); + encoder.writeBytes(i->data(), size); + } + encoder.endArray8(parts.size(), arrayToken); + } else { + encoder.writeSymbol(mechanisms); + } + encoder.endList32(1, listToken); + + endFrame(frameToken); + QPID_LOG_CAT(debug, protocol, id << " Sent SASL-MECHANISMS(" << mechanisms << ") " << encoder.getPosition()); +} +void SaslServer::challenge(const std::string* c) +{ + void* frameToken = startFrame(); + + void* listToken = encoder.startList32(&SASL_CHALLENGE); + if (c) encoder.writeBinary(*c); + else encoder.writeNull(); + encoder.endList32(1, listToken); + + endFrame(frameToken); + QPID_LOG_CAT(debug, protocol, id << " Sent SASL-CHALLENGE(" << (c ? *c : NULL_) << ") " << encoder.getPosition()); +} +void SaslServer::completed(bool succeeded) +{ + void* frameToken = startFrame(); + + void* listToken = encoder.startList8(&SASL_OUTCOME); + encoder.writeUByte(succeeded ? 0 : 1); + encoder.endList8(1, listToken); + + endFrame(frameToken); + QPID_LOG_CAT(debug, protocol, id << " Sent SASL-OUTCOME(" << (succeeded ? 0 : 1) << ") " << encoder.getPosition()); +} + +namespace { +class SaslInitReader : public Reader +{ + public: + SaslInitReader(SaslServer& s, uint32_t e) : server(s), expected(e), hasResponse(false), index(0) {} + void onNull(const Descriptor*) + { + ++index; + if (index == 2) { + if (--expected == 0) { + server.init(mechanism, 0, 0); + } + } else if (index == 3) { + server.init(mechanism, hasResponse ? &response : 0, 0); + } else { + QPID_LOG(warning, "Unexpected sequence of fields for SASL-INIT: got null for field " << index); + } + } + void onBinary(const CharSequence& r, const Descriptor*) + { + if (++index != 2) QPID_LOG(warning, "Unexpected sequence of fields for SASL-INIT: got binary for field " << index); + response = r.str(); + hasResponse = true; + if (--expected == 0) { + server.init(mechanism, &response, 0); + } + } + void onString(const CharSequence& h, const Descriptor*) + { + if (--expected || ++index != 3) { + QPID_LOG(warning, "Unexpected sequence of fields for SASL-INIT: got string for field " << index); + } else { + std::string hostname = h.str(); + server.init(mechanism, hasResponse ? &response : 0, &hostname); + } + } + void onSymbol(const CharSequence& m, const Descriptor*) + { + if (++index != 1) QPID_LOG(warning, "Unexpected sequence of fields for SASL-INIT: got symbol for field " << index); + if (--expected) { + mechanism = m.str(); + } else { + server.init(m.str(), 0, 0); + } + } + private: + SaslServer& server; + uint32_t expected; + std::string mechanism; + std::string response; + bool hasResponse; + uint32_t index; +}; + +class SaslResponseReader : public Reader +{ + public: + SaslResponseReader(SaslServer& s) : server(s) {} + void onNull(const Descriptor*) { server.response(0); } + void onBinary(const CharSequence& r, const Descriptor*) + { + std::string s = r.str(); + server.response(&s); + } + private: + SaslServer& server; +}; +} + +bool SaslServer::onStartList(uint32_t count, const CharSequence& arguments, const CharSequence& /*full raw data*/, const Descriptor* descriptor) +{ + if (!descriptor) { + QPID_LOG(error, "Expected described type in SASL negotiation but got no descriptor"); + } else if (descriptor->match(SASL_INIT_SYMBOL, SASL_INIT_CODE)) { + QPID_LOG(trace, "Reading SASL-INIT"); + Decoder decoder(arguments.data, arguments.size); + if (count < 1 || count > 3) QPID_LOG(error, "Invalid SASL-INIT frame; got " << count << " fields"); + SaslInitReader reader(*this, count); + decoder.read(reader); + } else if (descriptor->match(SASL_RESPONSE_SYMBOL, SASL_RESPONSE_CODE)) { + QPID_LOG(trace, "Reading SASL-RESPONSE (" << std::string(arguments.data, arguments.size) << ") " << count << " elements"); + Decoder decoder(arguments.data, arguments.size); + if (count != 1) QPID_LOG(error, "Invalid SASL-RESPONSE frame; exactly one field expected, got " << count); + SaslResponseReader reader(*this); + decoder.read(reader); + } else { + QPID_LOG(error, "Unexpected descriptor in SASL negotiation: " << *descriptor); + } + return false; +} + +}} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/SaslServer.h b/qpid/cpp/src/qpid/amqp/SaslServer.h new file mode 100644 index 0000000000..68d0854488 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/SaslServer.h @@ -0,0 +1,50 @@ +#ifndef QPID_AMQP_SASLSERVER_H +#define QPID_AMQP_SASLSERVER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/Sasl.h" + +namespace qpid { +namespace amqp { + +/** + * Utility for decoding and encoding SASL frames by the peer acting as + * the SASL server. + */ +class SaslServer : public Sasl +{ + public: + QPID_COMMON_EXTERN SaslServer(const std::string& id); + QPID_COMMON_EXTERN virtual ~SaslServer(); + virtual void init(const std::string& mechanism, const std::string* response, const std::string* hostname) = 0; + virtual void response(const std::string*) = 0; + + QPID_COMMON_EXTERN void mechanisms(const std::string& mechanisms); + QPID_COMMON_EXTERN void challenge(const std::string*); + QPID_COMMON_EXTERN void completed(bool succeeded); + + private: + QPID_COMMON_EXTERN bool onStartList(uint32_t count, const CharSequence& arguments, const CharSequence&, const Descriptor* descriptor); +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_SASLSERVER_H*/ diff --git a/qpid/cpp/src/qpid/amqp/descriptors.h b/qpid/cpp/src/qpid/amqp/descriptors.h new file mode 100644 index 0000000000..29c626edc2 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/descriptors.h @@ -0,0 +1,140 @@ +#ifndef QPID_AMQP_DESCRIPTORS_H +#define QPID_AMQP_DESCRIPTORS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Descriptor.h" + +namespace qpid { +namespace amqp { + +// NOTE: If you add descriptor symbols and codes here, you must also update the DescriptorMap +// constructor in Descriptor.cpp. + +namespace message { +const std::string HEADER_SYMBOL("amqp:header:list"); +const std::string PROPERTIES_SYMBOL("amqp:properties:list"); +const std::string DELIVERY_ANNOTATIONS_SYMBOL("amqp:delivery-annotations:map"); +const std::string MESSAGE_ANNOTATIONS_SYMBOL("amqp:message-annotations:map"); +const std::string APPLICATION_PROPERTIES_SYMBOL("amqp:application-properties:map"); +const std::string AMQP_SEQUENCE_SYMBOL("amqp:amqp-sequence:list"); +const std::string AMQP_VALUE_SYMBOL("amqp:amqp-value:*"); +const std::string DATA_SYMBOL("amqp:data:binary"); +const std::string FOOTER_SYMBOL("amqp:footer:map"); +const std::string ACCEPTED_SYMBOL("amqp:accepted:list"); + +const uint64_t HEADER_CODE(0x70); +const uint64_t DELIVERY_ANNOTATIONS_CODE(0x71); +const uint64_t MESSAGE_ANNOTATIONS_CODE(0x72); +const uint64_t PROPERTIES_CODE(0x73); +const uint64_t APPLICATION_PROPERTIES_CODE(0x74); +const uint64_t DATA_CODE(0x75); +const uint64_t AMQP_SEQUENCE_CODE(0x76); +const uint64_t AMQP_VALUE_CODE(0x77); +const uint64_t FOOTER_CODE(0x78); +const uint64_t ACCEPTED_CODE(0x24); + +const Descriptor HEADER(HEADER_CODE); +const Descriptor DELIVERY_ANNOTATIONS(DELIVERY_ANNOTATIONS_CODE); +const Descriptor MESSAGE_ANNOTATIONS(MESSAGE_ANNOTATIONS_CODE); +const Descriptor PROPERTIES(PROPERTIES_CODE); +const Descriptor APPLICATION_PROPERTIES(APPLICATION_PROPERTIES_CODE); +const Descriptor AMQP_VALUE(AMQP_VALUE_CODE); +const Descriptor DATA(DATA_CODE); +} + +namespace sasl { +const std::string SASL_MECHANISMS_SYMBOL("amqp:sasl-mechanisms:list"); +const std::string SASL_INIT_SYMBOL("amqp:sasl-init:list"); +const std::string SASL_CHALLENGE_SYMBOL("amqp:sasl-challenge:list"); +const std::string SASL_RESPONSE_SYMBOL("amqp:sasl-response:list"); +const std::string SASL_OUTCOME_SYMBOL("amqp:sasl-outcome:list"); + +const uint64_t SASL_MECHANISMS_CODE(0x40); +const uint64_t SASL_INIT_CODE(0x41); +const uint64_t SASL_CHALLENGE_CODE(0x42); +const uint64_t SASL_RESPONSE_CODE(0x43); +const uint64_t SASL_OUTCOME_CODE(0x44); + +const Descriptor SASL_MECHANISMS(SASL_MECHANISMS_CODE); +const Descriptor SASL_INIT(SASL_INIT_CODE); +const Descriptor SASL_CHALLENGE(SASL_CHALLENGE_CODE); +const Descriptor SASL_RESPONSE(SASL_RESPONSE_CODE); +const Descriptor SASL_OUTCOME(SASL_OUTCOME_CODE); +} + +namespace filters { +const std::string LEGACY_DIRECT_FILTER_SYMBOL("apache.org:legacy-amqp-direct-binding:string"); +const std::string LEGACY_TOPIC_FILTER_SYMBOL("apache.org:legacy-amqp-topic-binding:string"); +const std::string LEGACY_HEADERS_FILTER_SYMBOL("apache.org:legacy-amqp-headers-binding:map"); +const std::string SELECTOR_FILTER_SYMBOL("apache.org:selector-filter:string"); +const std::string XQUERY_FILTER_SYMBOL("apache.org:xquery-filter:string"); + +const uint64_t LEGACY_DIRECT_FILTER_CODE(0x0000468C00000000ULL); +const uint64_t LEGACY_TOPIC_FILTER_CODE(0x0000468C00000001ULL); +const uint64_t LEGACY_HEADERS_FILTER_CODE(0x0000468C00000002ULL); +const uint64_t SELECTOR_FILTER_CODE(0x0000468C00000004ULL); +const uint64_t XQUERY_FILTER_CODE(0x0000468C00000005ULL); +} + +namespace lifetime_policy { +const std::string DELETE_ON_CLOSE_SYMBOL("amqp:delete-on-close:list"); +const std::string DELETE_ON_NO_LINKS_SYMBOL("amqp:delete-on-no-links:list"); +const std::string DELETE_ON_NO_MESSAGES_SYMBOL("amqp:delete-on-no-messages:list"); +const std::string DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL("amqp:delete-on-no-links-or-messages:list"); + +const uint64_t DELETE_ON_CLOSE_CODE(0x2B); +const uint64_t DELETE_ON_NO_LINKS_CODE(0x2C); +const uint64_t DELETE_ON_NO_MESSAGES_CODE(0x2D); +const uint64_t DELETE_ON_NO_LINKS_OR_MESSAGES_CODE(0x2E); +} + +namespace transaction { +const std::string DECLARE_SYMBOL("amqp:declare:list"); +const std::string DISCHARGE_SYMBOL("amqp:discharge:list"); +const std::string DECLARED_SYMBOL("amqp:declared:list"); +const std::string TRANSACTIONAL_STATE_SYMBOL("amqp:transactional-state:list"); + +const uint64_t DECLARE_CODE(0x31); +const uint64_t DISCHARGE_CODE(0x32); +const uint64_t DECLARED_CODE(0x33); +const uint64_t TRANSACTIONAL_STATE_CODE(0x34); +} + +namespace error_conditions { +//note these are not actually descriptors +const std::string INTERNAL_ERROR("amqp:internal-error"); +const std::string NOT_FOUND("amqp:not-found"); +const std::string UNAUTHORIZED_ACCESS("amqp:unauthorized-access"); +const std::string DECODE_ERROR("amqp:decode-error"); +const std::string NOT_ALLOWED("amqp:not-allowed"); +const std::string NOT_IMPLEMENTED("amqp:not-implemented"); +const std::string RESOURCE_LIMIT_EXCEEDED("amqp:resource-limit-exceeded"); +const std::string RESOURCE_DELETED("amqp:resource-deleted"); +const std::string PRECONDITION_FAILED("amqp:precondition-failed"); +namespace transaction { +const std::string UNKNOWN_ID("amqp:transaction:unknown-id"); +const std::string ROLLBACK("amqp:transaction:rollback"); +} +} +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_DESCRIPTORS_H*/ diff --git a/qpid/cpp/src/qpid/amqp/typecodes.h b/qpid/cpp/src/qpid/amqp/typecodes.h new file mode 100644 index 0000000000..915b75ca3f --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/typecodes.h @@ -0,0 +1,115 @@ +#ifndef QPID_AMQP_TYPECODES_H +#define QPID_AMQP_TYPECODES_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace qpid { +namespace amqp { + +namespace typecodes +{ +const uint8_t DESCRIPTOR(0x0); + +const uint8_t NULL_VALUE(0x40); + +const uint8_t BOOLEAN(0x56); +const uint8_t BOOLEAN_TRUE(0x41); +const uint8_t BOOLEAN_FALSE(0x42); + +const uint8_t UBYTE(0x50); +const uint8_t USHORT(0x60); +const uint8_t UINT(0x70); +const uint8_t UINT_SMALL(0x52); +const uint8_t UINT_ZERO(0x43); +const uint8_t ULONG(0x80); +const uint8_t ULONG_SMALL(0x53); +const uint8_t ULONG_ZERO(0x44); + +const uint8_t BYTE(0x51); +const uint8_t SHORT(0x61); +const uint8_t INT(0x71); +const uint8_t INT_SMALL(0x54); +const uint8_t LONG(0x81); +const uint8_t LONG_SMALL(0x55); + +const uint8_t FLOAT(0x72); +const uint8_t DOUBLE(0x82); + +const uint8_t DECIMAL32(0x74); +const uint8_t DECIMAL64(0x84); +const uint8_t DECIMAL128(0x94); + +const uint8_t CHAR_UTF32(0x73); +const uint8_t TIMESTAMP(0x83); +const uint8_t UUID(0x98); + +const uint8_t BINARY8(0xa0); +const uint8_t BINARY32(0xb0); +const uint8_t STRING8(0xa1); +const uint8_t STRING32(0xb1); +const uint8_t SYMBOL8(0xa3); +const uint8_t SYMBOL32(0xb3); + +typedef std::pair<uint8_t, uint8_t> CodePair; +const CodePair SYMBOL(SYMBOL8, SYMBOL32); +const CodePair STRING(STRING8, STRING32); +const CodePair BINARY(BINARY8, BINARY32); + +const uint8_t LIST0(0x45); +const uint8_t LIST8(0xc0); +const uint8_t LIST32(0xd0); +const uint8_t MAP8(0xc1); +const uint8_t MAP32(0xd1); +const uint8_t ARRAY8(0xe0); +const uint8_t ARRAY32(0xf0); + + +const std::string NULL_NAME("null"); +const std::string BOOLEAN_NAME("bool"); + +const std::string UBYTE_NAME("ubyte"); +const std::string USHORT_NAME("ushort"); +const std::string UINT_NAME("uint"); +const std::string ULONG_NAME("ulong"); + +const std::string BYTE_NAME("byte"); +const std::string SHORT_NAME("short"); +const std::string INT_NAME("int"); +const std::string LONG_NAME("long"); + +const std::string FLOAT_NAME("float"); +const std::string DOUBLE_NAME("double"); + +const std::string TIMESTAMP_NAME("timestamp"); +const std::string UUID_NAME("uuid"); + +const std::string BINARY_NAME("binary"); +const std::string STRING_NAME("string"); +const std::string SYMBOL_NAME("symbol"); + +const std::string LIST_NAME("list"); +const std::string MAP_NAME("map"); +const std::string ARRAY_NAME("array"); +} + +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_TYPECODES_H*/ |