diff options
author | Gordon Sim <gsim@apache.org> | 2009-09-07 18:09:00 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2009-09-07 18:09:00 +0000 |
commit | 8f0fd209fb0b31814fb7b1ff7a7271d1e3239cd3 (patch) | |
tree | 004b6eb7cfede3cf2b043373b002b4bccf3a3fd9 | |
parent | f44db8535081dfbf83197ccb0ae5bc24e82b714b (diff) | |
download | qpid-python-8f0fd209fb0b31814fb7b1ff7a7271d1e3239cd3.tar.gz |
QPID-664: Added automatic message replay on reconnection.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@812243 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/include/qpid/messaging/Sender.h | 2 | ||||
-rw-r--r-- | cpp/src/CMakeLists.txt | 3 | ||||
-rw-r--r-- | cpp/src/Makefile.am | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 23 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/CodecsInternal.h | 41 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/MessageSink.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp | 64 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/OutgoingMessage.h | 46 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SenderImpl.cpp | 36 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SenderImpl.h | 18 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/MessageImpl.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/MessageImpl.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/Sender.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/SenderImpl.h | 2 | ||||
-rw-r--r-- | cpp/src/tests/MessagingSessionTests.cpp | 2 |
16 files changed, 233 insertions, 31 deletions
diff --git a/cpp/include/qpid/messaging/Sender.h b/cpp/include/qpid/messaging/Sender.h index 657c4b8cfe..45ec659ecf 100644 --- a/cpp/include/qpid/messaging/Sender.h +++ b/cpp/include/qpid/messaging/Sender.h @@ -47,7 +47,7 @@ class Sender : public qpid::client::Handle<SenderImpl> QPID_CLIENT_EXTERN ~Sender(); QPID_CLIENT_EXTERN Sender& operator=(const Sender&); - QPID_CLIENT_EXTERN void send(Message& message); + QPID_CLIENT_EXTERN void send(const Message& message); QPID_CLIENT_EXTERN void cancel(); private: friend class qpid::client::PrivateImplRef<Sender>; diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 92b76b5e41..ff0188890b 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -540,6 +540,7 @@ set (libqpidclient_SOURCES qpid/client/amqp0_10/AddressResolution.h qpid/client/amqp0_10/AddressResolution.cpp qpid/client/amqp0_10/Codecs.cpp + qpid/client/amqp0_10/CodecsInternal.h qpid/client/amqp0_10/CompletionTracker.h qpid/client/amqp0_10/CompletionTracker.cpp qpid/client/amqp0_10/ConnectionImpl.h @@ -548,6 +549,8 @@ set (libqpidclient_SOURCES qpid/client/amqp0_10/IncomingMessages.cpp qpid/client/amqp0_10/MessageSink.h qpid/client/amqp0_10/MessageSource.h + qpid/client/amqp0_10/OutgoingMessage.h + qpid/client/amqp0_10/OutgoingMessage.cpp qpid/client/amqp0_10/ReceiverImpl.h qpid/client/amqp0_10/ReceiverImpl.cpp qpid/client/amqp0_10/SessionImpl.h diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 744e3b7a42..75cda31dbb 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -700,6 +700,7 @@ libqpidclient_la_SOURCES = \ qpid/client/amqp0_10/AddressResolution.h \ qpid/client/amqp0_10/AddressResolution.cpp \ qpid/client/amqp0_10/Codecs.cpp \ + qpid/client/amqp0_10/CodecsInternal.h \ qpid/client/amqp0_10/ConnectionImpl.h \ qpid/client/amqp0_10/ConnectionImpl.cpp \ qpid/client/amqp0_10/CompletionTracker.h \ @@ -708,6 +709,8 @@ libqpidclient_la_SOURCES = \ qpid/client/amqp0_10/IncomingMessages.cpp \ qpid/client/amqp0_10/MessageSink.h \ qpid/client/amqp0_10/MessageSource.h \ + qpid/client/amqp0_10/OutgoingMessage.h \ + qpid/client/amqp0_10/OutgoingMessage.cpp \ qpid/client/amqp0_10/ReceiverImpl.h \ qpid/client/amqp0_10/ReceiverImpl.cpp \ qpid/client/amqp0_10/SessionImpl.h \ diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index 6ff9c2397a..9b9f06ec57 100644 --- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -22,6 +22,7 @@ #include "qpid/client/amqp0_10/Codecs.h" #include "qpid/client/amqp0_10/MessageSource.h" #include "qpid/client/amqp0_10/MessageSink.h" +#include "qpid/client/amqp0_10/OutgoingMessage.h" #include "qpid/messaging/Address.h" #include "qpid/messaging/Filter.h" #include "qpid/messaging/Message.h" @@ -122,7 +123,7 @@ class Exchange : public MessageSink bool passive = true, const std::string& type = EMPTY_STRING, bool durable = false, const FieldTable& options = EMPTY_FIELD_TABLE); void declare(qpid::client::AsyncSession& session, const std::string& name); - void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message); + void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message); void cancel(qpid::client::AsyncSession& session, const std::string& name); private: const std::string name; @@ -139,7 +140,7 @@ class QueueSink : public MessageSink QueueSink(const std::string& name, bool passive=true, bool exclusive=false, bool autoDelete=false, bool durable=false, const FieldTable& options = EMPTY_FIELD_TABLE); void declare(qpid::client::AsyncSession& session, const std::string& name); - void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message); + void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message); void cancel(qpid::client::AsyncSession& session, const std::string& name); private: const std::string name; @@ -328,14 +329,12 @@ void Exchange::declare(qpid::client::AsyncSession& session, const std::string&) } } -void Exchange::send(qpid::client::AsyncSession& session, const std::string&, qpid::messaging::Message& m) +void Exchange::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m) { - qpid::client::Message message; - convert(m, message); - if (message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) { - message.getDeliveryProperties().setRoutingKey(defaultSubject); + if (m.message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) { + m.message.getDeliveryProperties().setRoutingKey(defaultSubject); } - session.messageTransfer(arg::destination=name, arg::content=message); + m.status = session.messageTransfer(arg::destination=name, arg::content=m.message); } void Exchange::cancel(qpid::client::AsyncSession&, const std::string&) {} @@ -355,12 +354,10 @@ void QueueSink::declare(qpid::client::AsyncSession& session, const std::string&) arg::autoDelete=autoDelete, arg::arguments=options); } } -void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, qpid::messaging::Message& m) +void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m) { - qpid::client::Message message; - convert(m, message); - message.getDeliveryProperties().setRoutingKey(name); - session.messageTransfer(arg::content=message); + m.message.getDeliveryProperties().setRoutingKey(name); + m.status = session.messageTransfer(arg::content=m.message); } void QueueSink::cancel(qpid::client::AsyncSession&, const std::string&) {} diff --git a/cpp/src/qpid/client/amqp0_10/CodecsInternal.h b/cpp/src/qpid/client/amqp0_10/CodecsInternal.h new file mode 100644 index 0000000000..b44b725f52 --- /dev/null +++ b/cpp/src/qpid/client/amqp0_10/CodecsInternal.h @@ -0,0 +1,41 @@ +#ifndef QPID_CLIENT_AMQP0_10_CODECSINTERNAL_H +#define QPID_CLIENT_AMQP0_10_CODECSINTERNAL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Variant.h" +#include "qpid/framing/FieldTable.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +/** + * Declarations of a couple of conversion functions implemented in + * Codecs.cpp but not exposed through API + */ + +void translate(const qpid::messaging::Variant::Map& from, qpid::framing::FieldTable& to); +void translate(const qpid::framing::FieldTable& from, qpid::messaging::Variant::Variant::Map& to); + +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_CODECSINTERNAL_H*/ diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index c54a186365..b0a16674e1 100644 --- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -21,6 +21,7 @@ #include "qpid/client/amqp0_10/IncomingMessages.h" #include "qpid/client/amqp0_10/AddressResolution.h" #include "qpid/client/amqp0_10/Codecs.h" +#include "qpid/client/amqp0_10/CodecsInternal.h" #include "qpid/client/SessionImpl.h" #include "qpid/client/SessionBase_0_10Access.h" #include "qpid/log/Statement.h" @@ -195,8 +196,6 @@ void IncomingMessages::MessageTransfer::retrieve(qpid::messaging::Message* messa parent.retrieve(content, message); } -void translate(const FieldTable& from, Variant::Map& to);//implemented in Codecs.cpp - void populateHeaders(qpid::messaging::Message& message, const DeliveryProperties* deliveryProperties, const MessageProperties* messageProperties) diff --git a/cpp/src/qpid/client/amqp0_10/MessageSink.h b/cpp/src/qpid/client/amqp0_10/MessageSink.h index 19d5e4ef82..d66d2ecb3c 100644 --- a/cpp/src/qpid/client/amqp0_10/MessageSink.h +++ b/cpp/src/qpid/client/amqp0_10/MessageSink.h @@ -33,6 +33,8 @@ class Message; namespace client { namespace amqp0_10 { +class OutgoingMessage; + /** * */ @@ -41,7 +43,7 @@ class MessageSink public: virtual ~MessageSink() {} virtual void declare(qpid::client::AsyncSession& session, const std::string& name) = 0; - virtual void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message) = 0; + virtual void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message) = 0; virtual void cancel(qpid::client::AsyncSession& session, const std::string& name) = 0; private: }; diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp new file mode 100644 index 0000000000..716f955f98 --- /dev/null +++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/amqp0_10/OutgoingMessage.h" +#include "qpid/client/amqp0_10/AddressResolution.h" +#include "qpid/client/amqp0_10/Codecs.h" +#include "qpid/client/amqp0_10/CodecsInternal.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/MessageImpl.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +using qpid::messaging::Address; +using qpid::messaging::MessageImplAccess; + +template <class T> void encode(const qpid::messaging::Message& from, qpid::client::Message& to) +{ + T codec; + MessageImplAccess::get(from).getEncodedContent(codec, to.getData()); + to.getMessageProperties().setContentType(T::contentType); +} + +void OutgoingMessage::convert(const qpid::messaging::Message& from) +{ + //TODO: need to avoid copying as much as possible + if (from.getContent().isList()) { + encode<ListCodec>(from, message); + } else if (from.getContent().isMap()) { + encode<MapCodec>(from, message); + } else { + message.setData(from.getBytes()); + message.getMessageProperties().setContentType(from.getContentType()); + } + const Address& address = from.getReplyTo(); + if (!address.value.empty()) { + message.getMessageProperties().setReplyTo(AddressResolution::convert(address)); + } + translate(from.getHeaders(), message.getMessageProperties().getApplicationHeaders()); + //TODO: set other message properties + message.getDeliveryProperties().setRoutingKey(from.getSubject()); + //TODO: set other delivery properties +} + +}}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h new file mode 100644 index 0000000000..8801e4e769 --- /dev/null +++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h @@ -0,0 +1,46 @@ +#ifndef QPID_CLIENT_AMQP0_10_OUTGOINGMESSAGE_H +#define QPID_CLIENT_AMQP0_10_OUTGOINGMESSAGE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/Completion.h" +#include "qpid/client/Message.h" + +namespace qpid { +namespace messaging { +class Message; +} +namespace client { +namespace amqp0_10 { + +struct OutgoingMessage +{ + qpid::client::Message message; + qpid::client::Completion status; + + void convert(const qpid::messaging::Message&); +}; + + + +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_OUTGOINGMESSAGE_H*/ diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index e70ee8af6f..c619d1226a 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -22,6 +22,7 @@ #include "MessageSink.h" #include "SessionImpl.h" #include "AddressResolution.h" +#include "OutgoingMessage.h" namespace qpid { namespace client { @@ -30,9 +31,10 @@ namespace amqp0_10 { SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, const qpid::messaging::Address& _address, const qpid::messaging::Variant::Map& _options) : - parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED) {} + parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED), + capacity(50), window(0) {} -void SenderImpl::send(qpid::messaging::Message& m) +void SenderImpl::send(const qpid::messaging::Message& m) { execute1<Send>(&m); } @@ -54,14 +56,36 @@ void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) parent.senderCancelled(name); } else { sink->declare(session, name); - //TODO: replay + replay(); } } -void SenderImpl::sendImpl(qpid::messaging::Message& m) +void SenderImpl::sendImpl(const qpid::messaging::Message& m) { - //TODO: record for replay if appropriate - sink->send(session, name, m); + //TODO: make recoding for replay optional + std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage()); + msg->convert(m); + outgoing.push_back(msg.release()); + sink->send(session, name, outgoing.back()); + if (++window > (capacity / 2)) {//TODO: make this configurable? + session.flush(); + checkPendingSends(); + window = 0; + } +} + +void SenderImpl::replay() +{ + for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) { + sink->send(session, name, *i); + } +} + +void SenderImpl::checkPendingSends() +{ + while (!outgoing.empty() && outgoing.front().status.isComplete()) { + outgoing.pop_front(); + } } void SenderImpl::cancelImpl() diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/cpp/src/qpid/client/amqp0_10/SenderImpl.h index e7d7b11c0e..4ba793d71c 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.h @@ -28,6 +28,7 @@ #include "qpid/client/AsyncSession.h" #include "qpid/client/amqp0_10/SessionImpl.h" #include <memory> +#include <boost/ptr_container/ptr_deque.hpp> namespace qpid { namespace client { @@ -35,6 +36,7 @@ namespace amqp0_10 { class AddressResolution; class MessageSink; +class OutgoingMessage; /** * @@ -47,7 +49,7 @@ class SenderImpl : public qpid::messaging::SenderImpl SenderImpl(SessionImpl& parent, const std::string& name, const qpid::messaging::Address& address, const qpid::messaging::Variant::Map& options); - void send(qpid::messaging::Message&); + void send(const qpid::messaging::Message&); void cancel(); void init(qpid::client::AsyncSession, AddressResolution&); @@ -63,8 +65,16 @@ class SenderImpl : public qpid::messaging::SenderImpl std::string destination; std::string routingKey; + typedef boost::ptr_deque<OutgoingMessage> OutgoingMessages; + OutgoingMessages outgoing; + uint32_t capacity; + uint32_t window; + + void checkPendingSends(); + void replay(); + //logic for application visible methods: - void sendImpl(qpid::messaging::Message&); + void sendImpl(const qpid::messaging::Message&); void cancelImpl(); //functors for application visible methods (allowing locking and @@ -78,9 +88,9 @@ class SenderImpl : public qpid::messaging::SenderImpl struct Send : Command { - qpid::messaging::Message* message; + const qpid::messaging::Message* message; - Send(SenderImpl& i, qpid::messaging::Message* m) : Command(i), message(m) {} + Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {} void operator()() { impl.sendImpl(*message); } }; diff --git a/cpp/src/qpid/messaging/MessageImpl.cpp b/cpp/src/qpid/messaging/MessageImpl.cpp index 402a93e753..5df9218e03 100644 --- a/cpp/src/qpid/messaging/MessageImpl.cpp +++ b/cpp/src/qpid/messaging/MessageImpl.cpp @@ -126,6 +126,15 @@ void MessageImpl::encode(Codec& codec) } } +void MessageImpl::getEncodedContent(Codec& codec, std::string& out) const +{ + if (content.getType() != VAR_VOID) { + codec.encode(content, out); + } else { + out = bytes; + } +} + void MessageImpl::decode(Codec& codec) { codec.decode(bytes, content); @@ -188,5 +197,9 @@ MessageImpl& MessageImplAccess::get(Message& msg) { return *msg.impl; } +const MessageImpl& MessageImplAccess::get(const Message& msg) +{ + return *msg.impl; +} }} // namespace qpid::messaging diff --git a/cpp/src/qpid/messaging/MessageImpl.h b/cpp/src/qpid/messaging/MessageImpl.h index 3b8f103688..1173e7570a 100644 --- a/cpp/src/qpid/messaging/MessageImpl.h +++ b/cpp/src/qpid/messaging/MessageImpl.h @@ -84,6 +84,7 @@ struct MessageImpl : MessageContent void clear(); + void getEncodedContent(Codec& codec, std::string&) const; void encode(Codec& codec); void decode(Codec& codec); @@ -125,6 +126,7 @@ class Message; struct MessageImplAccess { static MessageImpl& get(Message&); + static const MessageImpl& get(const Message&); }; }} // namespace qpid::messaging diff --git a/cpp/src/qpid/messaging/Sender.cpp b/cpp/src/qpid/messaging/Sender.cpp index 12a3a8eb0f..8db700b060 100644 --- a/cpp/src/qpid/messaging/Sender.cpp +++ b/cpp/src/qpid/messaging/Sender.cpp @@ -38,7 +38,7 @@ Sender::Sender(SenderImpl* impl) { PI::ctor(*this, impl); } Sender::Sender(const Sender& s) : qpid::client::Handle<SenderImpl>() { PI::copy(*this, s); } Sender::~Sender() { PI::dtor(*this); } Sender& Sender::operator=(const Sender& s) { return PI::assign(*this, s); } -void Sender::send(Message& message) { impl->send(message); } +void Sender::send(const Message& message) { impl->send(message); } void Sender::cancel() { impl->cancel(); } }} // namespace qpid::messaging diff --git a/cpp/src/qpid/messaging/SenderImpl.h b/cpp/src/qpid/messaging/SenderImpl.h index 3b61a37423..77d2cfaeaf 100644 --- a/cpp/src/qpid/messaging/SenderImpl.h +++ b/cpp/src/qpid/messaging/SenderImpl.h @@ -35,7 +35,7 @@ class SenderImpl : public virtual qpid::RefCounted { public: virtual ~SenderImpl() {} - virtual void send(Message& message) = 0; + virtual void send(const Message& message) = 0; virtual void cancel() = 0; private: }; diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp index 2cbcc8d500..4ee27f0764 100644 --- a/cpp/src/tests/MessagingSessionTests.cpp +++ b/cpp/src/tests/MessagingSessionTests.cpp @@ -310,7 +310,6 @@ QPID_AUTO_TEST_CASE(testMapMessage) sender.send(out); Receiver receiver = fix.session.createReceiver(fix.queue); Message in = receiver.fetch(5 * qpid::sys::TIME_SEC); - BOOST_CHECK_EQUAL(in.getBytes(), out.getBytes()); BOOST_CHECK_EQUAL(in.getContent().asMap()["abc"].asString(), "def"); BOOST_CHECK_EQUAL(in.getContent().asMap()["pi"].asFloat(), 3.14f); fix.session.acknowledge(); @@ -329,7 +328,6 @@ QPID_AUTO_TEST_CASE(testListMessage) sender.send(out); Receiver receiver = fix.session.createReceiver(fix.queue); Message in = receiver.fetch(5 * qpid::sys::TIME_SEC); - BOOST_CHECK_EQUAL(in.getBytes(), out.getBytes()); Variant::List& list = in.getContent().asList(); BOOST_CHECK_EQUAL(list.size(), out.getContent().asList().size()); BOOST_CHECK_EQUAL(list.front().asString(), "abc"); |