diff options
Diffstat (limited to 'cpp/src/qpid/amqp')
29 files changed, 4592 insertions, 0 deletions
diff --git a/cpp/src/qpid/amqp/CharSequence.cpp b/cpp/src/qpid/amqp/CharSequence.cpp new file mode 100644 index 0000000000..857ec7e587 --- /dev/null +++ b/cpp/src/qpid/amqp/CharSequence.cpp @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "CharSequence.h" + +namespace qpid { +namespace amqp { + +void CharSequence::init() +{ + data = 0; + size = 0; +} + +CharSequence::operator bool() const +{ + return data && size; +} +std::string CharSequence::str() const +{ + return std::string(data, size); +} + +CharSequence CharSequence::create(const char* data, size_t size) +{ + CharSequence c = {data, size}; + return c; +} + +}} // namespace qpid::amqp diff --git a/cpp/src/qpid/amqp/CharSequence.h b/cpp/src/qpid/amqp/CharSequence.h new file mode 100644 index 0000000000..307a4b1537 --- /dev/null +++ b/cpp/src/qpid/amqp/CharSequence.h @@ -0,0 +1,49 @@ +#ifndef QPID_AMQP_CHARSEQUENCE_H +#define QPID_AMQP_CHARSEQUENCE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <stddef.h> +#include <string> +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace amqp { + +/** + * Simple record of a particular sequence of chars/bytes. The memroy + * referenced is assumed to be owned by some other entity, this is + * merely a pointer into a (segment of) it. + */ +struct CharSequence +{ + const char* data; + size_t size; + + QPID_COMMON_EXTERN operator bool() const; + QPID_COMMON_EXTERN std::string str() const; + QPID_COMMON_EXTERN void init(); + + QPID_COMMON_EXTERN static CharSequence create(const char* data, size_t size); +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_CHARSEQUENCE_H*/ diff --git a/cpp/src/qpid/amqp/Codec.h b/cpp/src/qpid/amqp/Codec.h new file mode 100644 index 0000000000..c91cd0a96b --- /dev/null +++ b/cpp/src/qpid/amqp/Codec.h @@ -0,0 +1,83 @@ +#ifndef QPID_AMQP_CODEC_H +#define QPID_AMQP_CODEC_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace qpid { +namespace amqp { + +/** + * + */ +class Codec +{ + public: + + + + private: + + struct Constructor + { + uint8_t code; + Descriptor descriptor; + bool isDescribed; + }; + + Constructor readConstructor(Decoder decoder, Reader reader) + { + Constructor result; + result.code = decoder.readCode(); + if (code == DESCRIPTOR) { + result.isDescribed = true; + result.descriptor = decoder.readDescriptor(); + result.code = decoder.readCode(); + } else { + result.isDescribed = false; + } + return result; + } +}; + +Codec::Descriptor Codec::Decoder::readDescriptor() +{ + uint8_t code = decoder.readCode(); + switch(code) { + case SYMBOL8: + return Descriptor(readSequence8()); + case SYMBOL32: + return Descriptor(readSequence32()); + case ULONG: + return Descriptor(readULong()); + case ULONG_SMALL: + return Descriptor((uint64_t) readUByte()); + case ULONG_ZERO: + return Descriptor((uint64_t) 0); + default: + throw qpid::Exception("Expected descriptor of type ulong or symbol; found " << code); + } +} + +Codec::Descriptor::Descriptor(uint64_t id) : value.id(id), type(NUMERIC) {} +Codec::Descriptor::Descriptor(const CharSequence& symbol) : value.symbol(symbol), type(SYMBOLIC) {} +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_CODEC_H*/ diff --git a/cpp/src/qpid/amqp/Constructor.h b/cpp/src/qpid/amqp/Constructor.h new file mode 100644 index 0000000000..444e455670 --- /dev/null +++ b/cpp/src/qpid/amqp/Constructor.h @@ -0,0 +1,42 @@ +#ifndef QPID_AMQP_CONSTRUCTOR_H +#define QPID_AMQP_CONSTRUCTOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/Descriptor.h" +namespace qpid { +namespace amqp { + +/** + * Representation of an AMQP 1.0 type 'constructor' (i.e. a type code + * with an optional descriptor) + */ +struct Constructor +{ + uint8_t code; + Descriptor descriptor; + bool isDescribed; + + Constructor(uint8_t c) : code(c), descriptor(0), isDescribed(false) {} +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_CONSTRUCTOR_H*/ diff --git a/cpp/src/qpid/amqp/Decoder.cpp b/cpp/src/qpid/amqp/Decoder.cpp new file mode 100644 index 0000000000..9c577e6c92 --- /dev/null +++ b/cpp/src/qpid/amqp/Decoder.cpp @@ -0,0 +1,545 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/Decoder.h" +#include "qpid/amqp/CharSequence.h" +#include "qpid/amqp/Constructor.h" +#include "qpid/amqp/Descriptor.h" +#include "qpid/amqp/Reader.h" +#include "qpid/amqp/typecodes.h" +#include "qpid/types/Uuid.h" +#include "qpid/types/Variant.h" +#include "qpid/log/Statement.h" +#include "qpid/Exception.h" + +namespace qpid { +namespace amqp { + +using namespace qpid::amqp::typecodes; + +Decoder::Decoder(const char* d, size_t s) : start(d), size(s), position(0) {} + +namespace { +class MapBuilder : public Reader +{ + public: + void onNull(const Descriptor*) + { + qpid::types::Variant v; + handle(v, NULL_NAME); + } + void onBoolean(bool v, const Descriptor*) + { + handle(v, BOOLEAN_NAME); + } + void onUByte(uint8_t v, const Descriptor*) + { + handle(v, UBYTE_NAME); + } + void onUShort(uint16_t v, const Descriptor*) + { + handle(v, USHORT_NAME); + } + void onUInt(uint32_t v, const Descriptor*) + { + handle(v, UINT_NAME); + } + void onULong(uint64_t v, const Descriptor*) + { + handle(v, ULONG_NAME); + } + void onByte(int8_t v, const Descriptor*) + { + handle(v, BYTE_NAME); + } + void onShort(int16_t v, const Descriptor*) + { + handle(v, SHORT_NAME); + } + void onInt(int32_t v, const Descriptor*) + { + handle(v, INT_NAME); + } + void onLong(int64_t v, const Descriptor*) + { + handle(v, LONG_NAME); + } + void onFloat(float v, const Descriptor*) + { + handle(v, FLOAT_NAME); + } + void onDouble(double v, const Descriptor*) + { + handle(v, DOUBLE_NAME); + } + void onUuid(const CharSequence& v, const Descriptor*) + { + handle(v, UUID_NAME); + } + void onTimestamp(int64_t v, const Descriptor*) + { + handle(v, TIMESTAMP_NAME); + } + void onBinary(const CharSequence& v, const Descriptor*) + { + handle(v); + } + void onString(const CharSequence& v, const Descriptor*) + { + handle(v); + } + void onSymbol(const CharSequence& v, const Descriptor*) + { + handle(v); + } + MapBuilder(qpid::types::Variant::Map& m) : map(m), state(KEY) {} + private: + qpid::types::Variant::Map& map; + enum {KEY, SKIP, VALUE} state; + std::string key; + + template <typename T> void handle(T value, const std::string& name) + { + switch (state) { + case KEY: + QPID_LOG(warning, "Ignoring key of type " << name); + state = SKIP; + break; + case VALUE: + map[key] = value; + case SKIP: + state = KEY; + break; + } + } + void handle(const CharSequence& value) + { + switch (state) { + case KEY: + key = value.str(); + state = VALUE; + break; + case VALUE: + map[key] = value.str(); + case SKIP: + state = KEY; + break; + } + } +}; +} +void Decoder::readMap(qpid::types::Variant::Map& map) +{ + MapBuilder builder(map); + read(builder); +} + +qpid::types::Variant::Map Decoder::readMap() +{ + qpid::types::Variant::Map map; + readMap(map); + return map; +} + +void Decoder::read(Reader& reader) +{ + while (available() && reader.proceed()) { + readOne(reader); + } +} + +void Decoder::readOne(Reader& reader) +{ + const char* temp = start + position; + Constructor c = readConstructor(); + if (c.isDescribed) reader.onDescriptor(c.descriptor, temp); + readValue(reader, c.code, c.isDescribed ? &c.descriptor : 0); +} + +void Decoder::readValue(Reader& reader, uint8_t code, const Descriptor* descriptor) +{ + switch(code) { + case NULL_VALUE: + reader.onNull(descriptor); + break; + case BOOLEAN: + reader.onBoolean(readBoolean(), descriptor); + break; + case BOOLEAN_TRUE: + reader.onBoolean(true, descriptor); + break; + case BOOLEAN_FALSE: + reader.onBoolean(false, descriptor); + break; + case UBYTE: + reader.onUByte(readUByte(), descriptor); + break; + case USHORT: + reader.onUShort(readUShort(), descriptor); + break; + case UINT: + reader.onUInt(readUInt(), descriptor); + break; + case UINT_SMALL: + reader.onUInt(readUByte(), descriptor); + break; + case UINT_ZERO: + reader.onUInt(0, descriptor); + break; + case ULONG: + reader.onULong(readULong(), descriptor); + break; + case ULONG_SMALL: + reader.onULong(readUByte(), descriptor); + break; + case ULONG_ZERO: + reader.onULong(0, descriptor); + break; + case BYTE: + reader.onByte(readByte(), descriptor); + break; + case SHORT: + reader.onShort(readShort(), descriptor); + break; + case INT: + reader.onInt(readInt(), descriptor); + break; + case INT_SMALL: + reader.onInt(readByte(), descriptor); + break; + case LONG: + reader.onLong(readLong(), descriptor); + break; + case LONG_SMALL: + reader.onLong(readByte(), descriptor); + break; + case FLOAT: + reader.onFloat(readFloat(), descriptor); + break; + case DOUBLE: + reader.onDouble(readDouble(), descriptor); + break; + case UUID: + reader.onUuid(readRawUuid(), descriptor); + break; + case TIMESTAMP: + reader.onTimestamp(readLong(), descriptor); + break; + + case BINARY8: + reader.onBinary(readSequence8(), descriptor); + break; + case BINARY32: + reader.onBinary(readSequence32(), descriptor); + break; + case STRING8: + reader.onString(readSequence8(), descriptor); + break; + case STRING32: + reader.onString(readSequence32(), descriptor); + break; + case SYMBOL8: + reader.onSymbol(readSequence8(), descriptor); + break; + case SYMBOL32: + reader.onSymbol(readSequence32(), descriptor); + break; + + case LIST0: + reader.onStartList(0, CharSequence::create(0, 0), descriptor); + reader.onEndList(0, descriptor); + break; + case LIST8: + readList8(reader, descriptor); + break; + case LIST32: + readList32(reader, descriptor); + break; + case MAP8: + readMap8(reader, descriptor); + break; + case MAP32: + readMap32(reader, descriptor); + break; + case ARRAY8: + readArray8(reader, descriptor); + break; + case ARRAY32: + readArray32(reader, descriptor); + break; + default: + break; + } +} + +void Decoder::readList8(Reader& reader, const Descriptor* descriptor) +{ + uint8_t size = readUByte(); + uint8_t count = readUByte(); + readList(reader, size-sizeof(size), count, descriptor); +} + +void Decoder::readList32(Reader& reader, const Descriptor* descriptor) +{ + uint32_t size = readUInt(); + uint32_t count = readUInt(); + readList(reader, size-sizeof(size), count, descriptor); +} + +void Decoder::readMap8(Reader& reader, const Descriptor* descriptor) +{ + uint8_t size = readUByte(); + uint8_t count = readUByte(); + readMap(reader, size-sizeof(size), count, descriptor); +} + +void Decoder::readMap32(Reader& reader, const Descriptor* descriptor) +{ + uint32_t size = readUInt(); + uint32_t count = readUInt(); + readMap(reader, size-sizeof(size), count, descriptor); +} + +void Decoder::readArray8(Reader& reader, const Descriptor* descriptor) +{ + uint8_t size = readUByte(); + uint8_t count = readUByte(); + readArray(reader, size-sizeof(size), count, descriptor); +} + +void Decoder::readArray32(Reader& reader, const Descriptor* descriptor) +{ + uint32_t size = readUInt(); + uint32_t count = readUInt(); + readArray(reader, size-sizeof(size), count, descriptor); +} + +void Decoder::readList(Reader& reader, uint32_t size, uint32_t count, const Descriptor* descriptor) +{ + if (reader.onStartList(count, CharSequence::create(data(), size), descriptor)) { + for (uint32_t i = 0; i < count; ++i) { + readOne(reader); + } + reader.onEndList(count, descriptor); + } else { + //skip + advance(size); + } +} +void Decoder::readMap(Reader& reader, uint32_t size, uint32_t count, const Descriptor* descriptor) +{ + if (reader.onStartMap(count, CharSequence::create(data(), size), descriptor)) { + for (uint32_t i = 0; i < count; ++i) { + readOne(reader); + } + reader.onEndMap(count, descriptor); + } else { + //skip + advance(size); + } +} + +void Decoder::readArray(Reader& reader, uint32_t size, uint32_t count, const Descriptor* descriptor) +{ + size_t temp = position; + Constructor constructor = readConstructor(); + CharSequence raw = CharSequence::create(data(), size-(position-temp)); + if (reader.onStartArray(count, raw, constructor, descriptor)) { + for (uint32_t i = 0; i < count; ++i) { + readValue(reader, constructor.code, constructor.isDescribed ? &constructor.descriptor : 0); + } + reader.onEndArray(count, descriptor); + } else { + //skip + advance(raw.size); + } +} + + +Constructor Decoder::readConstructor() +{ + Constructor result(readCode()); + if (result.code == DESCRIPTOR) { + result.isDescribed = true; + result.descriptor = readDescriptor(); + result.code = readCode(); + } else { + result.isDescribed = false; + } + return result; +} + +Descriptor Decoder::readDescriptor() +{ + uint8_t code = readCode(); + switch(code) { + case SYMBOL8: + return Descriptor(readSequence8()); + case SYMBOL32: + return Descriptor(readSequence32()); + case ULONG: + return Descriptor(readULong()); + case ULONG_SMALL: + return Descriptor((uint64_t) readUByte()); + case ULONG_ZERO: + return Descriptor((uint64_t) 0); + default: + throw qpid::Exception(QPID_MSG("Expected descriptor of type ulong or symbol; found " << code)); + } +} + +void Decoder::advance(size_t n) +{ + if (n > available()) throw qpid::Exception(QPID_MSG("Out of Bounds")); + position += n; +} + +const char* Decoder::data() +{ + return start + position; +} + +size_t Decoder::available() +{ + return size - position; +} + +uint8_t Decoder::readCode() +{ + return readUByte(); +} + +bool Decoder::readBoolean() +{ + return readUByte(); +} + +uint8_t Decoder::readUByte() +{ + return static_cast<uint8_t>(start[position++]); +} + +uint16_t Decoder::readUShort() +{ + uint16_t hi = (unsigned char) start[position++]; + hi = hi << 8; + hi |= (unsigned char) start[position++]; + return hi; +} + +uint32_t Decoder::readUInt() +{ + uint32_t a = (unsigned char) start[position++]; + uint32_t b = (unsigned char) start[position++]; + uint32_t c = (unsigned char) start[position++]; + uint32_t d = (unsigned char) start[position++]; + a = a << 24; + a |= b << 16; + a |= c << 8; + a |= d; + return a; +} + +uint64_t Decoder::readULong() +{ + uint64_t hi =readUInt(); + uint64_t lo = readUInt(); + hi = hi << 32; + return hi | lo; +} + +int8_t Decoder::readByte() +{ + return (int8_t) readUByte(); +} + +int16_t Decoder::readShort() +{ + return (int16_t) readUShort(); +} + +int32_t Decoder::readInt() +{ + return (int32_t) readUInt(); +} + +int64_t Decoder::readLong() +{ + return (int64_t) readULong(); +} + +float Decoder::readFloat() +{ + union { + uint32_t i; + float f; + } val; + val.i = readUInt(); + return val.f; +} + +double Decoder::readDouble() +{ + union { + uint64_t i; + double f; + } val; + val.i = readULong(); + return val.f; +} + +CharSequence Decoder::readSequence8() +{ + CharSequence s; + s.size = readUByte(); + s.data = start + position; + advance(s.size); + return s; +} + +CharSequence Decoder::readSequence32() +{ + CharSequence s; + s.size = readUInt(); + s.data = start + position; + advance(s.size); + return s; +} + +qpid::types::Uuid Decoder::readUuid() +{ + qpid::types::Uuid uuid(start + position); + advance(16); + return uuid; +} + +CharSequence Decoder::readRawUuid() +{ + CharSequence s; + s.data = start + position; + s.size = 16; + advance(s.size); + return s; +} + +size_t Decoder::getPosition() const { return position; } +size_t Decoder::getSize() const { return size; } +void Decoder::resetSize(size_t s) { size = s; } +}} // namespace qpid::amqp diff --git a/cpp/src/qpid/amqp/Decoder.h b/cpp/src/qpid/amqp/Decoder.h new file mode 100644 index 0000000000..7ddfe0f17f --- /dev/null +++ b/cpp/src/qpid/amqp/Decoder.h @@ -0,0 +1,99 @@ +#ifndef QPID_AMQP_DECODER_H +#define QPID_AMQP_DECODER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/IntegerTypes.h" +#include "qpid/CommonImportExport.h" +#include <map> +#include <string> +#include <stddef.h> + +namespace qpid { +namespace types { +class Uuid; +class Variant; +} +namespace amqp { +struct CharSequence; +struct Constructor; +struct Descriptor; +class Reader; + +/** + * Class to assist in decoding an AMQP encoded data-stream. + */ +class Decoder +{ + public: + QPID_COMMON_EXTERN Decoder(const char*, size_t); + + QPID_COMMON_EXTERN size_t available(); + QPID_COMMON_EXTERN uint8_t readCode(); + + QPID_COMMON_EXTERN bool readBoolean(); + QPID_COMMON_EXTERN uint8_t readUByte(); + QPID_COMMON_EXTERN uint16_t readUShort(); + QPID_COMMON_EXTERN uint32_t readUInt(); + QPID_COMMON_EXTERN uint64_t readULong(); + QPID_COMMON_EXTERN int8_t readByte(); + QPID_COMMON_EXTERN int16_t readShort(); + QPID_COMMON_EXTERN int32_t readInt(); + QPID_COMMON_EXTERN int64_t readLong(); + QPID_COMMON_EXTERN float readFloat(); + QPID_COMMON_EXTERN double readDouble(); + QPID_COMMON_EXTERN qpid::types::Uuid readUuid(); + QPID_COMMON_EXTERN CharSequence readSequence8(); + QPID_COMMON_EXTERN CharSequence readSequence32(); + QPID_COMMON_EXTERN Descriptor readDescriptor(); + QPID_COMMON_EXTERN void read(Reader& reader); + + QPID_COMMON_EXTERN void readMap(std::map<std::string, qpid::types::Variant>&); + QPID_COMMON_EXTERN std::map<std::string, qpid::types::Variant> readMap(); + QPID_COMMON_EXTERN void advance(size_t); + QPID_COMMON_EXTERN size_t getPosition() const; + QPID_COMMON_EXTERN void resetSize(size_t size); + QPID_COMMON_EXTERN size_t getSize() const; + + private: + const char* const start; + size_t size; + size_t position; + + void readOne(Reader& reader); + void readValue(Reader& reader, uint8_t code, const Descriptor* descriptor); + void readList(Reader& reader, uint32_t size, uint32_t count, const Descriptor* descriptor); + void readMap(Reader& reader, uint32_t size, uint32_t count, const Descriptor* descriptor); + void readArray(Reader& reader, uint32_t size, uint32_t count, const Descriptor* descriptor); + void readList8(Reader& reader, const Descriptor* descriptor); + void readList32(Reader& reader, const Descriptor* descriptor); + void readMap8(Reader& reader, const Descriptor* descriptor); + void readMap32(Reader& reader, const Descriptor* descriptor); + void readArray8(Reader& reader, const Descriptor* descriptor); + void readArray32(Reader& reader, const Descriptor* descriptor); + CharSequence readRawUuid(); + Constructor readConstructor(); + const char* data(); + +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_DECODER_H*/ diff --git a/cpp/src/qpid/amqp/Descriptor.cpp b/cpp/src/qpid/amqp/Descriptor.cpp new file mode 100644 index 0000000000..087e87c5e6 --- /dev/null +++ b/cpp/src/qpid/amqp/Descriptor.cpp @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Descriptor.h" + +namespace qpid { +namespace amqp { +Descriptor::Descriptor(uint64_t code) : type(NUMERIC) { value.code = code; } +Descriptor::Descriptor(const CharSequence& symbol) : type(SYMBOLIC) { value.symbol = symbol; } +bool Descriptor::match(const std::string& symbol, uint64_t code) const +{ + switch (type) { + case SYMBOLIC: + return symbol.compare(0, symbol.size(), value.symbol.data, value.symbol.size) == 0; + case NUMERIC: + return code == value.code; + } + return false; +} + + +std::ostream& operator<<(std::ostream& os, const Descriptor& d) +{ + switch (d.type) { + case Descriptor::SYMBOLIC: + if (d.value.symbol.data && d.value.symbol.size) os << std::string(d.value.symbol.data, d.value.symbol.size); + else os << "null"; + break; + case Descriptor::NUMERIC: + os << d.value.code; + break; + } + return os; +} +}} // namespace qpid::amqp diff --git a/cpp/src/qpid/amqp/Descriptor.h b/cpp/src/qpid/amqp/Descriptor.h new file mode 100644 index 0000000000..c36aa38ee3 --- /dev/null +++ b/cpp/src/qpid/amqp/Descriptor.h @@ -0,0 +1,54 @@ +#ifndef QPID_AMQP_DESCRIPTOR_H +#define QPID_AMQP_DESCRIPTOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/CharSequence.h" +#include "qpid/sys/IntegerTypes.h" +#include <ostream> + +namespace qpid { +namespace amqp { + +/** + * Representation of an AMQP 1.0 type descriptor. + */ +struct Descriptor +{ + union { + CharSequence symbol; + uint64_t code; + } value; + enum { + NUMERIC, + SYMBOLIC + } type; + + Descriptor(uint64_t code); + Descriptor(const CharSequence& symbol); + bool match(const std::string&, uint64_t) const; +}; + +std::ostream& operator<<(std::ostream& os, const Descriptor& d); + +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_DESCRIPTOR_H*/ diff --git a/cpp/src/qpid/amqp/Encoder.cpp b/cpp/src/qpid/amqp/Encoder.cpp new file mode 100644 index 0000000000..6599f70811 --- /dev/null +++ b/cpp/src/qpid/amqp/Encoder.cpp @@ -0,0 +1,402 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/Encoder.h" +#include "qpid/amqp/CharSequence.h" +#include "qpid/amqp/Descriptor.h" +#include "qpid/amqp/typecodes.h" +#include "qpid/types/Uuid.h" +#include "qpid/log/Statement.h" +#include "qpid/Exception.h" +#include <assert.h> +#include <string.h> + +namespace qpid { +namespace amqp { + +namespace { +template <typename T> size_t encode(char* data, T i); +template <> size_t encode<uint8_t>(char* data, uint8_t i) +{ + *data = i; + return 1; +} +template <> size_t encode<uint16_t>(char* data, uint16_t i) +{ + uint16_t b = i; + size_t position(0); + data[position++] = (uint8_t) (0xFF & (b >> 8)); + data[position++] = (uint8_t) (0xFF & b); + return position; +} +template <> size_t encode<uint32_t>(char* data, uint32_t i) +{ + uint32_t b = i; + size_t position(0); + data[position++] = (uint8_t) (0xFF & (b >> 24)); + data[position++] = (uint8_t) (0xFF & (b >> 16)); + data[position++] = (uint8_t) (0xFF & (b >> 8)); + data[position++] = (uint8_t) (0xFF & b); + return position; +} +template <> size_t encode<uint64_t>(char* data, uint64_t i) +{ + uint32_t hi = i >> 32; + uint32_t lo = i; + size_t r(0); + r += encode(data, hi); + r += encode(data + r, lo); + return r; +} +template<typename T> struct Backfill +{ + T size; + T count; + char* location; +}; + +template<typename T> void end(T count, void* token, char* current) +{ + Backfill<T> b; + b.location = (char*) token; + b.size = (T) (current - b.location) - sizeof(b.size); + b.count = count; + b.location += encode<T>(b.location, b.size); + encode<T>(b.location, b.count); +} +} +char* Encoder::skip(size_t n) +{ + char* current = data + position; + check(n); + position += n; + return current; +} + +void Encoder::write(bool b) +{ + check(sizeof(b)); + position += encode<uint8_t>(data+position, b ? 1u : 0u); +} +void Encoder::write(uint8_t i) +{ + check(sizeof(i)); + position += encode<uint8_t>(data+position, i); +} +void Encoder::write(uint16_t i) +{ + check(sizeof(i)); + position += encode<uint16_t>(data+position, i); +} +void Encoder::write(uint32_t i) +{ + check(sizeof(i)); + position += encode<uint32_t>(data+position, i); +} +void Encoder::write(uint64_t i) +{ + check(sizeof(i)); + position += encode<uint64_t>(data+position, i); +} +void Encoder::write(int8_t i) +{ + check(sizeof(i)); + position += encode(data+position, (uint8_t) i); +} +void Encoder::write(int16_t i) +{ + check(sizeof(i)); + position += encode(data+position, (uint16_t) i); +} +void Encoder::write(int32_t i) +{ + check(sizeof(i)); + position += encode(data+position, (uint32_t) i); +} +void Encoder::write(int64_t i) +{ + check(sizeof(i)); + position += encode(data+position, (uint64_t) i); +} +void Encoder::write(float f) +{ + check(sizeof(f)); + union { + uint32_t i; + float f; + } val; + + val.f = f; + write(val.i); +} +void Encoder::write(double d) +{ + check(sizeof(d)); + union { + uint64_t i; + double d; + } val; + + val.d = d; + write(val.i); +} +void Encoder::write(const qpid::types::Uuid& uuid) +{ + writeBytes((const char*) uuid.data(), uuid.size()); +} + +void Encoder::writeBytes(const char* bytes, size_t count) +{ + check(count); + ::memcpy(data + position, bytes, count); + position += count; +} + +void Encoder::writeCode(uint8_t code) +{ + write(code); +} + +void Encoder::writeNull(const Descriptor* d) +{ + if (d) writeDescriptor(*d); + writeCode(typecodes::NULL_VALUE); +} +void Encoder::writeBoolean(bool b, const Descriptor* d) +{ + if (d) writeDescriptor(*d); + writeCode(b ? typecodes::BOOLEAN_TRUE : typecodes::BOOLEAN_FALSE); +} +void Encoder::writeUByte(uint8_t i, const Descriptor* d) +{ + write(i, typecodes::UBYTE, d); +} + +void Encoder::writeUShort(uint16_t i, const Descriptor* d) +{ + write(i, typecodes::USHORT, d); +} + +void Encoder::writeUInt(uint32_t i, const Descriptor* d) +{ + if (i == 0) { + if (d) writeDescriptor(*d); + writeCode(typecodes::UINT_ZERO); + } else { + if (i < 256) { + write((uint8_t) i, typecodes::UINT_SMALL, d); + } else { + write(i, typecodes::UINT, d); + } + } +} + +void Encoder::writeULong(uint64_t i, const Descriptor* d) +{ + if (i == 0) { + if (d) writeDescriptor(*d); + writeCode(typecodes::ULONG_ZERO); + } else { + if (i < 256) { + write((uint8_t) i, typecodes::ULONG_SMALL, d); + } else { + write(i, typecodes::ULONG, d); + } + } +} + +void Encoder::writeByte(int8_t i, const Descriptor* d) +{ + write((uint8_t) i, typecodes::LONG, d); +} + +void Encoder::writeShort(int16_t i, const Descriptor* d) +{ + write((uint16_t) i, typecodes::SHORT, d); +} + +void Encoder::writeInt(int32_t i, const Descriptor* d) +{ + write((uint32_t) i, typecodes::INT, d); +} + +void Encoder::writeLong(int64_t i, const Descriptor* d) +{ + write((uint64_t) i, typecodes::LONG, d); +} + +void Encoder::writeFloat(float f, const Descriptor* d) +{ + write(f, typecodes::FLOAT, d); +} + +void Encoder::writeDouble(double f, const Descriptor* d) +{ + write(f, typecodes::DOUBLE, d); +} + +void Encoder::writeUuid(const qpid::types::Uuid& uuid, const Descriptor* d) +{ + write(uuid, typecodes::UUID, d); +} + +void Encoder::write(const CharSequence& v, std::pair<uint8_t, uint8_t> codes, const Descriptor* d) +{ + if (d) writeDescriptor(*d); + if (v.size < 256) { + writeCode(codes.first); + write((uint8_t) v.size); + } else { + writeCode(codes.second); + write((uint32_t) v.size); + } + writeBytes(v.data, v.size); +} + +void Encoder::write(const std::string& v, std::pair<uint8_t, uint8_t> codes, const Descriptor* d) +{ + if (d) writeDescriptor(*d); + if (v.size() < 256) { + writeCode(codes.first); + write((uint8_t) v.size()); + } else { + writeCode(codes.second); + write((uint32_t) v.size()); + } + writeBytes(v.data(), v.size()); +} + +void Encoder::writeSymbol(const CharSequence& v, const Descriptor* d) +{ + write(v, typecodes::SYMBOL, d); +} + +void Encoder::writeSymbol(const std::string& v, const Descriptor* d) +{ + write(v, typecodes::SYMBOL, d); +} + +void Encoder::writeString(const CharSequence& v, const Descriptor* d) +{ + write(v, typecodes::STRING, d); +} + +void Encoder::writeString(const std::string& v, const Descriptor* d) +{ + write(v, typecodes::STRING, d); +} + +void Encoder::writeBinary(const CharSequence& v, const Descriptor* d) +{ + write(v, typecodes::BINARY, d); +} + +void Encoder::writeBinary(const std::string& v, const Descriptor* d) +{ + write(v, typecodes::BINARY, d); +} + +void* Encoder::startList8(const Descriptor* d) +{ + return start<uint8_t>(typecodes::LIST8, d); +} + +void* Encoder::startList32(const Descriptor* d) +{ + return start<uint32_t>(typecodes::LIST32, d); +} + +void Encoder::endList8(uint8_t count, void* token) +{ + end<uint8_t>(count, token, data+position); +} + +void Encoder::endList32(uint32_t count, void* token) +{ + end<uint32_t>(count, token, data+position); +} + +void* Encoder::startMap8(const Descriptor* d) +{ + return start<uint8_t>(typecodes::MAP8, d); +} + +void* Encoder::startMap32(const Descriptor* d) +{ + return start<uint32_t>(typecodes::MAP32, d); +} + +void Encoder::endMap8(uint8_t count, void* token) +{ + end<uint8_t>(count, token, data+position); +} + +void Encoder::endMap32(uint32_t count, void* token) +{ + end<uint32_t>(count, token, data+position); +} + + +void* Encoder::startArray8(const Constructor& c, const Descriptor* d) +{ + return startArray<uint8_t>(typecodes::ARRAY8, d, c); +} + +void* Encoder::startArray32(const Constructor& c, const Descriptor* d) +{ + return startArray<uint8_t>(typecodes::ARRAY32, d, c); +} + +void Encoder::endArray8(size_t count, void* token) +{ + end<uint8_t>(count, token, data+position); +} + +void Encoder::endArray32(size_t count, void* token) +{ + end<uint32_t>(count, token, data+position); +} + + +void Encoder::writeDescriptor(const Descriptor& d) +{ + writeCode(typecodes::DESCRIPTOR); + switch (d.type) { + case Descriptor::NUMERIC: + writeULong(d.value.code, 0); + break; + case Descriptor::SYMBOLIC: + writeSymbol(d.value.symbol, 0); + break; + } +} +void Encoder::check(size_t s) +{ + if (position + s > size) { + QPID_LOG(notice, "Buffer overflow for write of size " << s << " to buffer of size " << size << " at position " << position); + assert(false); + throw qpid::Exception("Buffer overflow in encoder!"); + } +} +Encoder::Encoder(char* d, size_t s) : data(d), size(s), position(0) {} +size_t Encoder::getPosition() { return position; } +void Encoder::resetPosition(size_t p) { assert(p <= size); position = p; } + +}} // namespace qpid::amqp diff --git a/cpp/src/qpid/amqp/Encoder.h b/cpp/src/qpid/amqp/Encoder.h new file mode 100644 index 0000000000..e2938a002a --- /dev/null +++ b/cpp/src/qpid/amqp/Encoder.h @@ -0,0 +1,149 @@ +#ifndef QPID_AMQP_ENCODER_H +#define QPID_AMQP_ENCODER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/IntegerTypes.h" +#include "qpid/amqp/Constructor.h" +#include <stddef.h> +#include <string> + +namespace qpid { +namespace types { +class Uuid; +} +namespace amqp { +struct CharSequence; +struct Descriptor; + +/** + * Class to help create AMQP encoded data. + */ +class Encoder +{ + public: + void writeCode(uint8_t); + + void write(bool); + void write(uint8_t); + void write(uint16_t); + void write(uint32_t); + void write(uint64_t); + void write(int8_t); + void write(int16_t); + void write(int32_t); + void write(int64_t); + void write(float); + void write(double); + void write(const qpid::types::Uuid&); + + void writeNull(const Descriptor* d=0); + void writeBoolean(bool, const Descriptor* d=0); + void writeUByte(uint8_t, const Descriptor* d=0); + void writeUShort(uint16_t, const Descriptor* d=0); + void writeUInt(uint32_t, const Descriptor* d=0); + void writeULong(uint64_t, const Descriptor* d=0); + void writeByte(int8_t, const Descriptor* d=0); + void writeShort(int16_t, const Descriptor* d=0); + void writeInt(int32_t, const Descriptor* d=0); + void writeLong(int64_t, const Descriptor* d=0); + void writeFloat(float, const Descriptor* d=0); + void writeDouble(double, const Descriptor* d=0); + void writeUuid(const qpid::types::Uuid&, const Descriptor* d=0); + + void writeSymbol(const CharSequence&, const Descriptor* d=0); + void writeSymbol(const std::string&, const Descriptor* d=0); + void writeString(const CharSequence&, const Descriptor* d=0); + void writeString(const std::string&, const Descriptor* d=0); + void writeBinary(const CharSequence&, const Descriptor* d=0); + void writeBinary(const std::string&, const Descriptor* d=0); + + void* startList8(const Descriptor* d=0); + void* startList32(const Descriptor* d=0); + void endList8(uint8_t count, void*); + void endList32(uint32_t count, void*); + + void* startMap8(const Descriptor* d=0); + void* startMap32(const Descriptor* d=0); + void endMap8(uint8_t count, void*); + void endMap32(uint32_t count, void*); + + void* startArray8(const Constructor&, const Descriptor* d=0); + void* startArray32(const Constructor&, const Descriptor* d=0); + void endArray8(size_t count, void*); + void endArray32(size_t count, void*); + + void writeDescriptor(const Descriptor&); + Encoder(char* data, size_t size); + size_t getPosition(); + void resetPosition(size_t p); + char* skip(size_t); + void writeBytes(const char* bytes, size_t count); + virtual ~Encoder() {} + private: + char* data; + size_t size; + size_t position; + + void write(const CharSequence& v, std::pair<uint8_t, uint8_t> codes, const Descriptor* d); + void write(const std::string& v, std::pair<uint8_t, uint8_t> codes, const Descriptor* d); + void check(size_t); + + template<typename T> void write(T value, uint8_t code, const Descriptor* d) + { + if (d) writeDescriptor(*d); + writeCode(code); + write(value); + } + + template<typename T> void write(T value, std::pair<uint8_t, uint8_t> codes, const Descriptor* d) + { + if (value < 256) { + write((uint8_t) value, codes.first, d); + } else { + write(value, codes.second, d); + } + } + + template<typename T> void* start(uint8_t code, const Descriptor* d) + { + if (d) writeDescriptor(*d); + writeCode(code); + //skip size and count, will backfill on end + return skip(sizeof(T)/*size*/ + sizeof(T)/*count*/); + } + + template<typename T> void* startArray(uint8_t code, const Descriptor* d, const Constructor& c) + { + void* token = start<T>(code, d); + if (c.isDescribed) { + writeDescriptor(c.descriptor); + } + check(1); + writeCode(c.code); + return token; + } + +}; + +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_ENCODER_H*/ diff --git a/cpp/src/qpid/amqp/ListReader.h b/cpp/src/qpid/amqp/ListReader.h new file mode 100644 index 0000000000..dce874bf2f --- /dev/null +++ b/cpp/src/qpid/amqp/ListReader.h @@ -0,0 +1,103 @@ +#ifndef QPID_AMQP_LISTREADER_H +#define QPID_AMQP_LISTREADER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Reader.h" + +namespace qpid { +namespace amqp { + +/** + * Utility to assist in reading AMQP encoded lists + */ +class ListReader : public Reader +{ + public: + ListReader() : index(0), level(0) {} + virtual ~ListReader() {} + virtual void onNull(const Descriptor* descriptor) { getReader().onNull(descriptor); } + virtual void onBoolean(bool v, const Descriptor* descriptor) { getReader().onBoolean(v, descriptor); } + virtual void onUByte(uint8_t v, const Descriptor* descriptor) { getReader().onUByte(v, descriptor); } + virtual void onUShort(uint16_t v, const Descriptor* descriptor) { getReader().onUShort(v, descriptor); } + virtual void onUInt(uint32_t v, const Descriptor* descriptor) { getReader().onUInt(v, descriptor); } + virtual void onULong(uint64_t v, const Descriptor* descriptor) { getReader().onULong(v, descriptor); } + virtual void onByte(int8_t v, const Descriptor* descriptor) { getReader().onByte(v, descriptor); } + virtual void onShort(int16_t v, const Descriptor* descriptor) { getReader().onShort(v, descriptor); } + virtual void onInt(int32_t v, const Descriptor* descriptor) { getReader().onInt(v, descriptor); } + virtual void onLong(int64_t v, const Descriptor* descriptor) { getReader().onLong(v, descriptor); } + virtual void onFloat(float v, const Descriptor* descriptor) { getReader().onFloat(v, descriptor); } + virtual void onDouble(double v, const Descriptor* descriptor) { getReader().onDouble(v, descriptor); } + virtual void onUuid(const CharSequence& v, const Descriptor* descriptor) { getReader().onUuid(v, descriptor); } + virtual void onTimestamp(int64_t v, const Descriptor* descriptor) { getReader().onTimestamp(v, descriptor); } + + virtual void onBinary(const CharSequence& v, const Descriptor* descriptor) { getReader().onBinary(v, descriptor); } + virtual void onString(const CharSequence& v, const Descriptor* descriptor) { getReader().onString(v, descriptor); } + virtual void onSymbol(const CharSequence& v, const Descriptor* descriptor) { getReader().onSymbol(v, descriptor); } + + virtual bool onStartList(uint32_t count, const CharSequence& v, const Descriptor* descriptor) + { + ++level; + getReader().onStartList(count, v, descriptor); + return false; + } + virtual void onEndList(uint32_t count, const Descriptor* descriptor) + { + --level; + getReader().onEndList(count, descriptor); + } + virtual bool onStartMap(uint32_t count, const CharSequence& v, const Descriptor* descriptor) + { + ++level; + getReader().onStartMap(count, v, descriptor); + return false; + } + virtual void onEndMap(uint32_t count, const Descriptor* descriptor) + { + --level; + getReader().onEndList(count, descriptor); + } + virtual bool onStartArray(uint32_t count, const CharSequence& v, const Constructor& c, const Descriptor* descriptor) + { + ++level; + getReader().onStartArray(count, v, c, descriptor); + return false; + } + virtual void onEndArray(uint32_t count, const Descriptor* descriptor) + { + --level; + getReader().onEndList(count, descriptor); + } + private: + size_t index; + size_t level; + Reader& getReader() + { + Reader& r = getReader(index); + if (level == 0) ++index; + return r; + } + protected: + virtual Reader& getReader(size_t i) = 0; +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_LISTREADER_H*/ diff --git a/cpp/src/qpid/amqp/LoggingReader.h b/cpp/src/qpid/amqp/LoggingReader.h new file mode 100644 index 0000000000..ed5cab1cbd --- /dev/null +++ b/cpp/src/qpid/amqp/LoggingReader.h @@ -0,0 +1,64 @@ +#ifndef QPID_AMQP_LOGGINGREADER_H +#define QPID_AMQP_LOGGINGREADER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Reader.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace amqp { + +class LoggingReader : public Reader +{ + public: + virtual ~LoggingReader() {} + virtual void onNull(const Descriptor*) { if (!ignoreNull()) QPID_LOG(warning, prefix() << "null" << suffix()); } + virtual void onBoolean(bool, const Descriptor*) { QPID_LOG(warning, prefix() << "boolean" << suffix()); } + virtual void onUByte(uint8_t, const Descriptor*) { QPID_LOG(warning, prefix() << "ubyte" << suffix()); } + virtual void onUShort(uint16_t, const Descriptor*) { QPID_LOG(warning, prefix() << "ushort" << suffix()); } + virtual void onUInt(uint32_t, const Descriptor*) { QPID_LOG(warning, prefix() << "uint" << suffix()); } + virtual void onULong(uint64_t, const Descriptor*) { QPID_LOG(warning, prefix() << "ulong" << suffix()); } + virtual void onByte(int8_t, const Descriptor*) { QPID_LOG(warning, prefix() << "byte" << suffix()); } + virtual void onShort(int16_t, const Descriptor*) { QPID_LOG(warning, prefix() << "short" << suffix()); } + virtual void onInt(int32_t, const Descriptor*) { QPID_LOG(warning, prefix() << "int" << suffix()); } + virtual void onLong(int64_t, const Descriptor*) { QPID_LOG(warning, prefix() << "long" << suffix()); } + virtual void onFloat(float, const Descriptor*) { QPID_LOG(warning, prefix() << "float" << suffix()); } + virtual void onDouble(double, const Descriptor*) { QPID_LOG(warning, prefix() << "double" << suffix()); } + virtual void onUuid(const CharSequence&, const Descriptor*) { QPID_LOG(warning, prefix() << "uuid" << suffix()); } + virtual void onTimestamp(int64_t, const Descriptor*) { QPID_LOG(warning, prefix() << "timestamp" << suffix()); } + + virtual void onBinary(const CharSequence&, const Descriptor*) { QPID_LOG(warning, prefix() << "binary" << suffix()); } + virtual void onString(const CharSequence&, const Descriptor*) { QPID_LOG(warning, prefix() << "string" << suffix()); } + virtual void onSymbol(const CharSequence&, const Descriptor*) { QPID_LOG(warning, prefix() << "symbol" << suffix()); } + + virtual bool onStartList(uint32_t, const CharSequence&, const Descriptor*) { QPID_LOG(warning, prefix() << "list" << suffix()); return recursive; } + virtual bool onStartMap(uint32_t, const CharSequence&, const Descriptor*) { QPID_LOG(warning, prefix() << "map" << suffix()); return recursive; } + virtual bool onStartArray(uint32_t, const CharSequence&, const Constructor&, const Descriptor*) { QPID_LOG(warning, prefix() << "array" << suffix()); return recursive; } + protected: + virtual bool recursive() { return true; } + virtual bool ignoreNull() { return true; } + virtual std::string prefix() = 0; + virtual std::string suffix() = 0; +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_LOGGINGREADER_H*/ diff --git a/cpp/src/qpid/amqp/MapReader.cpp b/cpp/src/qpid/amqp/MapReader.cpp new file mode 100644 index 0000000000..2bace74d34 --- /dev/null +++ b/cpp/src/qpid/amqp/MapReader.cpp @@ -0,0 +1,289 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/MapReader.h" +#include "qpid/Exception.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace amqp { + +void MapReader::onNull(const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onNullValue(key, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} +void MapReader::onBoolean(bool v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onBooleanValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onUByte(uint8_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onUByteValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onUShort(uint16_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onUShortValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onUInt(uint32_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onUIntValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onULong(uint64_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onULongValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onByte(int8_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onByteValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onShort(int16_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onShortValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onInt(int32_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onIntValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onLong(int64_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onLongValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onFloat(float v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onFloatValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onDouble(double v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onDoubleValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onUuid(const CharSequence& v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onUuidValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onTimestamp(int64_t v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onTimestampValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onBinary(const CharSequence& v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onBinaryValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onString(const CharSequence& v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onStringValue(key, v, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key, got string " << v.str())); + } +} + +void MapReader::onSymbol(const CharSequence& v, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onSymbolValue(key, v, d); + key.data = 0; key.size = 0; + } else { + key = v; + } +} + +bool MapReader::onStartList(uint32_t count, const CharSequence&, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + bool step = onStartListValue(key, count, d); + key.data = 0; key.size = 0; + return step; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } + return true; +} + +bool MapReader::onStartMap(uint32_t count, const CharSequence&, const Descriptor* d) +{ + if (level++) { + if (key) { + bool step = onStartMapValue(key, count, d); + key.data = 0; key.size = 0; + return step; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } + } + return true; +} + +bool MapReader::onStartArray(uint32_t count, const CharSequence&, const Constructor& c, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + bool step = onStartArrayValue(key, count, c, d); + key.data = 0; key.size = 0; + return step; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } + return true; +} + +void MapReader::onEndList(uint32_t count, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onEndListValue(key, count, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +void MapReader::onEndMap(uint32_t count, const Descriptor* d) +{ + if (--level) { + onEndMapValue(key, count, d); + key.data = 0; key.size = 0; + } +} + +void MapReader::onEndArray(uint32_t count, const Descriptor* d) +{ + if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); + if (key) { + onEndArrayValue(key, count, d); + key.data = 0; key.size = 0; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key")); + } +} + +MapReader::MapReader() : level(0) +{ + key.data = 0; key.size = 0; +} + +}} // namespace qpid::amqp diff --git a/cpp/src/qpid/amqp/MapReader.h b/cpp/src/qpid/amqp/MapReader.h new file mode 100644 index 0000000000..fe8f65b30c --- /dev/null +++ b/cpp/src/qpid/amqp/MapReader.h @@ -0,0 +1,104 @@ +#ifndef QPID_AMQP_MAPREADER_H +#define QPID_AMQP_MAPREADER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Reader.h" +#include "CharSequence.h" +#include <string> + +namespace qpid { +namespace amqp { + +/** + * Reading AMQP 1.0 encoded data which is constrained to be a symbol + * keyeed map. The keys are assumed never to be described, the values + * may be. + */ +class MapReader : public Reader +{ + public: + virtual void onNullValue(const CharSequence& /*key*/, const Descriptor*) {} + virtual void onBooleanValue(const CharSequence& /*key*/, bool, const Descriptor*) {} + virtual void onUByteValue(const CharSequence& /*key*/, uint8_t, const Descriptor*) {} + virtual void onUShortValue(const CharSequence& /*key*/, uint16_t, const Descriptor*) {} + virtual void onUIntValue(const CharSequence& /*key*/, uint32_t, const Descriptor*) {} + virtual void onULongValue(const CharSequence& /*key*/, uint64_t, const Descriptor*) {} + virtual void onByteValue(const CharSequence& /*key*/, int8_t, const Descriptor*) {} + virtual void onShortValue(const CharSequence& /*key*/, int16_t, const Descriptor*) {} + virtual void onIntValue(const CharSequence& /*key*/, int32_t, const Descriptor*) {} + virtual void onLongValue(const CharSequence& /*key*/, int64_t, const Descriptor*) {} + virtual void onFloatValue(const CharSequence& /*key*/, float, const Descriptor*) {} + virtual void onDoubleValue(const CharSequence& /*key*/, double, const Descriptor*) {} + virtual void onUuidValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*) {} + virtual void onTimestampValue(const CharSequence& /*key*/, int64_t, const Descriptor*) {} + + virtual void onBinaryValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*) {} + virtual void onStringValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*) {} + virtual void onSymbolValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*) {} + + /** + * @return true to step into elements of the compound value, false + * to skip over it + */ + virtual bool onStartListValue(const CharSequence& /*key*/, uint32_t /*count*/, const Descriptor*) { return true; } + virtual bool onStartMapValue(const CharSequence& /*key*/, uint32_t /*count*/, const Descriptor*) { return true; } + virtual bool onStartArrayValue(const CharSequence& /*key*/, uint32_t /*count*/, const Constructor&, const Descriptor*) { return true; } + virtual void onEndListValue(const CharSequence& /*key*/, uint32_t /*count*/, const Descriptor*) {} + virtual void onEndMapValue(const CharSequence& /*key*/, uint32_t /*count*/, const Descriptor*) {} + virtual void onEndArrayValue(const CharSequence& /*key*/, uint32_t /*count*/, const Descriptor*) {} + + + //this class implements the Reader interface, thus acting as a transformer into a more map oriented scheme + void onNull(const Descriptor*); + void onBoolean(bool, const Descriptor*); + void onUByte(uint8_t, const Descriptor*); + void onUShort(uint16_t, const Descriptor*); + void onUInt(uint32_t, const Descriptor*); + void onULong(uint64_t, const Descriptor*); + void onByte(int8_t, const Descriptor*); + void onShort(int16_t, const Descriptor*); + void onInt(int32_t, const Descriptor*); + void onLong(int64_t, const Descriptor*); + void onFloat(float, const Descriptor*); + void onDouble(double, const Descriptor*); + void onUuid(const CharSequence&, const Descriptor*); + void onTimestamp(int64_t, const Descriptor*); + + void onBinary(const CharSequence&, const Descriptor*); + void onString(const CharSequence&, const Descriptor*); + void onSymbol(const CharSequence&, const Descriptor*); + + bool onStartList(uint32_t /*count*/, const CharSequence&, const Descriptor*); + bool onStartMap(uint32_t /*count*/, const CharSequence&, const Descriptor*); + bool onStartArray(uint32_t /*count*/, const CharSequence&, const Constructor&, const Descriptor*); + void onEndList(uint32_t /*count*/, const Descriptor*); + void onEndMap(uint32_t /*count*/, const Descriptor*); + void onEndArray(uint32_t /*count*/, const Descriptor*); + + MapReader(); + private: + CharSequence key; + size_t level; +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_MAPREADER_H*/ diff --git a/cpp/src/qpid/amqp/MessageEncoder.cpp b/cpp/src/qpid/amqp/MessageEncoder.cpp new file mode 100644 index 0000000000..852ad29635 --- /dev/null +++ b/cpp/src/qpid/amqp/MessageEncoder.cpp @@ -0,0 +1,313 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/MessageEncoder.h" +#include "qpid/amqp/descriptors.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace amqp { + +namespace { +size_t optimisable(const MessageEncoder::Header& msg) +{ + if (msg.getDeliveryCount()) return 5; + else if (msg.isFirstAcquirer()) return 4; + else if (msg.hasTtl()) return 3; + else if (msg.getPriority() != 4) return 2; + else if (msg.isDurable()) return 1; + else return 0; +} + +size_t optimisable(const MessageEncoder::Properties& msg) +{ + if (msg.hasReplyToGroupId()) return 13; + else if (msg.hasGroupSequence()) return 12; + else if (msg.hasGroupId()) return 11; + else if (msg.hasCreationTime()) return 10; + else if (msg.hasAbsoluteExpiryTime()) return 9; + else if (msg.hasContentEncoding()) return 8; + else if (msg.hasContentType()) return 7; + else if (msg.hasCorrelationId()) return 6; + else if (msg.hasReplyTo()) return 5; + else if (msg.hasSubject()) return 4; + else if (msg.hasTo()) return 3; + else if (msg.hasUserId()) return 2; + else if (msg.hasMessageId()) return 1; + else return 0; +} +size_t encodedSize(const std::string& s) +{ + size_t total = s.size(); + if (total > 255) total += 4; + else total += 1; + return total; +} +const std::string BINARY("binary"); +} + +void MessageEncoder::writeHeader(const Header& msg) +{ + size_t fields(optimise ? optimisable(msg) : 5); + if (fields) { + void* token = startList8(&qpid::amqp::message::HEADER); + writeBoolean(msg.isDurable()); + if (fields > 1) writeUByte(msg.getPriority()); + + if (msg.getTtl()) writeUInt(msg.getTtl()); + else if (fields > 2) writeNull(); + + if (msg.isFirstAcquirer()) writeBoolean(true); + else if (fields > 3) writeNull(); + + if (msg.getDeliveryCount()) writeUInt(msg.getDeliveryCount()); + else if (fields > 4) writeNull(); + endList8(fields, token); + } +} + + +void MessageEncoder::writeProperties(const Properties& msg) +{ + size_t fields(optimise ? optimisable(msg) : 13); + if (fields) { + void* token = startList32(&qpid::amqp::message::PROPERTIES); + if (msg.hasMessageId()) writeString(msg.getMessageId()); + else writeNull(); + + if (msg.hasUserId()) writeBinary(msg.getUserId()); + else if (fields > 1) writeNull(); + + if (msg.hasTo()) writeString(msg.getTo()); + else if (fields > 2) writeNull(); + + if (msg.hasSubject()) writeString(msg.getSubject()); + else if (fields > 3) writeNull(); + + if (msg.hasReplyTo()) writeString(msg.getReplyTo()); + else if (fields > 4) writeNull(); + + if (msg.hasCorrelationId()) writeString(msg.getCorrelationId()); + else if (fields > 5) writeNull(); + + if (msg.hasContentType()) writeSymbol(msg.getContentType()); + else if (fields > 6) writeNull(); + + if (msg.hasContentEncoding()) writeSymbol(msg.getContentEncoding()); + else if (fields > 7) writeNull(); + + if (msg.hasAbsoluteExpiryTime()) writeLong(msg.getAbsoluteExpiryTime()); + else if (fields > 8) writeNull(); + + if (msg.hasCreationTime()) writeLong(msg.getCreationTime()); + else if (fields > 9) writeNull(); + + if (msg.hasGroupId()) writeString(msg.getGroupId()); + else if (fields > 10) writeNull(); + + if (msg.hasGroupSequence()) writeUInt(msg.getGroupSequence()); + else if (fields > 11) writeNull(); + + if (msg.hasReplyToGroupId()) writeString(msg.getReplyToGroupId()); + else if (fields > 12) writeNull(); + + endList32(fields, token); + } +} + +void MessageEncoder::writeApplicationProperties(const qpid::types::Variant::Map& properties) +{ + writeApplicationProperties(properties, !optimise || properties.size()*2 > 255 || getEncodedSizeForElements(properties) > 255); +} + +void MessageEncoder::writeApplicationProperties(const qpid::types::Variant::Map& properties, bool large) +{ + writeMap(properties, &qpid::amqp::message::APPLICATION_PROPERTIES, large); +} + +void MessageEncoder::writeMap(const qpid::types::Variant::Map& properties, const Descriptor* d, bool large) +{ + void* token = large ? startMap32(d) : startMap8(d); + for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + writeString(i->first); + switch (i->second.getType()) { + case qpid::types::VAR_MAP: + case qpid::types::VAR_LIST: + //not allowed (TODO: revise, only strictly true for application-properties) whereas this is now a more general method) + QPID_LOG(warning, "Ignoring nested map/list; not allowed in application-properties for AMQP 1.0"); + case qpid::types::VAR_VOID: + writeNull(); + break; + case qpid::types::VAR_BOOL: + writeBoolean(i->second); + break; + case qpid::types::VAR_UINT8: + writeUByte(i->second); + break; + case qpid::types::VAR_UINT16: + writeUShort(i->second); + break; + case qpid::types::VAR_UINT32: + writeUInt(i->second); + break; + case qpid::types::VAR_UINT64: + writeULong(i->second); + break; + case qpid::types::VAR_INT8: + writeByte(i->second); + break; + case qpid::types::VAR_INT16: + writeShort(i->second); + break; + case qpid::types::VAR_INT32: + writeInt(i->second); + break; + case qpid::types::VAR_INT64: + writeULong(i->second); + break; + case qpid::types::VAR_FLOAT: + writeFloat(i->second); + break; + case qpid::types::VAR_DOUBLE: + writeDouble(i->second); + break; + case qpid::types::VAR_STRING: + if (i->second.getEncoding() == BINARY) { + writeBinary(i->second); + } else { + writeString(i->second); + } + break; + case qpid::types::VAR_UUID: + writeUuid(i->second); + break; + } + } + if (large) endMap32(properties.size()*2, token); + else endMap8(properties.size()*2, token); +} + +size_t MessageEncoder::getEncodedSize(const Header& h, const Properties& p, const qpid::types::Variant::Map& ap, const std::string& d) +{ + //NOTE: this does not take optional optimisation into account, + //i.e. it is a 'worst case' estimate for required buffer space + size_t total(0); + + //header: + total += 3/*descriptor*/ + 1/*code*/ + 1/*size*/ + 1/*count*/ + 5/*codes for each field*/; + if (h.getPriority() != 4) total += 1; + if (h.getDeliveryCount()) total += 4; + if (h.hasTtl()) total += 4; + return total + getEncodedSize(p, ap, d); +} + +size_t MessageEncoder::getEncodedSize(const Properties& p, const qpid::types::Variant::Map& ap, const std::string& d) +{ + //NOTE: this does not take optional optimisation into account, + //i.e. it is a 'worst case' estimate for required buffer space + size_t total(0); + + //properties: + total += 3/*descriptor*/ + 1/*code*/ + 4/*size*/ + 4/*count*/ + 13/*codes for each field*/; + if (p.hasMessageId()) total += encodedSize(p.getMessageId()); + if (p.hasUserId()) total += encodedSize(p.getUserId()); + if (p.hasTo()) total += encodedSize(p.getTo()); + if (p.hasSubject()) total += encodedSize(p.getSubject()); + if (p.hasReplyTo()) total += encodedSize(p.getReplyTo()); + if (p.hasCorrelationId()) total += encodedSize(p.getCorrelationId()); + if (p.hasContentType()) total += encodedSize(p.getContentType()); + if (p.hasContentEncoding()) total += encodedSize(p.getContentEncoding()); + if (p.hasAbsoluteExpiryTime()) total += 8; + if (p.hasCreationTime()) total += 8; + if (p.hasGroupId()) total += encodedSize(p.getGroupId()); + if (p.hasGroupSequence()) total += 4; + if (p.hasReplyToGroupId()) total += encodedSize(p.getReplyToGroupId()); + + + //application-properties: + total += 3/*descriptor*/ + getEncodedSize(ap, true); + //body: + if (d.size()) total += 3/*descriptor*/ + 1/*code*/ + encodedSize(d); + + return total; +} + +size_t MessageEncoder::getEncodedSizeForElements(const qpid::types::Variant::Map& map) +{ + size_t total = 0; + for (qpid::types::Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) { + total += 1/*code*/ + encodedSize(i->first); + + switch (i->second.getType()) { + case qpid::types::VAR_MAP: + case qpid::types::VAR_LIST: + case qpid::types::VAR_VOID: + case qpid::types::VAR_BOOL: + total += 1; + break; + + case qpid::types::VAR_UINT8: + case qpid::types::VAR_INT8: + total += 2; + break; + + case qpid::types::VAR_UINT16: + case qpid::types::VAR_INT16: + total += 3; + break; + + case qpid::types::VAR_UINT32: + case qpid::types::VAR_INT32: + case qpid::types::VAR_FLOAT: + total += 5; + break; + + case qpid::types::VAR_UINT64: + case qpid::types::VAR_INT64: + case qpid::types::VAR_DOUBLE: + total += 9; + break; + + case qpid::types::VAR_UUID: + total += 17; + break; + + case qpid::types::VAR_STRING: + total += 1/*code*/ + encodedSize(i->second); + break; + } + } + return total; +} + + +size_t MessageEncoder::getEncodedSize(const qpid::types::Variant::Map& map, bool alwaysUseLargeMap) +{ + size_t total = getEncodedSizeForElements(map); + + //its not just the count that determines whether we can use a small map, but the aggregate size: + if (alwaysUseLargeMap || map.size()*2 > 255 || total > 255) total += 4/*size*/ + 4/*count*/; + else total += 1/*size*/ + 1/*count*/; + + total += 1 /*code for map itself*/; + + return total; +} +}} // namespace qpid::amqp diff --git a/cpp/src/qpid/amqp/MessageEncoder.h b/cpp/src/qpid/amqp/MessageEncoder.h new file mode 100644 index 0000000000..3db0763e8a --- /dev/null +++ b/cpp/src/qpid/amqp/MessageEncoder.h @@ -0,0 +1,100 @@ +#ifndef QPID_AMQP_MESSAGEENCODER_H +#define QPID_AMQP_MESSAGEENCODER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/Encoder.h" +#include "qpid/types/Variant.h" + +namespace qpid { +namespace amqp { + +/** + * + */ +class MessageEncoder : public Encoder +{ + public: + class Header + { + public: + virtual ~Header() {} + virtual bool isDurable() const = 0; + virtual uint8_t getPriority() const = 0; + virtual bool hasTtl() const = 0; + virtual uint32_t getTtl() const = 0; + virtual bool isFirstAcquirer() const = 0; + virtual uint32_t getDeliveryCount() const = 0; + }; + + class Properties + { + public: + virtual ~Properties() {} + virtual bool hasMessageId() const = 0; + virtual std::string getMessageId() const = 0; + virtual bool hasUserId() const = 0; + virtual std::string getUserId() const = 0; + virtual bool hasTo() const = 0; + virtual std::string getTo() const = 0; + virtual bool hasSubject() const = 0; + virtual std::string getSubject() const = 0; + virtual bool hasReplyTo() const = 0; + virtual std::string getReplyTo() const = 0; + virtual bool hasCorrelationId() const = 0; + virtual std::string getCorrelationId() const = 0; + virtual bool hasContentType() const = 0; + virtual std::string getContentType() const = 0; + virtual bool hasContentEncoding() const = 0; + virtual std::string getContentEncoding() const = 0; + virtual bool hasAbsoluteExpiryTime() const = 0; + virtual int64_t getAbsoluteExpiryTime() const = 0; + virtual bool hasCreationTime() const = 0; + virtual int64_t getCreationTime() const = 0; + virtual bool hasGroupId() const = 0; + virtual std::string getGroupId() const = 0; + virtual bool hasGroupSequence() const = 0; + virtual uint32_t getGroupSequence() const = 0; + virtual bool hasReplyToGroupId() const = 0; + virtual std::string getReplyToGroupId() const = 0; + }; + + MessageEncoder(char* d, size_t s, bool o=false) : Encoder(d, s), optimise(o) {} + void writeHeader(const Header&); + void writeProperties(const Properties&); + void writeApplicationProperties(const qpid::types::Variant::Map& properties); + void writeApplicationProperties(const qpid::types::Variant::Map& properties, bool useLargeMap); + + void writeMap(const qpid::types::Variant::Map& map, const Descriptor*, bool useLargeMap); + + static size_t getEncodedSize(const Header&, const Properties&, const qpid::types::Variant::Map&, const std::string&); + static size_t getEncodedSize(const Properties&, const qpid::types::Variant::Map&, const std::string&); + static size_t getEncodedSize(const qpid::types::Variant::Map&, bool useLargeMap); + static size_t getEncodedSize(const qpid::types::Variant::Map&); + + private: + bool optimise; + + static size_t getEncodedSizeForElements(const qpid::types::Variant::Map&); +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_MESSAGEENCODER_H*/ diff --git a/cpp/src/qpid/amqp/MessageId.cpp b/cpp/src/qpid/amqp/MessageId.cpp new file mode 100644 index 0000000000..e6f6f4a231 --- /dev/null +++ b/cpp/src/qpid/amqp/MessageId.cpp @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/MessageId.h" +#include <assert.h> +#include <boost/lexical_cast.hpp> + +namespace qpid { +namespace amqp { + +MessageId::MessageId() : type(BYTES) +{ + value.bytes.data = 0; + value.bytes.size = 0; +} +void MessageId::assign(std::string& s) const +{ + switch (type) { + case BYTES: + if (value.bytes) s.assign(value.bytes.data, value.bytes.size); + break; + case UUID: + s = qpid::types::Uuid(value.bytes).str(); + break; + case ULONG: + s = boost::lexical_cast<std::string>(value.ulong); + break; + } +} + +void MessageId::set(qpid::amqp::CharSequence bytes, qpid::types::VariantType t) +{ + switch (t) { + case qpid::types::VAR_STRING: + type = BYTES; + break; + case qpid::types::VAR_UUID: + type = UUID; + assert(bytes.size == 16); + break; + default: + assert(false); + } + value.bytes = bytes; +} +void MessageId::set(uint64_t ulong) +{ + type = ULONG; + value.ulong = ulong; +} + +}} // namespace qpid::amqp diff --git a/cpp/src/qpid/amqp/MessageId.h b/cpp/src/qpid/amqp/MessageId.h new file mode 100644 index 0000000000..ee440f3011 --- /dev/null +++ b/cpp/src/qpid/amqp/MessageId.h @@ -0,0 +1,54 @@ +#ifndef QPID_AMQP_MESSAGEID_H +#define QPID_AMQP_MESSAGEID_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/CharSequence.h" +#include "qpid/types/Variant.h" +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace amqp { + +struct MessageId +{ + union + { + qpid::amqp::CharSequence bytes; + uint64_t ulong; + } value; + enum + { + BYTES, + UUID, + ULONG + } type; + + QPID_COMMON_EXTERN MessageId(); + QPID_COMMON_EXTERN void assign(std::string&) const; + QPID_COMMON_EXTERN void set(qpid::amqp::CharSequence bytes, qpid::types::VariantType t); + QPID_COMMON_EXTERN void set(uint64_t ulong); + +}; + +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_MESSAGEID_H*/ diff --git a/cpp/src/qpid/amqp/MessageReader.cpp b/cpp/src/qpid/amqp/MessageReader.cpp new file mode 100644 index 0000000000..1550fa1977 --- /dev/null +++ b/cpp/src/qpid/amqp/MessageReader.cpp @@ -0,0 +1,759 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/MessageReader.h" +#include "qpid/amqp/Descriptor.h" +#include "qpid/amqp/descriptors.h" +#include "qpid/types/Uuid.h" +#include "qpid/types/Variant.h" +#include "qpid/log/Statement.h" + +using namespace qpid::amqp::message; + +namespace qpid { +namespace amqp { +namespace { + +//header fields: +const size_t DURABLE(0); +const size_t PRIORITY(1); +const size_t TTL(2); +const size_t FIRST_ACQUIRER(3); +const size_t DELIVERY_COUNT(4); + +//properties fields: +const size_t MESSAGE_ID(0); +const size_t USER_ID(1); +const size_t TO(2); +const size_t SUBJECT(3); +const size_t REPLY_TO(4); +const size_t CORRELATION_ID(5); +const size_t CONTENT_TYPE(6); +const size_t CONTENT_ENCODING(7); +const size_t ABSOLUTE_EXPIRY_TIME(8); +const size_t CREATION_TIME(9); +const size_t GROUP_ID(10); +const size_t GROUP_SEQUENCE(11); +const size_t REPLY_TO_GROUP_ID(12); + +} + +/* +Reader& MessageReader::HeaderReader::getReader(size_t index) +{ + switch (index) { + case DURABLE: return durableReader; + case PRIORITY: return priorityReader; + case TTL: return ttlReader; + case FIRST_ACQUIRER: return firstAcquirerReader; + case DELIVERY_COUNT: return deliveryCountReader; + default: return noSuchFieldReader; + } +} + +Reader& MessageReader::PropertiesReader::getReader(size_t index) +{ + switch (index) { + case MESSAGE_ID: return messageIdReader; + case USER_ID: return userIdReader; + case TO: return toReader; + case SUBJECT: return subjectReader; + case REPLY_TO: return replyToReader; + case CORRELATION_ID: return correlationIdReader; + case CONTENT_TYPE: return contentTypeReader; + case CONTENT_ENCODING: return contentEncodingReader; + case ABSOLUTE_EXPIRY_TIME: return absoluteExpiryTimeReader; + case CREATION_TIME: return creationTimeReader; + case GROUP_ID: return groupIdReader; + case GROUP_SEQUENCE: return groupSequenceReader; + case REPLY_TO_GROUP_ID: return replyToGroupIdReader; + default: return noSuchFieldReader; + } +} +*/ + +MessageReader::HeaderReader::HeaderReader(MessageReader& p) : parent(p), index(0) {} +void MessageReader::HeaderReader::onBoolean(bool v, const Descriptor*) // durable, first-acquirer +{ + if (index == DURABLE) { + parent.onDurable(v); + } else if (index == FIRST_ACQUIRER) { + parent.onFirstAcquirer(v); + } else { + QPID_LOG(warning, "Unexpected message format, got boolean at index " << index << " of headers"); + } + ++index; +} +void MessageReader::HeaderReader::onUByte(uint8_t v, const Descriptor*) // priority +{ + if (index == PRIORITY) { + parent.onPriority(v); + } else { + QPID_LOG(warning, "Unexpected message format, got ubyte at index " << index << " of headers"); + } + ++index; +} +void MessageReader::HeaderReader::onUInt(uint32_t v, const Descriptor*) // ttl, delivery-count +{ + if (index == TTL) { + parent.onTtl(v); + } else if (index == DELIVERY_COUNT) { + parent.onDeliveryCount(v); + } else { + QPID_LOG(warning, "Unexpected message format, got uint at index " << index << " of headers"); + } + ++index; +} +void MessageReader::HeaderReader::onNull(const Descriptor*) +{ + ++index; +} + +MessageReader::PropertiesReader::PropertiesReader(MessageReader& p) : parent(p), index(0) {} +void MessageReader::PropertiesReader::onUuid(const CharSequence& v, const Descriptor*) // message-id, correlation-id +{ + if (index == MESSAGE_ID) { + parent.onMessageId(v, qpid::types::VAR_UUID); + } else if (index == CORRELATION_ID) { + parent.onCorrelationId(v); + } else { + QPID_LOG(warning, "Unexpected message format, got uuid at index " << index << " of properties"); + } + ++index; +} +void MessageReader::PropertiesReader::onULong(uint64_t v, const Descriptor*) // message-id, correlation-id +{ + if (index == MESSAGE_ID) { + parent.onMessageId(v); + } else if (index == CORRELATION_ID) { + parent.onCorrelationId(v); + } else { + QPID_LOG(warning, "Unexpected message format, got long at index " << index << " of properties"); + } + ++index; +} +void MessageReader::PropertiesReader::onBinary(const CharSequence& v, const Descriptor*) // message-id, correlation-id, user-id +{ + if (index == MESSAGE_ID) { + parent.onMessageId(v, qpid::types::VAR_STRING); + } else if (index == CORRELATION_ID) { + parent.onCorrelationId(v); + } else if (index == USER_ID) { + parent.onUserId(v); + } else { + QPID_LOG(warning, "Unexpected message format, got binary at index " << index << " of properties"); + } + ++index; +} +void MessageReader::PropertiesReader::onString(const CharSequence& v, const Descriptor*) // message-id, correlation-id, group-id, reply-to-group-id, subject, to, reply-to +{ + if (index == MESSAGE_ID) { + parent.onMessageId(v); + } else if (index == CORRELATION_ID) { + parent.onCorrelationId(v); + } else if (index == GROUP_ID) { + parent.onGroupId(v); + } else if (index == REPLY_TO_GROUP_ID) { + parent.onReplyToGroupId(v); + } else if (index == SUBJECT) { + parent.onSubject(v); + } else if (index == TO) { + parent.onTo(v); + } else if (index == REPLY_TO) { + parent.onReplyTo(v); + } else { + QPID_LOG(warning, "Unexpected message format, got string at index " << index << " of properties"); + } + ++index; +} +void MessageReader::PropertiesReader::onSymbol(const CharSequence& v, const Descriptor*) // content-type, content-encoding +{ + if (index == CONTENT_TYPE) { + parent.onContentType(v); + } else if (index == CONTENT_ENCODING) { + parent.onContentEncoding(v); + } else { + QPID_LOG(warning, "Unexpected message format, got symbol at index " << index << " of properties"); + } + ++index; +} +void MessageReader::PropertiesReader::onTimestamp(int64_t v, const Descriptor*) // absolute-expiry-time, creation-time +{ + if (index == ABSOLUTE_EXPIRY_TIME) { + parent.onAbsoluteExpiryTime(v); + } else if (index == CREATION_TIME) { + parent.onCreationTime(v); + } else { + QPID_LOG(warning, "Unexpected message format, got timestamp at index " << index << " of properties"); + } + ++index; +} +void MessageReader::PropertiesReader::onUInt(uint32_t v, const Descriptor*) // group-sequence +{ + if (index == GROUP_SEQUENCE) { + parent.onGroupSequence(v); + } else { + QPID_LOG(warning, "Unexpected message format, got uint at index " << index << " of properties"); + } + ++index; +} +void MessageReader::PropertiesReader::onNull(const Descriptor*) +{ + ++index; +} + +/* +MessageReader::DurableReader::DurableReader(MessageReader& p) : parent(p) {} +void MessageReader::DurableReader::onBoolean(bool v, const Descriptor*) +{ + parent.onDurable(v); +} +MessageReader::PriorityReader::PriorityReader(MessageReader& p) : parent(p) {} +void MessageReader::PriorityReader::onUByte(uint8_t v, const Descriptor*) +{ + parent.onPriority(v); +} +MessageReader::TtlReader::TtlReader(MessageReader& p) : parent(p) {} +void MessageReader::TtlReader::onUInt(uint32_t v, const Descriptor*) +{ + parent.onTtl(v); +} +MessageReader::FirstAcquirerReader::FirstAcquirerReader(MessageReader& p) : parent(p) {} +void MessageReader::FirstAcquirerReader::onBoolean(bool v, const Descriptor*) +{ + parent.onFirstAcquirer(v); +} +MessageReader::DeliveryCountReader::DeliveryCountReader(MessageReader& p) : parent(p) {} +void MessageReader::DeliveryCountReader::onUInt(uint32_t v, const Descriptor*) +{ + parent.onDeliveryCount(v); +} +MessageReader::MessageIdReader::MessageIdReader(MessageReader& p) : parent(p) {} +void MessageReader::MessageIdReader::onUuid(const qpid::types::Uuid& v, const Descriptor*) +{ + parent.onMessageId(v); +} +void MessageReader::MessageIdReader::onULong(uint64_t v, const Descriptor*) +{ + parent.onMessageId(v); +} +void MessageReader::MessageIdReader::onString(const CharSequence& v, const Descriptor*) +{ + parent.onMessageId(v); +} +void MessageReader::MessageIdReader::onBinary(const CharSequence& v, const Descriptor*) +{ + parent.onMessageId(v); +} +MessageReader::UserIdReader::UserIdReader(MessageReader& p) : parent(p) {} +void MessageReader::UserIdReader::onBinary(const CharSequence& v, const Descriptor*) +{ + parent.onUserId(v); +} +MessageReader::ToReader::ToReader(MessageReader& p) : parent(p) {} +void MessageReader::ToReader::onString(const CharSequence& v, const Descriptor*) +{ + parent.onTo(v); +} +MessageReader::SubjectReader::SubjectReader(MessageReader& p) : parent(p) {} +void MessageReader::SubjectReader::onString(const CharSequence& v, const Descriptor*) +{ + parent.onSubject(v); +} +MessageReader::ReplyToReader::ReplyToReader(MessageReader& p) : parent(p) {} +void MessageReader::ReplyToReader::onString(const CharSequence& v, const Descriptor*) +{ + parent.onReplyTo(v); +} +MessageReader::CorrelationIdReader::CorrelationIdReader(MessageReader& p) : parent(p) {} +void MessageReader::CorrelationIdReader::onUuid(const qpid::types::Uuid& v, const Descriptor*) +{ + parent.onCorrelationId(v); +} +void MessageReader::CorrelationIdReader::onULong(uint64_t v, const Descriptor*) +{ + parent.onCorrelationId(v); +} +void MessageReader::CorrelationIdReader::onString(const CharSequence& v, const Descriptor*) +{ + parent.onCorrelationId(v); +} +void MessageReader::CorrelationIdReader::onBinary(const CharSequence& v, const Descriptor*) +{ + parent.onCorrelationId(v); +} +MessageReader::ContentTypeReader::ContentTypeReader(MessageReader& p) : parent(p) {} +void MessageReader::ContentTypeReader::onString(const CharSequence& v, const Descriptor*) +{ + parent.onContentType(v); +} +MessageReader::ContentEncodingReader::ContentEncodingReader(MessageReader& p) : parent(p) {} +void MessageReader::ContentEncodingReader::onString(const CharSequence& v, const Descriptor*) +{ + parent.onContentEncoding(v); +} +MessageReader::AbsoluteExpiryTimeReader::AbsoluteExpiryTimeReader(MessageReader& p) : parent(p) {} +void MessageReader::AbsoluteExpiryTimeReader::onTimestamp(int64_t v, const Descriptor*) +{ + parent.onAbsoluteExpiryTime(v); +} +MessageReader::CreationTimeReader::CreationTimeReader(MessageReader& p) : parent(p) {} +void MessageReader::CreationTimeReader::onTimestamp(int64_t v, const Descriptor*) +{ + parent.onCreationTime(v); +} +MessageReader::GroupIdReader::GroupIdReader(MessageReader& p) : parent(p) {} +void MessageReader::GroupIdReader::onString(const CharSequence& v, const Descriptor*) +{ + parent.onGroupId(v); +} +MessageReader::GroupSequenceReader::GroupSequenceReader(MessageReader& p) : parent(p) {} +void MessageReader::GroupSequenceReader::onUInt(uint32_t v, const Descriptor*) +{ + parent.onGroupSequence(v); +} +MessageReader::ReplyToGroupIdReader::ReplyToGroupIdReader(MessageReader& p) : parent(p) {} +void MessageReader::ReplyToGroupIdReader::onString(const CharSequence& v, const Descriptor*) +{ + parent.onReplyToGroupId(v); +} +*/ + +//header, properties, amqp-sequence, amqp-value +bool MessageReader::onStartList(uint32_t count, const CharSequence& raw, const Descriptor* descriptor) +{ + if (delegate) { + return delegate->onStartList(count, raw, descriptor); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got no descriptor for list."); + return false; + } else if (descriptor->match(HEADER_SYMBOL, HEADER_CODE)) { + delegate = &headerReader; + return true; + } else if (descriptor->match(PROPERTIES_SYMBOL, PROPERTIES_CODE)) { + delegate = &propertiesReader; + return true; + } else if (descriptor->match(AMQP_SEQUENCE_SYMBOL, AMQP_SEQUENCE_CODE) || descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + onBody(raw, *descriptor); + return false; + } else { + QPID_LOG(warning, "Unexpected described list: " << *descriptor); + return false; + } + } +} +void MessageReader::onEndList(uint32_t count, const Descriptor* descriptor) +{ + if (delegate) { + if (descriptor && (descriptor->match(HEADER_SYMBOL, HEADER_CODE) || descriptor->match(PROPERTIES_SYMBOL, PROPERTIES_CODE))) { + delegate = 0; + } else { + delegate->onEndList(count, descriptor); + } + } +} + +//delivery-annotations, message-annotations, application-properties, amqp-value +bool MessageReader::onStartMap(uint32_t count, const CharSequence& raw, const Descriptor* descriptor) +{ + if (delegate) { + return delegate->onStartMap(count, raw, descriptor); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got no descriptor for map."); + return false; + } else if (descriptor->match(DELIVERY_ANNOTATIONS_SYMBOL, DELIVERY_ANNOTATIONS_CODE)) { + onDeliveryAnnotations(raw); + return false; + } else if (descriptor->match(MESSAGE_ANNOTATIONS_SYMBOL, MESSAGE_ANNOTATIONS_CODE)) { + onMessageAnnotations(raw); + return false; + } else if (descriptor->match(FOOTER_SYMBOL, FOOTER_CODE)) { + onFooter(raw); + return false; + } else if (descriptor->match(APPLICATION_PROPERTIES_SYMBOL, APPLICATION_PROPERTIES_CODE)) { + onApplicationProperties(raw); + return false; + } else if (descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + onBody(raw, *descriptor); + return false; + } else { + QPID_LOG(warning, "Unexpected described map: " << *descriptor); + return false; + } + } +} + +void MessageReader::onEndMap(uint32_t count, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onEndMap(count, descriptor); + } +} + +//data, amqp-value +void MessageReader::onBinary(const CharSequence& bytes, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onBinary(bytes, descriptor); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got binary value with no descriptor."); + } else if (descriptor->match(DATA_SYMBOL, DATA_CODE) || descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + onBody(bytes, *descriptor); + } else { + QPID_LOG(warning, "Unexpected binary value with descriptor: " << *descriptor); + } + } + +} + +//amqp-value +void MessageReader::onNull(const Descriptor* descriptor) +{ + if (delegate) { + delegate->onNull(descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant v; + onBody(v, *descriptor); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got null value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected null value with descriptor: " << *descriptor); + } + } + } +} +void MessageReader::onString(const CharSequence& v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onString(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + onBody(v, *descriptor); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got string value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected string value with descriptor: " << *descriptor); + } + } + } +} +void MessageReader::onSymbol(const CharSequence& v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onSymbol(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + onBody(v, *descriptor); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got symbol value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected symbol value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onBoolean(bool v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onBoolean(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onBody(body, *descriptor); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got boolean value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected boolean value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onUByte(uint8_t v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onUByte(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onBody(body, *descriptor); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got ubyte value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected ubyte value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onUShort(uint16_t v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onUShort(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onBody(body, *descriptor); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got ushort value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected ushort value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onUInt(uint32_t v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onUInt(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onBody(body, *descriptor); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got uint value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected uint value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onULong(uint64_t v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onULong(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onBody(body, *descriptor); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got ulong value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected ulong value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onByte(int8_t v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onByte(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onBody(body, *descriptor); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got byte value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected byte value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onShort(int16_t v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onShort(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onBody(body, *descriptor); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got short value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected short value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onInt(int32_t v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onInt(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onBody(body, *descriptor); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got int value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected int value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onLong(int64_t v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onLong(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onBody(body, *descriptor); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got long value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected long value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onFloat(float v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onFloat(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onBody(body, *descriptor); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got float value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected float value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onDouble(double v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onDouble(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onBody(body, *descriptor); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got double value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected double value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onUuid(const CharSequence& v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onUuid(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + onBody(v, *descriptor); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got uuid value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected uuid value with descriptor: " << *descriptor); + } + } + } +} + +void MessageReader::onTimestamp(int64_t v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onTimestamp(v, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + qpid::types::Variant body = v; + onBody(body, *descriptor); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got timestamp value with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected timestamp value with descriptor: " << *descriptor); + } + } + } +} + +bool MessageReader::onStartArray(uint32_t count, const CharSequence& raw, const Constructor& constructor, const Descriptor* descriptor) +{ + if (delegate) { + return delegate->onStartArray(count, raw, constructor, descriptor); + } else { + if (descriptor && descriptor->match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE)) { + onBody(raw, *descriptor); + } else { + if (!descriptor) { + QPID_LOG(warning, "Expected described type but got array with no descriptor."); + } else { + QPID_LOG(warning, "Unexpected array with descriptor: " << *descriptor); + } + } + return false; + } +} + +void MessageReader::onEndArray(uint32_t v, const Descriptor* descriptor) +{ + if (delegate) { + delegate->onEndArray(v, descriptor); + } +} + +MessageReader::MessageReader() : headerReader(*this), propertiesReader(*this), delegate(0) +{ + bare.init(); +} + +void MessageReader::onDescriptor(const Descriptor& descriptor, const char* position) +{ + if (bare.data) { + if (descriptor.match(FOOTER_SYMBOL, FOOTER_CODE)) { + bare.size = position - bare.data; + } + } else { + if (descriptor.match(PROPERTIES_SYMBOL, PROPERTIES_CODE) || descriptor.match(APPLICATION_PROPERTIES_SYMBOL, APPLICATION_PROPERTIES_CODE) + || descriptor.match(AMQP_SEQUENCE_SYMBOL, AMQP_SEQUENCE_CODE) || descriptor.match(AMQP_VALUE_SYMBOL, AMQP_VALUE_CODE) || descriptor.match(DATA_SYMBOL, DATA_CODE)) { + bare.data = position; + } + } +} + +CharSequence MessageReader::getBareMessage() const { return bare; } + +}} // namespace qpid::amqp diff --git a/cpp/src/qpid/amqp/MessageReader.h b/cpp/src/qpid/amqp/MessageReader.h new file mode 100644 index 0000000000..5d26b288f5 --- /dev/null +++ b/cpp/src/qpid/amqp/MessageReader.h @@ -0,0 +1,301 @@ +#ifndef QPID_AMQP_MESSAGEREADER_H +#define QPID_AMQP_MESSAGEREADER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/amqp/CharSequence.h" +#include "qpid/amqp/Reader.h" +#include "qpid/amqp/ListReader.h" +#include "qpid/types/Variant.h" +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace amqp { + +/** + * Reader for an AMQP 1.0 message + */ +class MessageReader : public Reader +{ + public: + QPID_COMMON_EXTERN MessageReader(); + + //header, properties, amqp-sequence, amqp-value + QPID_COMMON_EXTERN bool onStartList(uint32_t, const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN void onEndList(uint32_t, const Descriptor*); + + //delivery-annotations, message-annotations, application-headers, amqp-value + QPID_COMMON_EXTERN bool onStartMap(uint32_t, const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN void onEndMap(uint32_t, const Descriptor*); + + //data, amqp-value + QPID_COMMON_EXTERN void onBinary(const CharSequence&, const Descriptor*); + + //amqp-value + QPID_COMMON_EXTERN void onNull(const Descriptor*); + QPID_COMMON_EXTERN void onString(const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN void onSymbol(const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN void onBoolean(bool, const Descriptor*); + QPID_COMMON_EXTERN void onUByte(uint8_t, const Descriptor*); + QPID_COMMON_EXTERN void onUShort(uint16_t, const Descriptor*); + QPID_COMMON_EXTERN void onUInt(uint32_t, const Descriptor*); + QPID_COMMON_EXTERN void onULong(uint64_t, const Descriptor*); + QPID_COMMON_EXTERN void onByte(int8_t, const Descriptor*); + QPID_COMMON_EXTERN void onShort(int16_t, const Descriptor*); + QPID_COMMON_EXTERN void onInt(int32_t, const Descriptor*); + QPID_COMMON_EXTERN void onLong(int64_t, const Descriptor*); + QPID_COMMON_EXTERN void onFloat(float, const Descriptor*); + QPID_COMMON_EXTERN void onDouble(double, const Descriptor*); + QPID_COMMON_EXTERN void onUuid(const CharSequence&, const Descriptor*); + QPID_COMMON_EXTERN void onTimestamp(int64_t, const Descriptor*); + QPID_COMMON_EXTERN bool onStartArray(uint32_t, const CharSequence&, const Constructor&, const Descriptor*); + QPID_COMMON_EXTERN void onEndArray(uint32_t, const Descriptor*); + QPID_COMMON_EXTERN void onDescriptor(const Descriptor&, const char*); + + //header: + virtual void onDurable(bool) = 0; + virtual void onPriority(uint8_t) = 0; + virtual void onTtl(uint32_t) = 0; + virtual void onFirstAcquirer(bool) = 0; + virtual void onDeliveryCount(uint32_t) = 0; + + //properties: + virtual void onMessageId(uint64_t) = 0; + virtual void onMessageId(const CharSequence&, qpid::types::VariantType) = 0; + virtual void onUserId(const CharSequence&) = 0; + virtual void onTo(const CharSequence&) = 0; + virtual void onSubject(const CharSequence&) = 0; + virtual void onReplyTo(const CharSequence&) = 0; + virtual void onCorrelationId(uint64_t) = 0; + virtual void onCorrelationId(const CharSequence&, qpid::types::VariantType) = 0; + virtual void onContentType(const CharSequence&) = 0; + virtual void onContentEncoding(const CharSequence&) = 0; + virtual void onAbsoluteExpiryTime(int64_t) = 0; + virtual void onCreationTime(int64_t) = 0; + virtual void onGroupId(const CharSequence&) = 0; + virtual void onGroupSequence(uint32_t) = 0; + virtual void onReplyToGroupId(const CharSequence&) = 0; + + virtual void onApplicationProperties(const CharSequence&) = 0; + virtual void onDeliveryAnnotations(const CharSequence&) = 0; + virtual void onMessageAnnotations(const CharSequence&) = 0; + virtual void onBody(const CharSequence&, const Descriptor&) = 0; + virtual void onBody(const qpid::types::Variant&, const Descriptor&) = 0; + virtual void onFooter(const CharSequence&) = 0; + + QPID_COMMON_EXTERN CharSequence getBareMessage() const; + + private: + /* + class DurableReader : public Reader + { + public: + DurableReader(MessageReader&); + void onBoolean(bool v, const Descriptor*); + private: + MessageReader& parent; + }; + class PriorityReader : public Reader + { + public: + PriorityReader(MessageReader&); + void onUByte(uint8_t v, const Descriptor*); + private: + MessageReader& parent; + }; + class TtlReader : public Reader + { + public: + TtlReader(MessageReader&); + void onUInt(uint32_t v, const Descriptor*); + private: + MessageReader& parent; + }; + class FirstAcquirerReader : public Reader + { + public: + FirstAcquirerReader(MessageReader&); + void onBoolean(bool v, const Descriptor*); + private: + MessageReader& parent; + }; + class DeliveryCountReader : public Reader + { + public: + DeliveryCountReader(MessageReader&); + void onUInt(uint32_t v, const Descriptor*); + private: + MessageReader& parent; + }; + + class MessageIdReader : public Reader + { + public: + MessageIdReader(MessageReader&); + void onUuid(const qpid::types::Uuid& v, const Descriptor*); + void onULong(uint64_t v, const Descriptor*); + void onString(const CharSequence& v, const Descriptor*); + void onBinary(const CharSequence& v, const Descriptor*); + private: + MessageReader& parent; + }; + class UserIdReader : public Reader + { + public: + UserIdReader(MessageReader&); + void onBinary(const CharSequence& v, const Descriptor*); + private: + MessageReader& parent; + }; + class ToReader : public Reader + { + public: + ToReader(MessageReader&); + void onString(const CharSequence& v, const Descriptor*); + private: + MessageReader& parent; + }; + class SubjectReader : public Reader + { + public: + SubjectReader(MessageReader&); + void onString(const CharSequence& v, const Descriptor*); + private: + MessageReader& parent; + }; + class ReplyToReader : public Reader + { + public: + ReplyToReader(MessageReader&); + void onString(const CharSequence& v, const Descriptor*); + private: + MessageReader& parent; + }; + class CorrelationIdReader : public Reader + { + public: + CorrelationIdReader(MessageReader&); + void onUuid(const qpid::types::Uuid& v, const Descriptor*); + void onULong(uint64_t v, const Descriptor*); + void onString(const CharSequence& v, const Descriptor*); + void onBinary(const CharSequence& v, const Descriptor*); + private: + MessageReader& parent; + }; + class ContentTypeReader : public Reader + { + public: + ContentTypeReader(MessageReader&); + void onString(const CharSequence& v, const Descriptor*); + private: + MessageReader& parent; + }; + class ContentEncodingReader : public Reader + { + public: + ContentEncodingReader(MessageReader&); + void onString(const CharSequence& v, const Descriptor*); + private: + MessageReader& parent; + }; + class AbsoluteExpiryTimeReader : public Reader + { + public: + AbsoluteExpiryTimeReader(MessageReader&); + void onTimestamp(int64_t v, const Descriptor*); + private: + MessageReader& parent; + }; + class CreationTimeReader : public Reader + { + public: + CreationTimeReader(MessageReader&); + void onTimestamp(int64_t v, const Descriptor*); + private: + MessageReader& parent; + }; + class GroupIdReader : public Reader + { + public: + GroupIdReader(MessageReader&); + void onString(const CharSequence& v, const Descriptor*); + private: + MessageReader& parent; + }; + class GroupSequenceReader : public Reader + { + public: + GroupSequenceReader(MessageReader&); + void onUInt(uint32_t v, const Descriptor*); + private: + MessageReader& parent; + }; + class ReplyToGroupIdReader : public Reader + { + public: + ReplyToGroupIdReader(MessageReader&); + void onString(const CharSequence& v, const Descriptor*); + private: + MessageReader& parent; + }; + */ + + class HeaderReader : public Reader //public ListReader + { + public: + //Reader& getReader(size_t index); + + HeaderReader(MessageReader&); + void onBoolean(bool v, const Descriptor*); // durable, first-acquirer + void onUByte(uint8_t v, const Descriptor*); // priority + void onUInt(uint32_t v, const Descriptor*); // ttl, delivery-count + void onNull(const Descriptor*); + private: + MessageReader& parent; + size_t index; + }; + class PropertiesReader : public Reader //public ListReader + { + public: + //Reader& getReader(size_t index); + + PropertiesReader(MessageReader&); + void onUuid(const CharSequence& v, const Descriptor*); // message-id, correlation-id + void onULong(uint64_t v, const Descriptor*); // message-id, correlation-id + void onBinary(const CharSequence& v, const Descriptor*); // message-id, correlation-id, user-id + void onString(const CharSequence& v, const Descriptor*); // message-id, correlation-id, group-id, reply-to-group-id, subject, to, reply-to + void onSymbol(const CharSequence& v, const Descriptor*); // content-type, content-encoding + void onTimestamp(int64_t v, const Descriptor*); // absolute-expiry-time, creation-time + void onUInt(uint32_t v, const Descriptor*); // group-sequence + void onNull(const Descriptor*); + private: + MessageReader& parent; + size_t index; + }; + HeaderReader headerReader; + PropertiesReader propertiesReader; + Reader* delegate; + CharSequence bare; +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_MESSAGEREADER_H*/ diff --git a/cpp/src/qpid/amqp/Reader.h b/cpp/src/qpid/amqp/Reader.h new file mode 100644 index 0000000000..64019d1521 --- /dev/null +++ b/cpp/src/qpid/amqp/Reader.h @@ -0,0 +1,80 @@ +#ifndef QPID_AMQP_READER_H +#define QPID_AMQP_READER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/IntegerTypes.h" +#include <stddef.h> + +namespace qpid { +namespace amqp { +struct CharSequence; +struct Constructor; +struct Descriptor; + +/** + * Allows an event-driven, callback-based approach to processing an + * AMQP encoded data stream. By sublassing and implementing the + * methods of interest, readers can be constructed for different + * contexts. + */ +class Reader +{ + public: + virtual ~Reader() {} + virtual void onNull(const Descriptor*) {} + virtual void onBoolean(bool, const Descriptor*) {} + virtual void onUByte(uint8_t, const Descriptor*) {} + virtual void onUShort(uint16_t, const Descriptor*) {} + virtual void onUInt(uint32_t, const Descriptor*) {} + virtual void onULong(uint64_t, const Descriptor*) {} + virtual void onByte(int8_t, const Descriptor*) {} + virtual void onShort(int16_t, const Descriptor*) {} + virtual void onInt(int32_t, const Descriptor*) {} + virtual void onLong(int64_t, const Descriptor*) {} + virtual void onFloat(float, const Descriptor*) {} + virtual void onDouble(double, const Descriptor*) {} + virtual void onUuid(const CharSequence&, const Descriptor*) {} + virtual void onTimestamp(int64_t, const Descriptor*) {} + + virtual void onBinary(const CharSequence&, const Descriptor*) {} + virtual void onString(const CharSequence&, const Descriptor*) {} + virtual void onSymbol(const CharSequence&, const Descriptor*) {} + + /** + * @return true to get elements of the compound value, false + * to skip over it + */ + virtual bool onStartList(uint32_t /*count*/, const CharSequence&, const Descriptor*) { return true; } + virtual bool onStartMap(uint32_t /*count*/, const CharSequence&, const Descriptor*) { return true; } + virtual bool onStartArray(uint32_t /*count*/, const CharSequence&, const Constructor&, const Descriptor*) { return true; } + virtual void onEndList(uint32_t /*count*/, const Descriptor*) {} + virtual void onEndMap(uint32_t /*count*/, const Descriptor*) {} + virtual void onEndArray(uint32_t /*count*/, const Descriptor*) {} + + virtual void onDescriptor(const Descriptor&, const char*) {} + + virtual bool proceed() { return true; } + private: +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_READER_H*/ diff --git a/cpp/src/qpid/amqp/Sasl.cpp b/cpp/src/qpid/amqp/Sasl.cpp new file mode 100644 index 0000000000..7b0779fe94 --- /dev/null +++ b/cpp/src/qpid/amqp/Sasl.cpp @@ -0,0 +1,136 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/Sasl.h" +#include "qpid/amqp/Decoder.h" +#include "qpid/amqp/Descriptor.h" +#include "qpid/amqp/Encoder.h" +#include "qpid/log/Statement.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/framing/ProtocolInitiation.h" +#include <string.h> + +namespace qpid { +namespace amqp { + +Sasl::Sasl(const std::string& i) : id(i), buffer(2*512/*AMQP 1.0's MAX_MIN_FRAME_SIZE - is this enough though?*/), encoder(&buffer[0], buffer.size()) {} +Sasl::~Sasl() {} + +void* Sasl::startFrame() +{ + //write sasl frame header, leaving 4 bytes for total size + char* start = encoder.skip(4); + encoder.write((uint8_t) 0x02);//data offset + encoder.write((uint8_t) 0x01);//frame type + encoder.write((uint16_t) 0x0000);//ignored + return start; +} + +void Sasl::endFrame(void* frame) +{ + //now backfill the frame size + char* start = (char*) frame; + char* current = &buffer[encoder.getPosition()]; + uint32_t frameSize = current - start; + Encoder backfill(start, 4); + backfill.write(frameSize); + QPID_LOG(trace, "Completed encoding of frame of " << frameSize << " bytes"); +} + + +std::size_t Sasl::read(const char* data, size_t available) +{ + size_t consumed = 0; + while (available - consumed > 4/*framesize*/) { + Decoder decoder(data+consumed, available-consumed); + //read frame-header + uint32_t frameSize = decoder.readUInt(); + if (frameSize > decoder.getSize()) break;//don't have all the data for this frame yet + + QPID_LOG(trace, "Reading SASL frame of size " << frameSize); + decoder.resetSize(frameSize); + uint8_t dataOffset = decoder.readUByte(); + uint8_t frameType = decoder.readUByte(); + if (frameType != 0x01) { + QPID_LOG(error, "Expected SASL frame; got type " << frameType); + } + uint16_t ignored = decoder.readUShort(); + if (ignored) { + QPID_LOG(info, "Got non null bytes at end of SASL frame header"); + } + + //body is at offset 4*dataOffset from the start + size_t skip = dataOffset*4 - 8; + if (skip) { + QPID_LOG(info, "Offset for sasl frame was not as expected"); + decoder.advance(skip); + } + decoder.read(*this); + consumed += decoder.getPosition(); + } + return consumed; +} + +std::size_t Sasl::write(char* data, size_t size) +{ + size_t available = encoder.getPosition(); + if (available) { + size_t encoded = available > size ? size : available; + ::memcpy(data, &buffer[0], encoded); + size_t remainder = encoder.getPosition() - encoded; + if (remainder) { + //shuffle + ::memcpy(&buffer[0], &buffer[size], remainder); + } + encoder.resetPosition(remainder); + return encoded; + } else { + return 0; + } +} + +std::size_t Sasl::readProtocolHeader(const char* buffer, std::size_t size) +{ + framing::ProtocolInitiation pi(qpid::framing::ProtocolVersion(1,0,qpid::framing::ProtocolVersion::SASL)); + if (size >= pi.encodedSize()) { + qpid::framing::Buffer out(const_cast<char*>(buffer), size); + pi.decode(out); + QPID_LOG_CAT(debug, protocol, id << " read protocol header: " << pi); + return pi.encodedSize(); + } else { + return 0; + } +} +std::size_t Sasl::writeProtocolHeader(char* buffer, std::size_t size) +{ + framing::ProtocolInitiation pi(qpid::framing::ProtocolVersion(1,0,qpid::framing::ProtocolVersion::SASL)); + if (size >= pi.encodedSize()) { + QPID_LOG_CAT(debug, protocol, id << " writing protocol header: " << pi); + qpid::framing::Buffer out(buffer, size); + pi.encode(out); + return pi.encodedSize(); + } else { + QPID_LOG_CAT(warning, protocol, id << " insufficient buffer for protocol header: " << size) + return 0; + } +} + +}} // namespace qpid::amqp diff --git a/cpp/src/qpid/amqp/Sasl.h b/cpp/src/qpid/amqp/Sasl.h new file mode 100644 index 0000000000..558f6071fc --- /dev/null +++ b/cpp/src/qpid/amqp/Sasl.h @@ -0,0 +1,54 @@ +#ifndef QPID_AMQP_SASL_H +#define QPID_AMQP_SASL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/Encoder.h" +#include "qpid/amqp/Reader.h" +#include <string> +#include <vector> + +namespace qpid { +namespace amqp { + +/** + * Base for SASL client and server utilities + */ +class Sasl : protected Reader +{ + public: + Sasl(const std::string& id); + virtual ~Sasl(); + std::size_t read(const char* data, size_t available); + std::size_t write(char* data, size_t available); + std::size_t readProtocolHeader(const char* buffer, std::size_t size); + std::size_t writeProtocolHeader(char* buffer, std::size_t size); + protected: + const std::string id; + std::vector<char> buffer; + Encoder encoder; + + void* startFrame(); + void endFrame(void*); +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_SASL_H*/ diff --git a/cpp/src/qpid/amqp/SaslClient.cpp b/cpp/src/qpid/amqp/SaslClient.cpp new file mode 100644 index 0000000000..69660e9294 --- /dev/null +++ b/cpp/src/qpid/amqp/SaslClient.cpp @@ -0,0 +1,154 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/SaslClient.h" +#include "qpid/amqp/Decoder.h" +#include "qpid/amqp/Descriptor.h" +#include "qpid/amqp/descriptors.h" +#include "qpid/amqp/Encoder.h" +#include "qpid/log/Statement.h" + +using namespace qpid::amqp::sasl; + +namespace qpid { +namespace amqp { + +SaslClient::SaslClient(const std::string& id) : Sasl(id) {} +SaslClient::~SaslClient() {} +void SaslClient::init(const std::string& mechanism, const std::string* response, const std::string* hostname) +{ + void* frame = startFrame(); + + void* token = encoder.startList32(&SASL_INIT); + encoder.writeSymbol(mechanism); + if (response) encoder.writeBinary(*response); + else encoder.writeNull(); + if (hostname) encoder.writeString(*hostname); + else encoder.writeNull(); + encoder.endList32(3, token); + + endFrame(frame); + QPID_LOG_CAT(debug, protocol, id << " Sent SASL-INIT(" << mechanism << ", " << (response ? *response : "null") << ", " << (hostname ? *hostname : "null") << ")"); +} +void SaslClient::response(const std::string* r) +{ + void* frame = startFrame(); + + void* token = encoder.startList32(&SASL_RESPONSE); + if (r) encoder.writeBinary(*r); + else encoder.writeNull(); + encoder.endList32(1, token); + + endFrame(frame); + QPID_LOG_CAT(debug, protocol, id << " Sent SASL-RESPONSE(" << (r ? *r : "null") << ")"); +} + + +namespace { +const std::string SPACE(" "); +class SaslMechanismsReader : public Reader +{ + public: + SaslMechanismsReader(SaslClient& c) : client(c), expected(0) {} + void onSymbol(const CharSequence& mechanism, const Descriptor*) + { + if (expected) { + mechanisms << mechanism.str() << SPACE; + } else { + client.mechanisms(mechanism.str()); + } + } + bool onStartArray(uint32_t count, const CharSequence&, const Constructor&, const Descriptor*) + { + expected = count; + return true; + } + void onEndArray(uint32_t, const Descriptor*) + { + client.mechanisms(mechanisms.str()); + } + private: + SaslClient& client; + uint32_t expected; + std::stringstream mechanisms; +}; +class SaslChallengeReader : public Reader +{ + public: + SaslChallengeReader(SaslClient& c) : client(c) {} + void onNull(const Descriptor*) { client.challenge(); } + void onBinary(const CharSequence& c, const Descriptor*) { client.challenge(c.str()); } + private: + SaslClient& client; +}; +class SaslOutcomeReader : public Reader +{ + public: + SaslOutcomeReader(SaslClient& c, bool e) : client(c), expectExtraData(e) {} + void onUByte(uint8_t c, const Descriptor*) + { + if (expectExtraData) code = c; + else client.outcome(c); + } + void onBinary(const CharSequence& extra, const Descriptor*) { client.outcome(code, extra.str()); } + void onNull(const Descriptor*) { client.outcome(code); } + private: + SaslClient& client; + bool expectExtraData; + uint8_t code; +}; +} + +bool SaslClient::onStartList(uint32_t count, const CharSequence& arguments, const Descriptor* descriptor) +{ + if (!descriptor) { + QPID_LOG(error, "Expected described type in SASL negotiation but got no descriptor"); + } else if (descriptor->match(SASL_MECHANISMS_SYMBOL, SASL_MECHANISMS_CODE)) { + QPID_LOG(trace, "Reading SASL-MECHANISMS"); + Decoder decoder(arguments.data, arguments.size); + if (count != 1) QPID_LOG(error, "Invalid SASL-MECHANISMS frame; exactly one field expected, got " << count); + SaslMechanismsReader reader(*this); + decoder.read(reader); + } else if (descriptor->match(SASL_CHALLENGE_SYMBOL, SASL_CHALLENGE_CODE)) { + QPID_LOG(trace, "Reading SASL-CHALLENGE"); + Decoder decoder(arguments.data, arguments.size); + if (count != 1) QPID_LOG(error, "Invalid SASL-CHALLENGE frame; exactly one field expected, got " << count); + SaslChallengeReader reader(*this); + decoder.read(reader); + } else if (descriptor->match(SASL_OUTCOME_SYMBOL, SASL_OUTCOME_CODE)) { + QPID_LOG(trace, "Reading SASL-OUTCOME"); + Decoder decoder(arguments.data, arguments.size); + if (count == 1) { + SaslOutcomeReader reader(*this, false); + decoder.read(reader); + } else if (count == 2) { + SaslOutcomeReader reader(*this, true); + decoder.read(reader); + } else { + QPID_LOG(error, "Invalid SASL-OUTCOME frame; got " << count << " fields"); + } + } else { + QPID_LOG(error, "Unexpected descriptor in SASL negotiation: " << *descriptor); + } + return false; +} + + +}} // namespace qpid::amqp diff --git a/cpp/src/qpid/amqp/SaslClient.h b/cpp/src/qpid/amqp/SaslClient.h new file mode 100644 index 0000000000..9f3eefadc2 --- /dev/null +++ b/cpp/src/qpid/amqp/SaslClient.h @@ -0,0 +1,54 @@ +#ifndef QPID_AMQP_SASLCLIENT_H +#define QPID_AMQP_SASLCLIENT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/Sasl.h" + +namespace qpid { +namespace amqp { + +/** + * Utility for decoding and encoding SASL frames by the peer acting as + * the SASL client. + */ +class SaslClient : public Sasl +{ + public: + SaslClient(const std::string& id); + virtual ~SaslClient(); + virtual void mechanisms(const std::string&) = 0; + virtual void challenge(const std::string&) = 0; + virtual void challenge() = 0; //null != empty string + virtual void outcome(uint8_t result, const std::string&) = 0; + virtual void outcome(uint8_t result) = 0; + + void init(const std::string& mechanism, const std::string* response, const std::string* hostname); + void response(const std::string*); + + private: + bool onStartList(uint32_t count, const CharSequence& arguments, const Descriptor* descriptor); + +}; + +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_SASLCLIENT_H*/ diff --git a/cpp/src/qpid/amqp/SaslServer.cpp b/cpp/src/qpid/amqp/SaslServer.cpp new file mode 100644 index 0000000000..403730ad69 --- /dev/null +++ b/cpp/src/qpid/amqp/SaslServer.cpp @@ -0,0 +1,183 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/SaslServer.h" +#include "qpid/amqp/Constructor.h" +#include "qpid/amqp/Decoder.h" +#include "qpid/amqp/Descriptor.h" +#include "qpid/amqp/descriptors.h" +#include "qpid/amqp/Encoder.h" +#include "qpid/amqp/typecodes.h" +#include "qpid/log/Statement.h" +#include "qpid/StringUtils.h" +#include <limits> +#include <vector> + +using namespace qpid::amqp::sasl; +using namespace qpid::amqp::typecodes; + +namespace qpid { +namespace amqp { +namespace { +const std::string SPACE(" "); +const std::string NULL_("NULL"); +} + +SaslServer::SaslServer(const std::string& id) : Sasl(id) {} +SaslServer::~SaslServer() {} + +void SaslServer::mechanisms(const std::string& mechanisms) +{ + void* frameToken = startFrame(); + + std::vector<std::string> parts = split(mechanisms, SPACE); + void* listToken = encoder.startList32(&SASL_MECHANISMS); + if (parts.size() > 1) { + void* arrayToken = encoder.startArray8(Constructor(SYMBOL8)); + for (std::vector<std::string>::const_iterator i = parts.begin();i != parts.end(); ++i) { + uint8_t size = i->size() > std::numeric_limits<uint8_t>::max() ? std::numeric_limits<uint8_t>::max() : i->size(); + encoder.write(size); + encoder.writeBytes(i->data(), size); + } + encoder.endArray8(parts.size(), arrayToken); + } else { + encoder.writeSymbol(mechanisms); + } + encoder.endList32(1, listToken); + + endFrame(frameToken); + QPID_LOG_CAT(debug, protocol, id << " Sent SASL-MECHANISMS(" << mechanisms << ") " << encoder.getPosition()); +} +void SaslServer::challenge(const std::string* c) +{ + void* frameToken = startFrame(); + + void* listToken = encoder.startList32(&SASL_CHALLENGE); + if (c) encoder.writeBinary(*c); + else encoder.writeNull(); + encoder.endList32(1, listToken); + + endFrame(frameToken); + QPID_LOG_CAT(debug, protocol, id << " Sent SASL-CHALLENGE(" << (c ? *c : NULL_) << ") " << encoder.getPosition()); +} +void SaslServer::completed(bool succeeded) +{ + void* frameToken = startFrame(); + + void* listToken = encoder.startList8(&SASL_OUTCOME); + encoder.writeUByte(succeeded ? 0 : 1); + encoder.endList8(1, listToken); + + endFrame(frameToken); + QPID_LOG_CAT(debug, protocol, id << " Sent SASL-OUTCOME(" << (succeeded ? 0 : 1) << ") " << encoder.getPosition()); +} + +namespace { +class SaslInitReader : public Reader +{ + public: + SaslInitReader(SaslServer& s, uint32_t e) : server(s), expected(e), hasResponse(false), index(0) {} + void onNull(const Descriptor*) + { + ++index; + if (index == 2) { + if (--expected == 0) { + server.init(mechanism, 0, 0); + } + } else if (index == 3) { + server.init(mechanism, hasResponse ? &response : 0, 0); + } else { + QPID_LOG(warning, "Unexpected sequence of fields for SASL-INIT: got null for field " << index); + } + } + void onBinary(const CharSequence& r, const Descriptor*) + { + if (++index != 2) QPID_LOG(warning, "Unexpected sequence of fields for SASL-INIT: got binary for field " << index); + response = r.str(); + hasResponse = true; + if (--expected == 0) { + server.init(mechanism, &response, 0); + } + } + void onString(const CharSequence& h, const Descriptor*) + { + if (--expected || ++index != 3) { + QPID_LOG(warning, "Unexpected sequence of fields for SASL-INIT: got string for field " << index); + } else { + std::string hostname = h.str(); + server.init(mechanism, hasResponse ? &response : 0, &hostname); + } + } + void onSymbol(const CharSequence& m, const Descriptor*) + { + if (++index != 1) QPID_LOG(warning, "Unexpected sequence of fields for SASL-INIT: got symbol for field " << index); + if (--expected) { + mechanism = m.str(); + } else { + server.init(m.str(), 0, 0); + } + } + private: + SaslServer& server; + uint32_t expected; + std::string mechanism; + std::string response; + bool hasResponse; + uint32_t index; +}; + +class SaslResponseReader : public Reader +{ + public: + SaslResponseReader(SaslServer& s) : server(s) {} + void onNull(const Descriptor*) { server.response(0); } + void onBinary(const CharSequence& r, const Descriptor*) + { + std::string s = r.str(); + server.response(&s); + } + private: + SaslServer& server; +}; +} + +bool SaslServer::onStartList(uint32_t count, const CharSequence& arguments, const Descriptor* descriptor) +{ + if (!descriptor) { + QPID_LOG(error, "Expected described type in SASL negotiation but got no descriptor"); + } else if (descriptor->match(SASL_INIT_SYMBOL, SASL_INIT_CODE)) { + QPID_LOG(trace, "Reading SASL-INIT"); + Decoder decoder(arguments.data, arguments.size); + if (count < 1 || count > 3) QPID_LOG(error, "Invalid SASL-INIT frame; got " << count << " fields"); + SaslInitReader reader(*this, count); + decoder.read(reader); + } else if (descriptor->match(SASL_RESPONSE_SYMBOL, SASL_RESPONSE_CODE)) { + QPID_LOG(trace, "Reading SASL-RESPONSE (" << std::string(arguments.data, arguments.size) << ") " << count << " elements"); + Decoder decoder(arguments.data, arguments.size); + if (count != 1) QPID_LOG(error, "Invalid SASL-RESPONSE frame; exactly one field expected, got " << count); + SaslResponseReader reader(*this); + decoder.read(reader); + } else { + QPID_LOG(error, "Unexpected descriptor in SASL negotiation: " << *descriptor); + } + return false; +} + +}} // namespace qpid::amqp diff --git a/cpp/src/qpid/amqp/SaslServer.h b/cpp/src/qpid/amqp/SaslServer.h new file mode 100644 index 0000000000..43b960454f --- /dev/null +++ b/cpp/src/qpid/amqp/SaslServer.h @@ -0,0 +1,50 @@ +#ifndef QPID_AMQP_SASLSERVER_H +#define QPID_AMQP_SASLSERVER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp/Sasl.h" + +namespace qpid { +namespace amqp { + +/** + * Utility for decoding and encoding SASL frames by the peer acting as + * the SASL server. + */ +class SaslServer : public Sasl +{ + public: + SaslServer(const std::string& id); + virtual ~SaslServer(); + virtual void init(const std::string& mechanism, const std::string* response, const std::string* hostname) = 0; + virtual void response(const std::string*) = 0; + + void mechanisms(const std::string& mechanisms); + void challenge(const std::string*); + void completed(bool succeeded); + + private: + bool onStartList(uint32_t count, const CharSequence& arguments, const Descriptor* descriptor); +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_SASLSERVER_H*/ diff --git a/cpp/src/qpid/amqp/descriptors.h b/cpp/src/qpid/amqp/descriptors.h new file mode 100644 index 0000000000..19a8985433 --- /dev/null +++ b/cpp/src/qpid/amqp/descriptors.h @@ -0,0 +1,88 @@ +#ifndef QPID_AMQP_DESCRIPTORS_H +#define QPID_AMQP_DESCRIPTORS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Descriptor.h" + +namespace qpid { +namespace amqp { + +namespace message { +const std::string HEADER_SYMBOL("amqp:header:list"); +const std::string PROPERTIES_SYMBOL("amqp:properties:list"); +const std::string DELIVERY_ANNOTATIONS_SYMBOL("amqp:delivery-annotations:map"); +const std::string MESSAGE_ANNOTATIONS_SYMBOL("amqp:message-annotations:map"); +const std::string APPLICATION_PROPERTIES_SYMBOL("amqp:application-properties:map"); +const std::string AMQP_SEQUENCE_SYMBOL("amqp:amqp-sequence:list"); +const std::string AMQP_VALUE_SYMBOL("amqp:amqp-sequence:*"); +const std::string DATA_SYMBOL("amqp:data:binary"); +const std::string FOOTER_SYMBOL("amqp:footer:map"); + +const uint64_t HEADER_CODE(0x70); +const uint64_t DELIVERY_ANNOTATIONS_CODE(0x71); +const uint64_t MESSAGE_ANNOTATIONS_CODE(0x72); +const uint64_t PROPERTIES_CODE(0x73); +const uint64_t APPLICATION_PROPERTIES_CODE(0x74); +const uint64_t DATA_CODE(0x75); +const uint64_t AMQP_SEQUENCE_CODE(0x76); +const uint64_t AMQP_VALUE_CODE(0x77); +const uint64_t FOOTER_CODE(0x78); + +const Descriptor HEADER(HEADER_CODE); +const Descriptor DELIVERY_ANNOTATIONS(DELIVERY_ANNOTATIONS_CODE); +const Descriptor MESSAGE_ANNOTATIONS(MESSAGE_ANNOTATIONS_CODE); +const Descriptor PROPERTIES(PROPERTIES_CODE); +const Descriptor APPLICATION_PROPERTIES(APPLICATION_PROPERTIES_CODE); +const Descriptor DATA(DATA_CODE); +} + +namespace sasl { +const std::string SASL_MECHANISMS_SYMBOL("amqp:sasl-mechanisms:list"); +const std::string SASL_INIT_SYMBOL("amqp:sasl-init:list"); +const std::string SASL_CHALLENGE_SYMBOL("amqp:sasl-challenge:list"); +const std::string SASL_RESPONSE_SYMBOL("amqp:sasl-response:list"); +const std::string SASL_OUTCOME_SYMBOL("amqp:sasl-outcome:list"); + +const uint64_t SASL_MECHANISMS_CODE(0x40); +const uint64_t SASL_INIT_CODE(0x41); +const uint64_t SASL_CHALLENGE_CODE(0x42); +const uint64_t SASL_RESPONSE_CODE(0x43); +const uint64_t SASL_OUTCOME_CODE(0x44); + +const Descriptor SASL_MECHANISMS(SASL_MECHANISMS_CODE); +const Descriptor SASL_INIT(SASL_INIT_CODE); +const Descriptor SASL_CHALLENGE(SASL_CHALLENGE_CODE); +const Descriptor SASL_RESPONSE(SASL_RESPONSE_CODE); +const Descriptor SASL_OUTCOME(SASL_OUTCOME_CODE); +} + +namespace filters { +const std::string LEGACY_DIRECT_FILTER_SYMBOL("apache.org:legacy-amqp-direct-binding:string"); +const std::string LEGACY_TOPIC_FILTER_SYMBOL("apache.org:legacy-amqp-direct-binding:string"); + +const uint64_t LEGACY_DIRECT_FILTER_CODE(0x0000468C00000000ULL); +const uint64_t LEGACY_TOPIC_FILTER_CODE(0x0000468C00000001ULL); +} + +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_DESCRIPTORS_H*/ diff --git a/cpp/src/qpid/amqp/typecodes.h b/cpp/src/qpid/amqp/typecodes.h new file mode 100644 index 0000000000..3c6bd17b97 --- /dev/null +++ b/cpp/src/qpid/amqp/typecodes.h @@ -0,0 +1,115 @@ +#ifndef QPID_AMQP_TYPECODES_H +#define QPID_AMQP_TYPECODES_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace qpid { +namespace amqp { + +namespace typecodes +{ +const uint8_t DESCRIPTOR(0x0); + +const uint8_t NULL_VALUE(0x40); + +const uint8_t BOOLEAN(0x56); +const uint8_t BOOLEAN_TRUE(0x41); +const uint8_t BOOLEAN_FALSE(0x42); + +const uint8_t UBYTE(0x50); +const uint8_t USHORT(0x60); +const uint8_t UINT(0x70); +const uint8_t UINT_SMALL(0x52); +const uint8_t UINT_ZERO(0x43); +const uint8_t ULONG(0x80); +const uint8_t ULONG_SMALL(0x53); +const uint8_t ULONG_ZERO(0x44); + +const uint8_t BYTE(0x51); +const uint8_t SHORT(0x61); +const uint8_t INT(0x71); +const uint8_t INT_SMALL(0x54); +const uint8_t LONG(0x81); +const uint8_t LONG_SMALL(0x55); + +const uint8_t FLOAT(0x72); +const uint8_t DOUBLE(0x82); + +const uint8_t DECIMAL32(0x74); +const uint8_t DECIMAL64(0x84); +const uint8_t DECIMAL128(0x94); + +const uint8_t CHAR_UTF32(0x73); +const uint8_t TIMESTAMP(0x83); +const uint8_t UUID(0x98); + +const uint8_t BINARY8(0xa0); +const uint8_t BINARY32(0xb0); +const uint8_t STRING8(0xa1); +const uint8_t STRING32(0xb1); +const uint8_t SYMBOL8(0xa3); +const uint8_t SYMBOL32(0xb3); + +typedef std::pair<uint8_t, uint8_t> CodePair; +const CodePair SYMBOL(SYMBOL8, SYMBOL32); +const CodePair STRING(STRING8, STRING32); +const CodePair BINARY(BINARY8, BINARY32); + +const uint8_t LIST0(0x45); +const uint8_t LIST8(0xc0); +const uint8_t LIST32(0xd0); +const uint8_t MAP8(0xc1); +const uint8_t MAP32(0xd1); +const uint8_t ARRAY8(0xe0); +const uint8_t ARRAY32(0xf0); + + +const std::string NULL_NAME("null"); +const std::string BOOLEAN_NAME("name"); + +const std::string UBYTE_NAME("ubyte"); +const std::string USHORT_NAME("ushort"); +const std::string UINT_NAME("uint"); +const std::string ULONG_NAME("ulong"); + +const std::string BYTE_NAME("byte"); +const std::string SHORT_NAME("short"); +const std::string INT_NAME("int"); +const std::string LONG_NAME("long"); + +const std::string FLOAT_NAME("float"); +const std::string DOUBLE_NAME("double"); + +const std::string TIMESTAMP_NAME("timestamp"); +const std::string UUID_NAME("uuid"); + +const std::string BINARY_NAME("binary"); +const std::string STRING_NAME("string"); +const std::string SYMBOL_NAME("symbol"); + +const std::string LIST_NAME("list"); +const std::string MAP_NAME("map"); +const std::string ARRAY_NAME("array"); +} + +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_TYPECODES_H*/ |