From f6f1900eb98cc1773a88a3ec309afa646438a384 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Wed, 31 Mar 2010 16:17:17 +0000 Subject: QPID-664: made changes suggested by Alan Conway, also moved 0-10 map/list codecs to common lib git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@929606 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/CMakeLists.txt | 7 +- cpp/src/Makefile.am | 14 +- cpp/src/qpid/amqp_0_10/Codecs.cpp | 327 ++++++++++++++++++++ cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 55 ++-- cpp/src/qpid/client/amqp0_10/Codecs.cpp | 332 --------------------- cpp/src/qpid/client/amqp0_10/CodecsInternal.h | 41 --- cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp | 2 +- cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 6 +- cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp | 4 +- cpp/src/qpid/messaging/Address.cpp | 12 +- cpp/src/qpid/messaging/Connection.cpp | 7 +- cpp/src/qpid/messaging/ListContent.cpp | 106 ------- cpp/src/qpid/messaging/ListView.cpp | 65 ---- cpp/src/qpid/messaging/MapContent.cpp | 96 ------ cpp/src/qpid/messaging/MapView.cpp | 65 ---- cpp/src/qpid/messaging/Message.cpp | 71 ++++- cpp/src/qpid/messaging/MessageImpl.cpp | 4 +- cpp/src/qpid/messaging/Receiver.cpp | 4 +- cpp/src/qpid/messaging/Sender.cpp | 2 +- cpp/src/qpid/messaging/Session.cpp | 4 +- cpp/src/qpid/types/Variant.cpp | 2 +- cpp/src/tests/Address.cpp | 18 +- cpp/src/tests/MessagingSessionTests.cpp | 70 +++-- cpp/src/tests/qpid_recv.cpp | 2 +- cpp/src/tests/qpid_send.cpp | 5 +- cpp/src/tests/qpid_stream.cpp | 2 +- 26 files changed, 482 insertions(+), 841 deletions(-) create mode 100644 cpp/src/qpid/amqp_0_10/Codecs.cpp delete mode 100644 cpp/src/qpid/client/amqp0_10/Codecs.cpp delete mode 100644 cpp/src/qpid/client/amqp0_10/CodecsInternal.h delete mode 100644 cpp/src/qpid/messaging/ListContent.cpp delete mode 100644 cpp/src/qpid/messaging/ListView.cpp delete mode 100644 cpp/src/qpid/messaging/MapContent.cpp delete mode 100644 cpp/src/qpid/messaging/MapView.cpp (limited to 'cpp/src') diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 1695c6fa17..3840d13e88 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -569,6 +569,7 @@ set (qpidcommon_SOURCES qpid/Url.cpp qpid/types/Uuid.cpp qpid/types/Variant.cpp + qpid/amqp_0_10/Codecs.cpp qpid/amqp_0_10/SessionHandler.cpp qpid/framing/AccumulatedAck.cpp qpid/framing/AMQBody.cpp @@ -680,10 +681,6 @@ set (qpidclient_SOURCES qpid/messaging/Connection.cpp qpid/messaging/ConnectionImpl.h qpid/messaging/Duration.cpp - qpid/messaging/ListContent.cpp - qpid/messaging/ListView.cpp - qpid/messaging/MapContent.cpp - qpid/messaging/MapView.cpp qpid/messaging/Message.cpp qpid/messaging/MessageImpl.h qpid/messaging/MessageImpl.cpp @@ -697,8 +694,6 @@ set (qpidclient_SOURCES qpid/client/amqp0_10/AcceptTracker.cpp qpid/client/amqp0_10/AddressResolution.h qpid/client/amqp0_10/AddressResolution.cpp - qpid/client/amqp0_10/Codecs.cpp - qpid/client/amqp0_10/CodecsInternal.h qpid/client/amqp0_10/ConnectionImpl.h qpid/client/amqp0_10/ConnectionImpl.cpp qpid/client/amqp0_10/FailoverUpdates.cpp diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index d6867701b4..e0b6b48cf2 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -427,6 +427,7 @@ libqpidcommon_la_SOURCES += \ qpid/ptr_map.h \ qpid/types/Uuid.cpp \ qpid/types/Variant.cpp \ + qpid/amqp_0_10/Codecs.cpp \ qpid/sys/AggregateOutput.cpp \ qpid/sys/AggregateOutput.h \ qpid/sys/AsynchIO.h \ @@ -708,10 +709,6 @@ libqpidclient_la_SOURCES = \ qpid/messaging/AddressParser.cpp \ qpid/messaging/Connection.cpp \ qpid/messaging/Duration.cpp \ - qpid/messaging/ListContent.cpp \ - qpid/messaging/ListView.cpp \ - qpid/messaging/MapContent.cpp \ - qpid/messaging/MapView.cpp \ qpid/messaging/Message.cpp \ qpid/messaging/MessageImpl.h \ qpid/messaging/MessageImpl.cpp \ @@ -727,8 +724,6 @@ libqpidclient_la_SOURCES = \ qpid/client/amqp0_10/AcceptTracker.cpp \ qpid/client/amqp0_10/AddressResolution.h \ qpid/client/amqp0_10/AddressResolution.cpp \ - qpid/client/amqp0_10/Codecs.cpp \ - qpid/client/amqp0_10/CodecsInternal.h \ qpid/client/amqp0_10/ConnectionImpl.h \ qpid/client/amqp0_10/ConnectionImpl.cpp \ qpid/client/amqp0_10/FailoverUpdates.cpp \ @@ -762,6 +757,7 @@ nobase_include_HEADERS += \ ../include/qpid/RangeSet.h \ ../include/qpid/SessionId.h \ ../include/qpid/Url.h \ + ../include/qpid/amqp_0_10/Codecs.h \ ../include/qpid/client/AsyncSession.h \ ../include/qpid/client/ClientImportExport.h \ ../include/qpid/client/Completion.h \ @@ -818,22 +814,16 @@ nobase_include_HEADERS += \ ../include/qpid/sys/Thread.h \ ../include/qpid/sys/Time.h \ ../include/qpid/messaging/Address.h \ - ../include/qpid/messaging/Codec.h \ ../include/qpid/messaging/Connection.h \ ../include/qpid/messaging/Duration.h \ ../include/qpid/messaging/Handle.h \ ../include/qpid/messaging/ImportExport.h \ - ../include/qpid/messaging/ListContent.h \ - ../include/qpid/messaging/ListView.h \ - ../include/qpid/messaging/MapContent.h \ - ../include/qpid/messaging/MapView.h \ ../include/qpid/messaging/Message.h \ ../include/qpid/messaging/Receiver.h \ ../include/qpid/messaging/Sender.h \ ../include/qpid/messaging/Session.h \ ../include/qpid/types/Uuid.h \ ../include/qpid/types/Variant.h \ - ../include/qpid/client/amqp0_10/Codecs.h \ ../include/qpid/client/amqp0_10/FailoverUpdates.h # Force build of qpidd during dist phase so help2man will work. diff --git a/cpp/src/qpid/amqp_0_10/Codecs.cpp b/cpp/src/qpid/amqp_0_10/Codecs.cpp new file mode 100644 index 0000000000..6325f9f664 --- /dev/null +++ b/cpp/src/qpid/amqp_0_10/Codecs.cpp @@ -0,0 +1,327 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/framing/Array.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/FieldValue.h" +#include "qpid/framing/List.h" +#include "qpid/log/Statement.h" +#include +#include +#include + +using namespace qpid::framing; +using namespace qpid::types; + +namespace qpid { +namespace amqp_0_10 { + +namespace { +const std::string iso885915("iso-8859-15"); +const std::string utf8("utf8"); +const std::string utf16("utf16"); +const std::string binary("binary"); +const std::string amqp0_10_binary("amqp0-10:binary"); +const std::string amqp0_10_bit("amqp0-10:bit"); +const std::string amqp0_10_datetime("amqp0-10:datetime"); +const std::string amqp0_10_struct("amqp0-10:struct"); +} + +template void convert(const T& from, U& to, F f) +{ + std::transform(from.begin(), from.end(), std::inserter(to, to.begin()), f); +} + +Variant::Map::value_type toVariantMapEntry(const FieldTable::value_type& in); +FieldTable::value_type toFieldTableEntry(const Variant::Map::value_type& in); +Variant toVariant(boost::shared_ptr in); +boost::shared_ptr toFieldValue(const Variant& in); + +template void translate(boost::shared_ptr in, U& u, F f) +{ + T t; + getEncodedValue(in, t); + convert(t, u, f); +} + +template T* toFieldValueCollection(const U& u, F f) +{ + typename T::ValueType t; + convert(u, t, f); + return new T(t); +} + +FieldTableValue* toFieldTableValue(const Variant::Map& map) +{ + FieldTable ft; + convert(map, ft, &toFieldTableEntry); + return new FieldTableValue(ft); +} + +ListValue* toListValue(const Variant::List& list) +{ + List l; + convert(list, l, &toFieldValue); + return new ListValue(l); +} + +void setEncodingFor(Variant& out, uint8_t code) +{ + switch(code){ + case 0x80: + case 0x90: + case 0xa0: + out.setEncoding(amqp0_10_binary); + break; + case 0x84: + case 0x94: + out.setEncoding(iso885915); + break; + case 0x85: + case 0x95: + out.setEncoding(utf8); + break; + case 0x86: + case 0x96: + out.setEncoding(utf16); + break; + case 0xab: + out.setEncoding(amqp0_10_struct); + break; + default: + //do nothing + break; + } +} + +qpid::types::Uuid getUuid(FieldValue& value) +{ + unsigned char data[16]; + value.getFixedWidthValue<16>(data); + return qpid::types::Uuid(data); +} + +Variant toVariant(boost::shared_ptr in) +{ + Variant out; + //based on AMQP 0-10 typecode, pick most appropriate variant type + switch (in->getType()) { + //Fixed Width types: + case 0x01: out.setEncoding(amqp0_10_binary); + case 0x02: out = in->getIntegerValue(); break; + case 0x03: out = in->getIntegerValue(); break; + case 0x04: break; //TODO: iso-8859-15 char + case 0x08: out = static_cast(in->getIntegerValue()); break; + case 0x10: out.setEncoding(amqp0_10_binary); + case 0x11: out = in->getIntegerValue(); break; + case 0x12: out = in->getIntegerValue(); break; + case 0x20: out.setEncoding(amqp0_10_binary); + case 0x21: out = in->getIntegerValue(); break; + case 0x22: out = in->getIntegerValue(); break; + case 0x23: out = in->get(); break; + case 0x27: break; //TODO: utf-32 char + case 0x30: out.setEncoding(amqp0_10_binary); + case 0x31: out = in->getIntegerValue(); break; + case 0x38: out.setEncoding(amqp0_10_datetime); //treat datetime as uint64_t, but set encoding + case 0x32: out = in->getIntegerValue(); break; + case 0x33: out = in->get(); break; + + case 0x48: out = getUuid(*in); break; + + //TODO: figure out whether and how to map values with codes 0x40-0xd8 + + case 0xf0: break;//void, which is the default value for Variant + case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant + + //Variable Width types: + //strings: + case 0x80: + case 0x84: + case 0x85: + case 0x86: + case 0x90: + case 0x94: + case 0x95: + case 0x96: + case 0xa0: + case 0xab: + out = in->get(); + setEncodingFor(out, in->getType()); + break; + + case 0xa8: + out = Variant::Map(); + translate(in, out.asMap(), &toVariantMapEntry); + break; + + case 0xa9: + out = Variant::List(); + translate(in, out.asList(), &toVariant); + break; + case 0xaa: //convert amqp0-10 array into variant list + out = Variant::List(); + translate(in, out.asList(), &toVariant); + break; + + default: + //error? + break; + } + return out; +} + +boost::shared_ptr convertString(const std::string& value, const std::string& encoding) +{ + bool large = value.size() > std::numeric_limits::max(); + if (encoding.empty() || encoding == amqp0_10_binary || encoding == binary) { + if (large) { + return boost::shared_ptr(new Var32Value(value, 0xa0)); + } else { + return boost::shared_ptr(new Var16Value(value, 0x90)); + } + } else if (encoding == utf8 && !large) { + return boost::shared_ptr(new Str16Value(value)); + } else if (encoding == utf16 && !large) { + return boost::shared_ptr(new Var16Value(value, 0x96)); + } else if (encoding == iso885915 && !large) { + return boost::shared_ptr(new Var16Value(value, 0x94)); + } else { + //either the string is too large for the encoding in amqp 0-10, or the encoding was not recognised + QPID_LOG(warning, "Could not encode " << value.size() << " byte value as " << encoding << ", encoding as vbin32."); + return boost::shared_ptr(new Var32Value(value, 0xa0)); + } +} + +boost::shared_ptr toFieldValue(const Variant& in) +{ + boost::shared_ptr out; + switch (in.getType()) { + case VAR_VOID: out = boost::shared_ptr(new VoidValue()); break; + case VAR_BOOL: out = boost::shared_ptr(new BoolValue(in.asBool())); break; + case VAR_UINT8: out = boost::shared_ptr(new Unsigned8Value(in.asUint8())); break; + case VAR_UINT16: out = boost::shared_ptr(new Unsigned16Value(in.asUint16())); break; + case VAR_UINT32: out = boost::shared_ptr(new Unsigned32Value(in.asUint32())); break; + case VAR_UINT64: out = boost::shared_ptr(new Unsigned64Value(in.asUint64())); break; + case VAR_INT8: out = boost::shared_ptr(new Integer8Value(in.asInt8())); break; + case VAR_INT16: out = boost::shared_ptr(new Integer16Value(in.asInt16())); break; + case VAR_INT32: out = boost::shared_ptr(new Integer32Value(in.asInt32())); break; + case VAR_INT64: out = boost::shared_ptr(new Integer64Value(in.asInt64())); break; + case VAR_FLOAT: out = boost::shared_ptr(new FloatValue(in.asFloat())); break; + case VAR_DOUBLE: out = boost::shared_ptr(new DoubleValue(in.asDouble())); break; + case VAR_STRING: out = convertString(in.asString(), in.getEncoding()); break; + case VAR_UUID: out = boost::shared_ptr(new UuidValue(in.asUuid().data())); break; + case VAR_MAP: + out = boost::shared_ptr(toFieldTableValue(in.asMap())); + break; + case VAR_LIST: + out = boost::shared_ptr(toListValue(in.asList())); + break; + } + return out; +} + +Variant::Map::value_type toVariantMapEntry(const FieldTable::value_type& in) +{ + return Variant::Map::value_type(in.first, toVariant(in.second)); +} + +FieldTable::value_type toFieldTableEntry(const Variant::Map::value_type& in) +{ + return FieldTable::value_type(in.first, toFieldValue(in.second)); +} + +struct EncodeBuffer +{ + char* data; + Buffer buffer; + + EncodeBuffer(size_t size) : data(new char[size]), buffer(data, size) {} + ~EncodeBuffer() { delete[] data; } + + template void encode(T& t) { t.encode(buffer); } + + void getData(std::string& s) { + s.assign(data, buffer.getSize()); + } +}; + +struct DecodeBuffer +{ + Buffer buffer; + + DecodeBuffer(const std::string& s) : buffer(const_cast(s.data()), s.size()) {} + + template void decode(T& t) { t.decode(buffer); } + +}; + +template void _encode(const U& value, std::string& data, F f) +{ + T t; + convert(value, t, f); + EncodeBuffer buffer(t.encodedSize()); + buffer.encode(t); + buffer.getData(data); +} + +template void _decode(const std::string& data, U& value, F f) +{ + T t; + DecodeBuffer buffer(data); + buffer.decode(t); + convert(t, value, f); +} + +void MapCodec::encode(const Variant::Map& value, std::string& data) +{ + _encode(value, data, &toFieldTableEntry); +} + +void MapCodec::decode(const std::string& data, Variant::Map& value) +{ + _decode(data, value, &toVariantMapEntry); +} + +void ListCodec::encode(const Variant::List& value, std::string& data) +{ + _encode(value, data, &toFieldValue); +} + +void ListCodec::decode(const std::string& data, Variant::List& value) +{ + _decode(data, value, &toVariant); +} + +void translate(const Variant::Map& from, FieldTable& to) +{ + convert(from, to, &toFieldTableEntry); +} + +void translate(const FieldTable& from, Variant::Map& to) +{ + convert(from, to, &toVariantMapEntry); +} + +const std::string ListCodec::contentType("amqp/list"); +const std::string MapCodec::contentType("amqp/map"); + +}} // namespace qpid::amqp_0_10 diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index 990b2a19d8..f64a46ba01 100644 --- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -19,8 +19,7 @@ * */ #include "qpid/client/amqp0_10/AddressResolution.h" -#include "qpid/client/amqp0_10/Codecs.h" -#include "qpid/client/amqp0_10/CodecsInternal.h" +#include "qpid/amqp_0_10/Codecs.h" #include "qpid/client/amqp0_10/MessageSource.h" #include "qpid/client/amqp0_10/MessageSink.h" #include "qpid/client/amqp0_10/OutgoingMessage.h" @@ -55,6 +54,7 @@ using qpid::framing::ReplyTo; using qpid::framing::Uuid; using namespace qpid::types; using namespace qpid::framing::message; +using namespace qpid::amqp_0_10; using namespace boost::assign; @@ -262,43 +262,29 @@ bool in(const Variant& value, const std::vector& choices) return false; } -bool getReceiverPolicy(const Address& address, const std::string& key) -{ - return in(address.getOption(key), list_of(ALWAYS)(RECEIVER)); -} - -bool getSenderPolicy(const Address& address, const std::string& key) -{ - return in(address.getOption(key), list_of(ALWAYS)(SENDER)); -} - -const Variant& getOption(const Variant::Map& options, const std::vector& path, size_t index=0) +const Variant& getOption(const Variant::Map& options, const std::string& name) { - Variant::Map::const_iterator j = options.find(path[index]); + Variant::Map::const_iterator j = options.find(name); if (j == options.end()) { return EMPTY_VARIANT; - } else if (++index < path.size()) { - if (j->second.getType() != VAR_MAP) - throw InvalidAddress((boost::format("Expected %1% to be a map") % j->first).str()); - return getOption(j->second.asMap(), path, index); } else { return j->second; } } -const Variant& getOption(const Address& address, const std::vector& path) +const Variant& getOption(const Address& address, const std::string& name) { - return getOption(address.getOptions(), path); + return getOption(address.getOptions(), name); } -const Variant& getOption(const Variant::Map& options, const std::string& name) +bool getReceiverPolicy(const Address& address, const std::string& key) { - Variant::Map::const_iterator j = options.find(name); - if (j == options.end()) { - return EMPTY_VARIANT; - } else { - return j->second; - } + return in(getOption(address, key), list_of(ALWAYS)(RECEIVER)); +} + +bool getSenderPolicy(const Address& address, const std::string& key) +{ + return in(getOption(address, key), list_of(ALWAYS)(SENDER)); } struct Opt @@ -360,13 +346,14 @@ void Opt::collect(qpid::framing::FieldTable& args) const bool AddressResolution::is_unreliable(const Address& address) { - return in(getOption(address, list_of(LINK)(RELIABILITY)), + + return in((Opt(address)/LINK/RELIABILITY).str(), list_of(UNRELIABLE)(AT_MOST_ONCE)); } bool AddressResolution::is_reliable(const Address& address) { - return in(getOption(address, list_of(LINK)(RELIABILITY)), + return in((Opt(address)/LINK/RELIABILITY).str(), list_of(AT_LEAST_ONCE)(EXACTLY_ONCE)); } @@ -433,7 +420,7 @@ std::auto_ptr AddressResolution::resolveSink(qpid::client::Session bool isBrowse(const Address& address) { - const Variant& mode = address.getOption(MODE); + const Variant& mode = getOption(address, MODE); if (!mode.isVoid()) { std::string value = mode.asString(); if (value == BROWSE) return true; @@ -651,9 +638,9 @@ bool isTopic(qpid::client::Session session, const qpid::messaging::Address& addr } Node::Node(const Address& address) : name(address.getName()), - createPolicy(address.getOption(CREATE)), - assertPolicy(address.getOption(ASSERT)), - deletePolicy(address.getOption(DELETE)) + createPolicy(getOption(address, CREATE)), + assertPolicy(getOption(address, ASSERT)), + deletePolicy(getOption(address, DELETE)) { nodeBindings.add((Opt(address)/NODE/X_BINDINGS).asList()); linkBindings.add((Opt(address)/LINK/X_BINDINGS).asList()); @@ -908,7 +895,7 @@ bool Node::enabled(const Variant& policy, CheckMode mode) bool Node::createEnabled(const Address& address, CheckMode mode) { - const Variant& policy = address.getOption(CREATE); + const Variant& policy = getOption(address, CREATE); return enabled(policy, mode); } diff --git a/cpp/src/qpid/client/amqp0_10/Codecs.cpp b/cpp/src/qpid/client/amqp0_10/Codecs.cpp deleted file mode 100644 index ce806572e5..0000000000 --- a/cpp/src/qpid/client/amqp0_10/Codecs.cpp +++ /dev/null @@ -1,332 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/client/amqp0_10/Codecs.h" -#include "qpid/types/Variant.h" -#include "qpid/framing/Array.h" -#include "qpid/framing/Buffer.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/framing/FieldValue.h" -#include "qpid/framing/List.h" -#include "qpid/log/Statement.h" -#include -#include -#include - -using namespace qpid::framing; -using namespace qpid::messaging; -using namespace qpid::types; - -namespace qpid { -namespace client { -namespace amqp0_10 { - -namespace { -const std::string iso885915("iso-8859-15"); -const std::string utf8("utf8"); -const std::string utf16("utf16"); -const std::string binary("binary"); -const std::string amqp0_10_binary("amqp0-10:binary"); -const std::string amqp0_10_bit("amqp0-10:bit"); -const std::string amqp0_10_datetime("amqp0-10:datetime"); -const std::string amqp0_10_struct("amqp0-10:struct"); -} - -template void convert(const T& from, U& to, F f) -{ - std::transform(from.begin(), from.end(), std::inserter(to, to.begin()), f); -} - -Variant::Map::value_type toVariantMapEntry(const FieldTable::value_type& in); -FieldTable::value_type toFieldTableEntry(const Variant::Map::value_type& in); -Variant toVariant(boost::shared_ptr in); -boost::shared_ptr toFieldValue(const Variant& in); - -template void translate(boost::shared_ptr in, U& u, F f) -{ - T t; - getEncodedValue(in, t); - convert(t, u, f); -} - -template T* toFieldValueCollection(const U& u, F f) -{ - typename T::ValueType t; - convert(u, t, f); - return new T(t); -} - -FieldTableValue* toFieldTableValue(const Variant::Map& map) -{ - FieldTable ft; - convert(map, ft, &toFieldTableEntry); - return new FieldTableValue(ft); -} - -ListValue* toListValue(const Variant::List& list) -{ - List l; - convert(list, l, &toFieldValue); - return new ListValue(l); -} - -void setEncodingFor(Variant& out, uint8_t code) -{ - switch(code){ - case 0x80: - case 0x90: - case 0xa0: - out.setEncoding(amqp0_10_binary); - break; - case 0x84: - case 0x94: - out.setEncoding(iso885915); - break; - case 0x85: - case 0x95: - out.setEncoding(utf8); - break; - case 0x86: - case 0x96: - out.setEncoding(utf16); - break; - case 0xab: - out.setEncoding(amqp0_10_struct); - break; - default: - //do nothing - break; - } -} - -qpid::types::Uuid getUuid(FieldValue& value) -{ - unsigned char data[16]; - value.getFixedWidthValue<16>(data); - return qpid::types::Uuid(data); -} - -Variant toVariant(boost::shared_ptr in) -{ - Variant out; - //based on AMQP 0-10 typecode, pick most appropriate variant type - switch (in->getType()) { - //Fixed Width types: - case 0x01: out.setEncoding(amqp0_10_binary); - case 0x02: out = in->getIntegerValue(); break; - case 0x03: out = in->getIntegerValue(); break; - case 0x04: break; //TODO: iso-8859-15 char - case 0x08: out = static_cast(in->getIntegerValue()); break; - case 0x10: out.setEncoding(amqp0_10_binary); - case 0x11: out = in->getIntegerValue(); break; - case 0x12: out = in->getIntegerValue(); break; - case 0x20: out.setEncoding(amqp0_10_binary); - case 0x21: out = in->getIntegerValue(); break; - case 0x22: out = in->getIntegerValue(); break; - case 0x23: out = in->get(); break; - case 0x27: break; //TODO: utf-32 char - case 0x30: out.setEncoding(amqp0_10_binary); - case 0x31: out = in->getIntegerValue(); break; - case 0x38: out.setEncoding(amqp0_10_datetime); //treat datetime as uint64_t, but set encoding - case 0x32: out = in->getIntegerValue(); break; - case 0x33: out = in->get(); break; - - case 0x48: out = getUuid(*in); break; - - //TODO: figure out whether and how to map values with codes 0x40-0xd8 - - case 0xf0: break;//void, which is the default value for Variant - case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant - - //Variable Width types: - //strings: - case 0x80: - case 0x84: - case 0x85: - case 0x86: - case 0x90: - case 0x94: - case 0x95: - case 0x96: - case 0xa0: - case 0xab: - out = in->get(); - setEncodingFor(out, in->getType()); - break; - - case 0xa8: - out = Variant::Map(); - translate(in, out.asMap(), &toVariantMapEntry); - break; - - case 0xa9: - out = Variant::List(); - translate(in, out.asList(), &toVariant); - break; - case 0xaa: //convert amqp0-10 array into variant list - out = Variant::List(); - translate(in, out.asList(), &toVariant); - break; - - default: - //error? - break; - } - return out; -} - -boost::shared_ptr convertString(const std::string& value, const std::string& encoding) -{ - bool large = value.size() > std::numeric_limits::max(); - if (encoding.empty() || encoding == amqp0_10_binary || encoding == binary) { - if (large) { - return boost::shared_ptr(new Var32Value(value, 0xa0)); - } else { - return boost::shared_ptr(new Var16Value(value, 0x90)); - } - } else if (encoding == utf8 && !large) { - return boost::shared_ptr(new Str16Value(value)); - } else if (encoding == utf16 && !large) { - return boost::shared_ptr(new Var16Value(value, 0x96)); - } else if (encoding == iso885915 && !large) { - return boost::shared_ptr(new Var16Value(value, 0x94)); - } else { - //either the string is too large for the encoding in amqp 0-10, or the encoding was not recognised - QPID_LOG(warning, "Could not encode " << value.size() << " byte value as " << encoding << ", encoding as vbin32."); - return boost::shared_ptr(new Var32Value(value, 0xa0)); - } -} - -boost::shared_ptr toFieldValue(const Variant& in) -{ - boost::shared_ptr out; - switch (in.getType()) { - case VAR_VOID: out = boost::shared_ptr(new VoidValue()); break; - case VAR_BOOL: out = boost::shared_ptr(new BoolValue(in.asBool())); break; - case VAR_UINT8: out = boost::shared_ptr(new Unsigned8Value(in.asUint8())); break; - case VAR_UINT16: out = boost::shared_ptr(new Unsigned16Value(in.asUint16())); break; - case VAR_UINT32: out = boost::shared_ptr(new Unsigned32Value(in.asUint32())); break; - case VAR_UINT64: out = boost::shared_ptr(new Unsigned64Value(in.asUint64())); break; - case VAR_INT8: out = boost::shared_ptr(new Integer8Value(in.asInt8())); break; - case VAR_INT16: out = boost::shared_ptr(new Integer16Value(in.asInt16())); break; - case VAR_INT32: out = boost::shared_ptr(new Integer32Value(in.asInt32())); break; - case VAR_INT64: out = boost::shared_ptr(new Integer64Value(in.asInt64())); break; - case VAR_FLOAT: out = boost::shared_ptr(new FloatValue(in.asFloat())); break; - case VAR_DOUBLE: out = boost::shared_ptr(new DoubleValue(in.asDouble())); break; - case VAR_STRING: out = convertString(in.asString(), in.getEncoding()); break; - case VAR_UUID: out = boost::shared_ptr(new UuidValue(in.asUuid().data())); break; - case VAR_MAP: - out = boost::shared_ptr(toFieldTableValue(in.asMap())); - break; - case VAR_LIST: - out = boost::shared_ptr(toListValue(in.asList())); - break; - } - return out; -} - -Variant::Map::value_type toVariantMapEntry(const FieldTable::value_type& in) -{ - return Variant::Map::value_type(in.first, toVariant(in.second)); -} - -FieldTable::value_type toFieldTableEntry(const Variant::Map::value_type& in) -{ - return FieldTable::value_type(in.first, toFieldValue(in.second)); -} - -struct EncodeBuffer -{ - char* data; - Buffer buffer; - - EncodeBuffer(size_t size) : data(new char[size]), buffer(data, size) {} - ~EncodeBuffer() { delete[] data; } - - template void encode(T& t) { t.encode(buffer); } - - void getData(std::string& s) { - s.assign(data, buffer.getSize()); - } -}; - -struct DecodeBuffer -{ - Buffer buffer; - - DecodeBuffer(const std::string& s) : buffer(const_cast(s.data()), s.size()) {} - - template void decode(T& t) { t.decode(buffer); } - -}; - -template void _encode(const U& value, std::string& data, F f) -{ - T t; - convert(value, t, f); - EncodeBuffer buffer(t.encodedSize()); - buffer.encode(t); - buffer.getData(data); -} - -template void _decode(const std::string& data, U& value, F f) -{ - T t; - DecodeBuffer buffer(data); - buffer.decode(t); - convert(t, value, f); -} - -void MapCodec::encode(const Variant& value, std::string& data) -{ - _encode(value.asMap(), data, &toFieldTableEntry); -} - -void MapCodec::decode(const std::string& data, Variant& value) -{ - value = Variant::Map(); - _decode(data, value.asMap(), &toVariantMapEntry); -} - -void ListCodec::encode(const Variant& value, std::string& data) -{ - _encode(value.asList(), data, &toFieldValue); -} - -void ListCodec::decode(const std::string& data, Variant& value) -{ - value = Variant::List(); - _decode(data, value.asList(), &toVariant); -} - -void translate(const Variant::Map& from, FieldTable& to) -{ - convert(from, to, &toFieldTableEntry); -} - -void translate(const FieldTable& from, Variant::Map& to) -{ - convert(from, to, &toVariantMapEntry); -} - -const std::string ListCodec::contentType("amqp/list"); -const std::string MapCodec::contentType("amqp/map"); - -}}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/CodecsInternal.h b/cpp/src/qpid/client/amqp0_10/CodecsInternal.h deleted file mode 100644 index a110d80b8a..0000000000 --- a/cpp/src/qpid/client/amqp0_10/CodecsInternal.h +++ /dev/null @@ -1,41 +0,0 @@ -#ifndef QPID_CLIENT_AMQP0_10_CODECSINTERNAL_H -#define QPID_CLIENT_AMQP0_10_CODECSINTERNAL_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/types/Variant.h" -#include "qpid/framing/FieldTable.h" - -namespace qpid { -namespace client { -namespace amqp0_10 { - -/** - * Declarations of a couple of conversion functions implemented in - * Codecs.cpp but not exposed through API - */ - -void translate(const qpid::types::Variant::Map& from, qpid::framing::FieldTable& to); -void translate(const qpid::framing::FieldTable& from, qpid::types::Variant::Map& to); - -}}} // namespace qpid::client::amqp0_10 - -#endif /*!QPID_CLIENT_AMQP0_10_CODECSINTERNAL_H*/ diff --git a/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp b/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp index 5e526a2ffc..354d6f6aba 100644 --- a/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp +++ b/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp @@ -46,7 +46,7 @@ struct FailoverUpdatesImpl : qpid::sys::Runnable FailoverUpdatesImpl(Connection& c) : connection(c), quit(false) { - session = connection.newSession("failover-updates"); + session = connection.createSession("failover-updates"); receiver = session.createReceiver("amq.failover"); thread = qpid::sys::Thread(*this); } diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index 345ebfb66d..c26b2eb09f 100644 --- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -20,8 +20,7 @@ */ #include "qpid/client/amqp0_10/IncomingMessages.h" #include "qpid/client/amqp0_10/AddressResolution.h" -#include "qpid/client/amqp0_10/Codecs.h" -#include "qpid/client/amqp0_10/CodecsInternal.h" +#include "qpid/amqp_0_10/Codecs.h" #include "qpid/client/SessionImpl.h" #include "qpid/client/SessionBase_0_10Access.h" #include "qpid/log/Statement.h" @@ -42,6 +41,7 @@ namespace amqp0_10 { using namespace qpid::framing; using namespace qpid::framing::message; +using namespace qpid::amqp_0_10; using qpid::sys::AbsTime; using qpid::sys::Duration; using qpid::messaging::MessageImplAccess; @@ -306,7 +306,7 @@ void populate(qpid::messaging::Message& message, FrameSet& command) //e.g. for rejecting. MessageImplAccess::get(message).setInternalId(command.getId()); - command.getContent(message.getContent()); + message.setContent(command.getContent()); populateHeaders(message, command.getHeaders()); } diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp index b19b26f903..c22eb5403f 100644 --- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp +++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp @@ -20,8 +20,7 @@ */ #include "qpid/client/amqp0_10/OutgoingMessage.h" #include "qpid/client/amqp0_10/AddressResolution.h" -#include "qpid/client/amqp0_10/Codecs.h" -#include "qpid/client/amqp0_10/CodecsInternal.h" +#include "qpid/amqp_0_10/Codecs.h" #include "qpid/messaging/Address.h" #include "qpid/messaging/Message.h" #include "qpid/messaging/MessageImpl.h" @@ -34,6 +33,7 @@ namespace amqp0_10 { using qpid::messaging::Address; using qpid::messaging::MessageImplAccess; using namespace qpid::framing::message; +using namespace qpid::amqp_0_10; void OutgoingMessage::convert(const qpid::messaging::Message& from) { diff --git a/cpp/src/qpid/messaging/Address.cpp b/cpp/src/qpid/messaging/Address.cpp index 2902b8fc4b..0c522888e7 100644 --- a/cpp/src/qpid/messaging/Address.cpp +++ b/cpp/src/qpid/messaging/Address.cpp @@ -92,7 +92,7 @@ Address::~Address() { delete impl; } Address& Address::operator=(const Address& a) { *impl = *a.impl; return *this; } -std::string Address::toStr() const +std::string Address::str() const { std::stringstream out; out << impl->name; @@ -106,7 +106,6 @@ bool Address::operator !() const { return impl->name.empty(); } const std::string& Address::getName() const { return impl->name; } void Address::setName(const std::string& name) { impl->name = name; } const std::string& Address::getSubject() const { return impl->subject; } -bool Address::hasSubject() const { return !(impl->subject.empty()); } void Address::setSubject(const std::string& subject) { impl->subject = subject; } const Variant::Map& Address::getOptions() const { return impl->options; } Variant::Map& Address::getOptions() { return impl->options; } @@ -128,7 +127,7 @@ const Variant& find(const Variant::Map& map, const std::string& key) std::string Address::getType() const { - const Variant& props = getOption(NODE_PROPERTIES); + const Variant& props = find(impl->options, NODE_PROPERTIES); if (props.getType() == VAR_MAP) { const Variant& type = find(props.asMap(), TYPE); if (!type.isVoid()) return type.asString(); @@ -143,14 +142,9 @@ void Address::setType(const std::string& type) props.asMap()[TYPE] = type; } -const Variant& Address::getOption(const std::string& key) const -{ - return find(impl->options, key); -} - std::ostream& operator<<(std::ostream& out, const Address& address) { - out << address.toStr(); + out << address.str(); return out; } diff --git a/cpp/src/qpid/messaging/Connection.cpp b/cpp/src/qpid/messaging/Connection.cpp index 853ba1d100..b9437c7931 100644 --- a/cpp/src/qpid/messaging/Connection.cpp +++ b/cpp/src/qpid/messaging/Connection.cpp @@ -56,11 +56,10 @@ Connection::Connection(const Variant::Map& options) void Connection::open(const std::string& url) { impl->open(url); } void Connection::close() { impl->close(); } -Session Connection::newSession(const char* name) { return impl->newSession(false, name); } -Session Connection::newSession(const std::string& name) { return impl->newSession(false, name); } -Session Connection::newSession(bool transactional, const std::string& name) +Session Connection::createSession(const std::string& name) { return impl->newSession(false, name); } +Session Connection::createTransactionalSession(const std::string& name) { - return impl->newSession(transactional, name); + return impl->newSession(true, name); } Session Connection::getSession(const std::string& name) const { return impl->getSession(name); } void Connection::setOption(const std::string& name, const Variant& value) diff --git a/cpp/src/qpid/messaging/ListContent.cpp b/cpp/src/qpid/messaging/ListContent.cpp deleted file mode 100644 index 4bd990e062..0000000000 --- a/cpp/src/qpid/messaging/ListContent.cpp +++ /dev/null @@ -1,106 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/messaging/ListContent.h" -#include "qpid/messaging/Message.h" -#include "qpid/client/amqp0_10/Codecs.h" - -namespace qpid { -namespace messaging { - -using namespace qpid::types; - -class ListContentImpl : public Variant -{ - Message* msg; - public: - ListContentImpl(Message& m) : Variant(Variant::List()), msg(&m) - { - if (msg->getContent().size()) { - qpid::client::amqp0_10::ListCodec codec; - codec.decode(msg->getContent(), *this); - } - } - - ListContentImpl(Message& m, const Variant::List& i) : Variant(i), msg(&m) - { - msg->getContent().clear(); - } - - void encode() - { - qpid::client::amqp0_10::ListCodec codec; - codec.encode(*this, msg->getContent()); - } -}; - -ListContent::ListContent(Message& m) : impl(new ListContentImpl(m)) {} -ListContent::ListContent(Message& m, const Variant::List& i) : impl(new ListContentImpl(m, i)) {} -ListContent::~ListContent() { delete impl; } -ListContent& ListContent::operator=(const ListContent& l) { *impl = *l.impl; return *this; } - -ListContent::const_iterator ListContent::begin() const { return impl->asList().begin(); } -ListContent::const_iterator ListContent::end() const { return impl->asList().end(); } -ListContent::const_reverse_iterator ListContent::rbegin() const { return impl->asList().rbegin(); } -ListContent::const_reverse_iterator ListContent::rend() const { return impl->asList().rend(); } - -ListContent::iterator ListContent::begin() { return impl->asList().begin(); } -ListContent::iterator ListContent::end() { return impl->asList().end(); } -ListContent::reverse_iterator ListContent::rbegin() { return impl->asList().rbegin(); } -ListContent::reverse_iterator ListContent::rend() { return impl->asList().rend(); } - -bool ListContent::empty() const { return impl->asList().empty(); } -size_t ListContent::size() const { return impl->asList().size(); } - -const Variant& ListContent::front() const { return impl->asList().front(); } -Variant& ListContent::front() { return impl->asList().front(); } -const Variant& ListContent::back() const { return impl->asList().back(); } -Variant& ListContent::back() { return impl->asList().back(); } - -void ListContent::push_front(const Variant& v) { impl->asList().push_front(v); } -void ListContent::push_back(const Variant& v) { impl->asList().push_back(v); } - -void ListContent::pop_front() { impl->asList().pop_front(); } -void ListContent::pop_back() { impl->asList().pop_back(); } - -ListContent::iterator ListContent::insert(iterator position, const Variant& v) -{ - return impl->asList().insert(position, v); -} -void ListContent::insert(iterator position, size_t n, const Variant& v) -{ - impl->asList().insert(position, n, v); -} -ListContent::iterator ListContent::erase(iterator position) { return impl->asList().erase(position); } -ListContent::iterator ListContent::erase(iterator first, iterator last) { return impl->asList().erase(first, last); } -void ListContent::clear() { impl->asList().clear(); } - -void ListContent::encode() { impl->encode(); } - -const Variant::List& ListContent::asList() const { return impl->asList(); } -Variant::List& ListContent::asList() { return impl->asList(); } - -std::ostream& operator<<(std::ostream& out, const ListContent& m) -{ - out << m.asList(); - return out; -} - -}} // namespace qpid::messaging diff --git a/cpp/src/qpid/messaging/ListView.cpp b/cpp/src/qpid/messaging/ListView.cpp deleted file mode 100644 index 5cf453254f..0000000000 --- a/cpp/src/qpid/messaging/ListView.cpp +++ /dev/null @@ -1,65 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/messaging/ListView.h" -#include "qpid/messaging/Message.h" -#include "qpid/client/amqp0_10/Codecs.h" - -namespace qpid { -namespace messaging { - -using namespace qpid::types; - -class ListViewImpl : public Variant -{ - public: - ListViewImpl(const Message& msg) : Variant(Variant::List()) - { - if (msg.getContent().size()) { - qpid::client::amqp0_10::ListCodec codec; - codec.decode(msg.getContent(), *this); - } - } -}; - -ListView::ListView(const Message& m) :impl(new ListViewImpl(m)) {} -ListView::~ListView() { delete impl; } -ListView& ListView::operator=(const ListView& l) { *impl = *l.impl; return *this; } - -ListView::const_iterator ListView::begin() const { return impl->asList().begin(); } -ListView::const_iterator ListView::end() const { return impl->asList().end(); } -ListView::const_reverse_iterator ListView::rbegin() const { return impl->asList().rbegin(); } -ListView::const_reverse_iterator ListView::rend() const { return impl->asList().rend(); } - -bool ListView::empty() const { return impl->asList().empty(); } -size_t ListView::size() const { return impl->asList().size(); } - -const Variant& ListView::front() const { return impl->asList().front(); } -const Variant& ListView::back() const { return impl->asList().back(); } - -const Variant::List& ListView::asList() const { return impl->asList(); } - -std::ostream& operator<<(std::ostream& out, const ListView& m) -{ - out << m.asList(); - return out; -} - -}} // namespace qpid::messaging diff --git a/cpp/src/qpid/messaging/MapContent.cpp b/cpp/src/qpid/messaging/MapContent.cpp deleted file mode 100644 index 11dcfbaf70..0000000000 --- a/cpp/src/qpid/messaging/MapContent.cpp +++ /dev/null @@ -1,96 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/messaging/MapContent.h" -#include "qpid/messaging/Message.h" -#include "qpid/client/amqp0_10/Codecs.h" - -namespace qpid { -namespace messaging { - -using namespace qpid::types; - -class MapContentImpl : public Variant -{ - Message* msg; - public: - MapContentImpl(Message& m) : Variant(Variant::Map()), msg(&m) - { - if (msg->getContent().size()) { - qpid::client::amqp0_10::MapCodec codec; - codec.decode(msg->getContent(), *this); - } - } - - MapContentImpl(Message& m, const Variant::Map& i) : Variant(i), msg(&m) - { - msg->getContent().clear(); - } - - void encode() - { - qpid::client::amqp0_10::MapCodec codec; - codec.encode(*this, msg->getContent()); - msg->setContentType(qpid::client::amqp0_10::MapCodec::contentType); - } -}; - -MapContent::MapContent(Message& m) : impl(new MapContentImpl(m)) {} -MapContent::MapContent(Message& m, const Variant::Map& i) : impl(new MapContentImpl(m, i)) {} -MapContent::~MapContent() { delete impl; } -MapContent& MapContent::operator=(const MapContent& m) { *impl = *m.impl; return *this; } - -MapContent::const_iterator MapContent::begin() const { return impl->asMap().begin(); } -MapContent::const_iterator MapContent::end() const { return impl->asMap().end(); } -MapContent::const_reverse_iterator MapContent::rbegin() const { return impl->asMap().rbegin(); } -MapContent::const_reverse_iterator MapContent::rend() const { return impl->asMap().rend(); } -MapContent::iterator MapContent::begin() { return impl->asMap().begin(); } -MapContent::iterator MapContent::end() { return impl->asMap().end(); } -MapContent::reverse_iterator MapContent::rbegin() { return impl->asMap().rbegin(); } -MapContent::reverse_iterator MapContent::rend() { return impl->asMap().rend(); } - -bool MapContent::empty() const { return impl->asMap().empty(); } -size_t MapContent::size() const { return impl->asMap().size(); } - -MapContent::const_iterator MapContent::find(const key_type& key) const { return impl->asMap().find(key); } -MapContent::iterator MapContent::find(const key_type& key) { return impl->asMap().find(key); } -const Variant& MapContent::operator[](const key_type& key) const { return impl->asMap()[key]; } -Variant& MapContent::operator[](const key_type& key) { return impl->asMap()[key]; } - -std::pair MapContent::insert(const value_type& item) { return impl->asMap().insert(item); } -MapContent::iterator MapContent::insert(iterator position, const value_type& item) { return impl->asMap().insert(position, item); } -void MapContent::erase(iterator position) { impl->asMap().erase(position); } -void MapContent::erase(iterator first, iterator last) { impl->asMap().erase(first, last); } -size_t MapContent::erase(const key_type& key) { return impl->asMap().erase(key); } -void MapContent::clear() { impl->asMap().clear(); } - -void MapContent::encode() { impl->encode(); } - -const std::map& MapContent::asMap() const { return impl->asMap(); } -std::map& MapContent::asMap() { return impl->asMap(); } - - -std::ostream& operator<<(std::ostream& out, const MapContent& m) -{ - out << m.asMap(); - return out; -} - -}} // namespace qpid::messaging diff --git a/cpp/src/qpid/messaging/MapView.cpp b/cpp/src/qpid/messaging/MapView.cpp deleted file mode 100644 index 8bb0a724d2..0000000000 --- a/cpp/src/qpid/messaging/MapView.cpp +++ /dev/null @@ -1,65 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/messaging/MapView.h" -#include "qpid/messaging/Message.h" -#include "qpid/client/amqp0_10/Codecs.h" - -namespace qpid { -namespace messaging { - -using namespace qpid::types; - -class MapViewImpl : public Variant -{ - public: - MapViewImpl(const Message& msg) : Variant(Variant::Map()) - { - if (msg.getContent().size()) { - qpid::client::amqp0_10::MapCodec codec; - codec.decode(msg.getContent(), *this); - } - } -}; - -MapView::MapView(const Message& m) : impl(new MapViewImpl(m)) {} -MapView::~MapView() { delete impl; } -MapView& MapView::operator=(const MapView& m) { *impl = *m.impl; return *this; } - -MapView::const_iterator MapView::begin() const { return impl->asMap().begin(); } -MapView::const_iterator MapView::end() const { return impl->asMap().end(); } -MapView::const_reverse_iterator MapView::rbegin() const { return impl->asMap().rbegin(); } -MapView::const_reverse_iterator MapView::rend() const { return impl->asMap().rend(); } - -bool MapView::empty() const { return impl->asMap().empty(); } -size_t MapView::size() const { return impl->asMap().size(); } - -MapView::const_iterator MapView::find(const key_type& key) const { return impl->asMap().find(key); } -const Variant& MapView::operator[](const key_type& key) const { return impl->asMap()[key]; } - -const std::map& MapView::asMap() const { return impl->asMap(); } - -std::ostream& operator<<(std::ostream& out, const MapView& m) -{ - out << m.asMap(); - return out; -} - -}} // namespace qpid::messaging diff --git a/cpp/src/qpid/messaging/Message.cpp b/cpp/src/qpid/messaging/Message.cpp index 84245b7296..bbbb257b18 100644 --- a/cpp/src/qpid/messaging/Message.cpp +++ b/cpp/src/qpid/messaging/Message.cpp @@ -20,6 +20,8 @@ */ #include "qpid/messaging/Message.h" #include "qpid/messaging/MessageImpl.h" +#include "qpid/amqp_0_10/Codecs.h" +#include namespace qpid { namespace messaging { @@ -64,18 +66,73 @@ bool Message::getDurable() const { return impl->durable; } bool Message::getRedelivered() const { return impl->redelivered; } void Message::setRedelivered(bool redelivered) { impl->redelivered = redelivered; } -const VariantMap& Message::getProperties() const { return impl->getHeaders(); } -VariantMap& Message::getProperties() { return impl->getHeaders(); } +const Variant::Map& Message::getProperties() const { return impl->getHeaders(); } +Variant::Map& Message::getProperties() { return impl->getHeaders(); } void Message::setContent(const std::string& c) { impl->setBytes(c); } void Message::setContent(const char* chars, size_t count) { impl->setBytes(chars, count); } -const std::string& Message::getContent() const { return impl->getBytes(); } -std::string& Message::getContent() { return impl->getBytes(); } +std::string Message::getContent() const { return impl->getBytes(); } -void Message::getContent(std::pair& content) const +const char* Message::getContentPtr() const { - content.first = impl->getBytes().data(); - content.second = impl->getBytes().size(); + return impl->getBytes().data(); +} + +size_t Message::getContentSize() const +{ + return impl->getBytes().size(); +} + + +EncodingException::EncodingException(const std::string& msg) : qpid::Exception(msg) {} + +const std::string BAD_ENCODING("Unsupported encoding: %1% (only %2% is supported at present)."); + +bool checkEncoding(const std::string& requested, const std::string& supported) +{ + if (requested.size()) { + if (requested == supported) return true; + else throw EncodingException((boost::format(BAD_ENCODING) % requested % supported).str()); + } else { + return false; + } +} + +/* + * Currently only support a single encoding type for both list and + * map, based on AMQP 0-10, though wider support is anticipated in the + * future. This method simply checks that the desired encoding (if one + * is specified, either through the message-content or through an + * override) is indeed supported. + */ +void checkEncoding(const Message& message, const std::string& requested, const std::string& supported) +{ + checkEncoding(requested, supported) || checkEncoding(message.getContentType(), supported); +} + +void decode(const Message& message, Variant::Map& map, const std::string& encoding) +{ + checkEncoding(message, encoding, qpid::amqp_0_10::MapCodec::contentType); + qpid::amqp_0_10::MapCodec::decode(message.getContent(), map); +} +void decode(const Message& message, Variant::List& list, const std::string& encoding) +{ + checkEncoding(message, encoding, qpid::amqp_0_10::ListCodec::contentType); + qpid::amqp_0_10::ListCodec::decode(message.getContent(), list); +} +void encode(const Variant::Map& map, Message& message, const std::string& encoding) +{ + checkEncoding(message, encoding, qpid::amqp_0_10::MapCodec::contentType); + std::string content; + qpid::amqp_0_10::MapCodec::encode(map, content); + message.setContent(content); +} +void encode(const Variant::List& list, Message& message, const std::string& encoding) +{ + checkEncoding(message, encoding, qpid::amqp_0_10::ListCodec::contentType); + std::string content; + qpid::amqp_0_10::ListCodec::encode(list, content); + message.setContent(content); } }} // namespace qpid::messaging diff --git a/cpp/src/qpid/messaging/MessageImpl.cpp b/cpp/src/qpid/messaging/MessageImpl.cpp index dea6681244..6138ab4a2c 100644 --- a/cpp/src/qpid/messaging/MessageImpl.cpp +++ b/cpp/src/qpid/messaging/MessageImpl.cpp @@ -54,8 +54,8 @@ const std::string& MessageImpl::getSubject() const { return subject; } void MessageImpl::setContentType(const std::string& s) { contentType = s; } const std::string& MessageImpl::getContentType() const { return contentType; } -const VariantMap& MessageImpl::getHeaders() const { return headers; } -VariantMap& MessageImpl::getHeaders() { return headers; } +const Variant::Map& MessageImpl::getHeaders() const { return headers; } +Variant::Map& MessageImpl::getHeaders() { return headers; } //should these methods be on MessageContent? void MessageImpl::setBytes(const std::string& c) { bytes = c; } diff --git a/cpp/src/qpid/messaging/Receiver.cpp b/cpp/src/qpid/messaging/Receiver.cpp index df13052671..ff67650cf8 100644 --- a/cpp/src/qpid/messaging/Receiver.cpp +++ b/cpp/src/qpid/messaging/Receiver.cpp @@ -39,8 +39,8 @@ bool Receiver::fetch(Message& message, Duration timeout) { return impl->fetch(me Message Receiver::fetch(Duration timeout) { return impl->fetch(timeout); } void Receiver::setCapacity(uint32_t c) { impl->setCapacity(c); } uint32_t Receiver::getCapacity() { return impl->getCapacity(); } -uint32_t Receiver::available() { return impl->available(); } -uint32_t Receiver::pendingAck() { return impl->pendingAck(); } +uint32_t Receiver::getAvailable() { return impl->available(); } +uint32_t Receiver::getPendingAck() { return impl->pendingAck(); } void Receiver::close() { impl->close(); } const std::string& Receiver::getName() const { return impl->getName(); } Session Receiver::getSession() const { return impl->getSession(); } diff --git a/cpp/src/qpid/messaging/Sender.cpp b/cpp/src/qpid/messaging/Sender.cpp index 711a857d7a..2d5cfbcec5 100644 --- a/cpp/src/qpid/messaging/Sender.cpp +++ b/cpp/src/qpid/messaging/Sender.cpp @@ -36,7 +36,7 @@ void Sender::send(const Message& message) { impl->send(message); } void Sender::close() { impl->close(); } void Sender::setCapacity(uint32_t c) { impl->setCapacity(c); } uint32_t Sender::getCapacity() { return impl->getCapacity(); } -uint32_t Sender::pending() { return impl->pending(); } +uint32_t Sender::getPending() { return impl->pending(); } const std::string& Sender::getName() const { return impl->getName(); } Session Sender::getSession() const { return impl->getSession(); } diff --git a/cpp/src/qpid/messaging/Session.cpp b/cpp/src/qpid/messaging/Session.cpp index 2ac19727e3..4d1e633a86 100644 --- a/cpp/src/qpid/messaging/Session.cpp +++ b/cpp/src/qpid/messaging/Session.cpp @@ -81,8 +81,8 @@ Receiver Session::nextReceiver(Duration timeout) return impl->nextReceiver(timeout); } -uint32_t Session::available() { return impl->available(); } -uint32_t Session::pendingAck() { return impl->pendingAck(); } +uint32_t Session::getAvailable() { return impl->available(); } +uint32_t Session::getPendingAck() { return impl->pendingAck(); } Sender Session::getSender(const std::string& name) const { diff --git a/cpp/src/qpid/types/Variant.cpp b/cpp/src/qpid/types/Variant.cpp index 3729b6c947..904a596e82 100644 --- a/cpp/src/qpid/types/Variant.cpp +++ b/cpp/src/qpid/types/Variant.cpp @@ -633,7 +633,7 @@ Variant::operator int32_t() const { return asInt32(); } Variant::operator int64_t() const { return asInt64(); } Variant::operator float() const { return asFloat(); } Variant::operator double() const { return asDouble(); } -Variant::operator const char*() const { return asString().c_str(); } +Variant::operator std::string() const { return asString(); } Variant::operator Uuid() const { return asUuid(); } std::ostream& operator<<(std::ostream& out, const Variant::Map& map) diff --git a/cpp/src/tests/Address.cpp b/cpp/src/tests/Address.cpp index 01d8683efe..a0b87e25af 100644 --- a/cpp/src/tests/Address.cpp +++ b/cpp/src/tests/Address.cpp @@ -49,9 +49,9 @@ QPID_AUTO_TEST_CASE(testParseOptions) { Address address("my-topic; {a:bc, x:101, y:'a string'}"); BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName()); - BOOST_CHECK_EQUAL(std::string("bc"), address.getOption("a").asString()); - BOOST_CHECK_EQUAL((uint16_t) 101, address.getOption("x").asInt64()); - BOOST_CHECK_EQUAL(std::string("a string"), address.getOption("y").asString()); + BOOST_CHECK_EQUAL(std::string("bc"), address.getOptions()["a"].asString()); + BOOST_CHECK_EQUAL((uint16_t) 101, address.getOptions()["x"].asInt64()); + BOOST_CHECK_EQUAL(std::string("a string"), address.getOptions()["y"].asString()); } QPID_AUTO_TEST_CASE(testParseSubjectAndOptions) @@ -59,9 +59,9 @@ QPID_AUTO_TEST_CASE(testParseSubjectAndOptions) Address address("my-topic/my-subject; {a:bc, x:101, y:'a string'}"); BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName()); BOOST_CHECK_EQUAL(std::string("my-subject"), address.getSubject()); - BOOST_CHECK_EQUAL(std::string("bc"), address.getOption("a").asString()); - BOOST_CHECK_EQUAL((uint16_t) 101, address.getOption("x").asInt64()); - BOOST_CHECK_EQUAL(std::string("a string"), address.getOption("y").asString()); + BOOST_CHECK_EQUAL(std::string("bc"), address.getOptions()["a"].asString()); + BOOST_CHECK_EQUAL((uint16_t) 101, address.getOptions()["x"].asInt64()); + BOOST_CHECK_EQUAL(std::string("a string"), address.getOptions()["y"].asString()); } QPID_AUTO_TEST_CASE(testParseNestedOptions) @@ -70,8 +70,8 @@ QPID_AUTO_TEST_CASE(testParseNestedOptions) BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName()); BOOST_CHECK_EQUAL((uint16_t) 202, address.getOptions()["a"].asMap()["p"].asInt64()); BOOST_CHECK_EQUAL(std::string("another string"), address.getOptions()["a"].asMap()["q"].asString()); - BOOST_CHECK_EQUAL((uint16_t) 101, address.getOption("x").asInt64()); - BOOST_CHECK_EQUAL(std::string("a string"), address.getOption("y").asString()); + BOOST_CHECK_EQUAL((uint16_t) 101, address.getOptions()["x"].asInt64()); + BOOST_CHECK_EQUAL(std::string("a string"), address.getOptions()["y"].asString()); } QPID_AUTO_TEST_CASE(testParseOptionsWithList) @@ -84,7 +84,7 @@ QPID_AUTO_TEST_CASE(testParseOptionsWithList) BOOST_CHECK_EQUAL((uint16_t) 202, i->asInt64()); BOOST_CHECK(++i != list.end()); BOOST_CHECK_EQUAL(std::string("another string"), i->asString()); - BOOST_CHECK_EQUAL((uint16_t) 101, address.getOption("x").asInt64()); + BOOST_CHECK_EQUAL((uint16_t) 101, address.getOptions()["x"].asInt64()); } QPID_AUTO_TEST_CASE(testParseQuotedNameAndSubject) diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp index d1f7441216..bdfb8b389c 100644 --- a/cpp/src/tests/MessagingSessionTests.cpp +++ b/cpp/src/tests/MessagingSessionTests.cpp @@ -23,10 +23,6 @@ #include "BrokerFixture.h" #include "qpid/messaging/Address.h" #include "qpid/messaging/Connection.h" -#include "qpid/messaging/ListContent.h" -#include "qpid/messaging/ListView.h" -#include "qpid/messaging/MapContent.h" -#include "qpid/messaging/MapView.h" #include "qpid/messaging/Message.h" #include "qpid/messaging/Receiver.h" #include "qpid/messaging/Sender.h" @@ -112,7 +108,7 @@ struct MessagingFixture : public BrokerFixture MessagingFixture(Broker::Options opts = Broker::Options()) : BrokerFixture(opts), connection(open(broker->getPort(Broker::TCP_TRANSPORT))), - session(connection.newSession()), + session(connection.createSession()), admin(broker->getPort(Broker::TCP_TRANSPORT)) { } @@ -259,7 +255,7 @@ QPID_AUTO_TEST_CASE(testSenderError) MessagingFixture fix; ScopedSuppressLogging sl; BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress"), qpid::messaging::InvalidAddress); - fix.session = fix.connection.newSession(); + fix.session = fix.connection.createSession(); BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress; {create:receiver}"), qpid::messaging::InvalidAddress); } @@ -269,7 +265,7 @@ QPID_AUTO_TEST_CASE(testReceiverError) MessagingFixture fix; ScopedSuppressLogging sl; BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress"), qpid::messaging::InvalidAddress); - fix.session = fix.connection.newSession(); + fix.session = fix.connection.createSession(); BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress; {create:sender}"), qpid::messaging::InvalidAddress); } @@ -334,7 +330,7 @@ QPID_AUTO_TEST_CASE(testMapMessage) QueueFixture fix; Sender sender = fix.session.createSender(fix.queue); Message out; - MapContent content(out); + Variant::Map content; content["abc"] = "def"; content["pi"] = 3.14f; Variant utf8("A utf 8 string"); @@ -343,11 +339,12 @@ QPID_AUTO_TEST_CASE(testMapMessage) Variant utf16("\x00\x61\x00\x62\x00\x63"); utf16.setEncoding("utf16"); content["utf16"] = utf16; - content.encode(); + encode(content, out); sender.send(out); Receiver receiver = fix.session.createReceiver(fix.queue); Message in = receiver.fetch(5 * Duration::SECOND); - MapView view(in); + Variant::Map view; + decode(in, view); BOOST_CHECK_EQUAL(view["abc"].asString(), "def"); BOOST_CHECK_EQUAL(view["pi"].asFloat(), 3.14f); BOOST_CHECK_EQUAL(view["utf8"].asString(), utf8.asString()); @@ -365,12 +362,12 @@ QPID_AUTO_TEST_CASE(testMapMessageWithInitial) Variant::Map imap; imap["abc"] = "def"; imap["pi"] = 3.14f; - MapContent content(out, imap); - content.encode(); + encode(imap, out); sender.send(out); Receiver receiver = fix.session.createReceiver(fix.queue); Message in = receiver.fetch(5 * Duration::SECOND); - MapView view(in); + Variant::Map view; + decode(in, view); BOOST_CHECK_EQUAL(view["abc"].asString(), "def"); BOOST_CHECK_EQUAL(view["pi"].asFloat(), 3.14f); fix.session.acknowledge(); @@ -381,21 +378,22 @@ QPID_AUTO_TEST_CASE(testListMessage) QueueFixture fix; Sender sender = fix.session.createSender(fix.queue); Message out; - ListContent content(out); + Variant::List content; content.push_back(Variant("abc")); content.push_back(Variant(1234)); content.push_back(Variant("def")); content.push_back(Variant(56.789)); - content.encode(); + encode(content, out); sender.send(out); Receiver receiver = fix.session.createReceiver(fix.queue); Message in = receiver.fetch(5 * Duration::SECOND); - ListView view(in); + Variant::List view; + decode(in, view); BOOST_CHECK_EQUAL(view.size(), content.size()); BOOST_CHECK_EQUAL(view.front().asString(), "abc"); BOOST_CHECK_EQUAL(view.back().asDouble(), 56.789); - ListView::const_iterator i = view.begin(); + Variant::List::const_iterator i = view.begin(); BOOST_CHECK(i != view.end()); BOOST_CHECK_EQUAL(i->asString(), "abc"); BOOST_CHECK(++i != view.end()); @@ -419,17 +417,17 @@ QPID_AUTO_TEST_CASE(testListMessageWithInitial) ilist.push_back(Variant(1234)); ilist.push_back(Variant("def")); ilist.push_back(Variant(56.789)); - ListContent content(out, ilist); - content.encode(); + encode(ilist, out); sender.send(out); Receiver receiver = fix.session.createReceiver(fix.queue); Message in = receiver.fetch(5 * Duration::SECOND); - ListView view(in); - BOOST_CHECK_EQUAL(view.size(), content.size()); + Variant::List view; + decode(in, view); + BOOST_CHECK_EQUAL(view.size(), ilist.size()); BOOST_CHECK_EQUAL(view.front().asString(), "abc"); BOOST_CHECK_EQUAL(view.back().asDouble(), 56.789); - ListView::const_iterator i = view.begin(); + Variant::List::const_iterator i = view.begin(); BOOST_CHECK(i != view.end()); BOOST_CHECK_EQUAL(i->asString(), "abc"); BOOST_CHECK(++i != view.end()); @@ -481,16 +479,16 @@ QPID_AUTO_TEST_CASE(testAvailable) } qpid::sys::sleep(1);//is there any avoid an arbitrary sleep while waiting for messages to be dispatched? for (uint i = 0; i < 5; ++i) { - BOOST_CHECK_EQUAL(fix.session.available(), 15u - 2*i); - BOOST_CHECK_EQUAL(r1.available(), 10u - i); + BOOST_CHECK_EQUAL(fix.session.getAvailable(), 15u - 2*i); + BOOST_CHECK_EQUAL(r1.getAvailable(), 10u - i); BOOST_CHECK_EQUAL(r1.fetch().getContent(), (boost::format("A_%1%") % (i+1)).str()); - BOOST_CHECK_EQUAL(r2.available(), 5u - i); + BOOST_CHECK_EQUAL(r2.getAvailable(), 5u - i); BOOST_CHECK_EQUAL(r2.fetch().getContent(), (boost::format("B_%1%") % (i+1)).str()); fix.session.acknowledge(); } for (uint i = 5; i < 10; ++i) { - BOOST_CHECK_EQUAL(fix.session.available(), 10u - i); - BOOST_CHECK_EQUAL(r1.available(), 10u - i); + BOOST_CHECK_EQUAL(fix.session.getAvailable(), 10u - i); + BOOST_CHECK_EQUAL(r1.getAvailable(), 10u - i); BOOST_CHECK_EQUAL(r1.fetch().getContent(), (boost::format("A_%1%") % (i+1)).str()); } } @@ -506,11 +504,11 @@ QPID_AUTO_TEST_CASE(testPendingAck) for (uint i = 0; i < 10; ++i) { BOOST_CHECK_EQUAL(receiver.fetch().getContent(), (boost::format("Message_%1%") % (i+1)).str()); } - BOOST_CHECK_EQUAL(fix.session.pendingAck(), 0u); + BOOST_CHECK_EQUAL(fix.session.getPendingAck(), 0u); fix.session.acknowledge(); - BOOST_CHECK_EQUAL(fix.session.pendingAck(), 10u); + BOOST_CHECK_EQUAL(fix.session.getPendingAck(), 10u); fix.session.sync(); - BOOST_CHECK_EQUAL(fix.session.pendingAck(), 0u); + BOOST_CHECK_EQUAL(fix.session.getPendingAck(), 0u); } QPID_AUTO_TEST_CASE(testPendingSend) @@ -522,9 +520,9 @@ QPID_AUTO_TEST_CASE(testPendingSend) //implementation and the fact that the simple test case makes it //possible to predict when completion information will be sent to //the client. TODO: is there a better way of testing this? - BOOST_CHECK_EQUAL(sender.pending(), 10u); + BOOST_CHECK_EQUAL(sender.getPending(), 10u); fix.session.sync(); - BOOST_CHECK_EQUAL(sender.pending(), 0u); + BOOST_CHECK_EQUAL(sender.getPending(), 0u); Receiver receiver = fix.session.createReceiver(fix.queue); receive(receiver, 10); @@ -797,7 +795,7 @@ QPID_AUTO_TEST_CASE(testGetReceiver) QPID_AUTO_TEST_CASE(testGetSessionFromConnection) { QueueFixture fix; - fix.connection.newSession("my-session"); + fix.connection.createSession("my-session"); Session session = fix.connection.getSession("my-session"); Message out(Uuid(true).str()); session.createSender(fix.queue).send(out); @@ -814,7 +812,7 @@ QPID_AUTO_TEST_CASE(testGetConnectionFromSession) Sender sender = fix.session.createSender(fix.queue); sender.send(out); Message in; - sender.getSession().getConnection().newSession("incoming"); + sender.getSession().getConnection().createSession("incoming"); BOOST_CHECK(fix.connection.getSession("incoming").createReceiver(fix.queue).fetch(in)); BOOST_CHECK_EQUAL(out.getContent(), in.getContent()); } @@ -822,8 +820,8 @@ QPID_AUTO_TEST_CASE(testGetConnectionFromSession) QPID_AUTO_TEST_CASE(testTx) { QueueFixture fix; - Session ssn1 = fix.connection.newSession(true); - Session ssn2 = fix.connection.newSession(true); + Session ssn1 = fix.connection.createTransactionalSession(); + Session ssn2 = fix.connection.createTransactionalSession(); Sender sender1 = ssn1.createSender(fix.queue); Sender sender2 = ssn2.createSender(fix.queue); Receiver receiver1 = ssn1.createReceiver(fix.queue); diff --git a/cpp/src/tests/qpid_recv.cpp b/cpp/src/tests/qpid_recv.cpp index 0a268ad6cf..160830c826 100644 --- a/cpp/src/tests/qpid_recv.cpp +++ b/cpp/src/tests/qpid_recv.cpp @@ -153,7 +153,7 @@ int main(int argc, char ** argv) try { connection.open(opts.url); std::auto_ptr updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); - Session session = connection.newSession(opts.tx > 0); + Session session = opts.tx ? connection.createTransactionalSession() : connection.createSession(); Receiver receiver = session.createReceiver(opts.address); receiver.setCapacity(opts.capacity); Message msg; diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp index 564007958e..f828e6077c 100644 --- a/cpp/src/tests/qpid_send.cpp +++ b/cpp/src/tests/qpid_send.cpp @@ -21,7 +21,6 @@ #include #include -#include #include #include #include @@ -158,7 +157,7 @@ struct Options : public qpid::Options } } - void setEntries(MapContent& content) const + void setEntries(Variant::Map& content) const { for (string_vector::const_iterator i = entries.begin(); i != entries.end(); ++i) { std::string name; @@ -186,7 +185,7 @@ int main(int argc, char ** argv) try { connection.open(opts.url); std::auto_ptr updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); - Session session = connection.newSession(opts.tx > 0); + Session session = opts.tx ? connection.createTransactionalSession() : connection.createSession(); Sender sender = session.createSender(opts.address); if (opts.capacity) sender.setCapacity(opts.capacity); Message msg; diff --git a/cpp/src/tests/qpid_stream.cpp b/cpp/src/tests/qpid_stream.cpp index 642b7a821f..eafa6e96fa 100644 --- a/cpp/src/tests/qpid_stream.cpp +++ b/cpp/src/tests/qpid_stream.cpp @@ -91,7 +91,7 @@ struct Client : qpid::sys::Runnable Connection connection; try { connection.open(opts.url); - Session session = connection.newSession(); + Session session = connection.createSession(); doWork(session); session.close(); connection.close(); -- cgit v1.2.1