diff options
-rw-r--r-- | cpp/src/Makefile.am | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 99 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.h | 38 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerMessageBase.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConsumeAdapter.cpp | 37 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConsumeAdapter.h | 43 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryAdapter.h | 51 | ||||
-rw-r--r-- | cpp/src/qpid/broker/GetAdapter.cpp | 41 | ||||
-rw-r--r-- | cpp/src/qpid/broker/GetAdapter.h | 47 | ||||
-rw-r--r-- | cpp/src/qpid/broker/HandlerImpl.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.cpp | 89 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.h | 55 | ||||
-rw-r--r-- | cpp/src/qpid/framing/ChannelAdapter.h | 12 | ||||
-rw-r--r-- | cpp/src/tests/BrokerChannelTest.cpp | 124 |
19 files changed, 506 insertions, 193 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 09d3f6185d..3399d861f2 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -197,6 +197,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/Connection.cpp \ qpid/broker/ConnectionAdapter.cpp \ qpid/broker/ConnectionFactory.cpp \ + qpid/broker/ConsumeAdapter.cpp \ qpid/broker/Daemon.cpp \ qpid/broker/DeliverableMessage.cpp \ qpid/broker/DeliveryRecord.cpp \ @@ -209,6 +210,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/DtxWorkRecord.cpp \ qpid/broker/ExchangeRegistry.cpp \ qpid/broker/FanOutExchange.cpp \ + qpid/broker/GetAdapter.cpp \ qpid/broker/HeadersExchange.cpp \ qpid/broker/InMemoryContent.cpp \ qpid/broker/LazyLoadedContent.cpp \ @@ -224,6 +226,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/RecoveredEnqueue.cpp \ qpid/broker/RecoveredDequeue.cpp \ qpid/broker/Reference.cpp \ + qpid/broker/SemanticHandler.cpp \ qpid/broker/Timer.cpp \ qpid/broker/TopicExchange.cpp \ qpid/broker/TxAck.cpp \ @@ -253,9 +256,11 @@ nobase_include_HEADERS = \ qpid/broker/BrokerMessageBase.h \ qpid/broker/BrokerQueue.h \ qpid/broker/CompletionHandler.h \ + qpid/broker/ConsumeAdapter.h \ qpid/broker/Consumer.h \ qpid/broker/Deliverable.h \ qpid/broker/DeliverableMessage.h \ + qpid/broker/DeliveryAdapter.h \ qpid/broker/DirectExchange.h \ qpid/broker/DtxAck.h \ qpid/broker/DtxBuffer.h \ @@ -265,6 +270,7 @@ nobase_include_HEADERS = \ qpid/broker/DtxWorkRecord.h \ qpid/broker/ExchangeRegistry.h \ qpid/broker/FanOutExchange.h \ + qpid/broker/GetAdapter.h \ qpid/broker/HandlerImpl.h \ qpid/broker/InMemoryContent.h \ qpid/broker/MessageBuilder.h \ @@ -306,6 +312,7 @@ nobase_include_HEADERS = \ qpid/broker/PersistableQueue.h \ qpid/broker/QueuePolicy.h \ qpid/broker/RecoveryManagerImpl.h \ + qpid/broker/SemanticHandler.h \ qpid/broker/Timer.h \ qpid/broker/TopicExchange.h \ qpid/broker/TransactionalStore.h \ diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index bbf6686a6c..f0dc159752 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -20,6 +20,8 @@ #include "BrokerAdapter.h" #include "BrokerChannel.h" #include "Connection.h" +#include "ConsumeAdapter.h" +#include "GetAdapter.h" #include "qpid/framing/AMQMethodBody.h" #include "qpid/Exception.h" @@ -33,8 +35,8 @@ using namespace qpid::framing; typedef std::vector<Queue::shared_ptr> QueueVector; -BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) : - CoreRefs(ch, c, b), + BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b, ChannelAdapter& a) : + CoreRefs(ch, c, b, a), connection(c), basicHandler(*this), channelHandler(*this), @@ -299,9 +301,11 @@ void BrokerAdapter::BasicHandlerImpl::consume( if(!consumerTag.empty() && channel.exists(consumerTag)){ throw ConnectionException(530, "Consumer tags must be unique"); } - string newTag = consumerTag; - channel.consume( + //need to generate name here, so we have it for the adapter (it is + //also version specific behaviour now) + if (newTag.empty()) newTag = tagGenerator.generate(); + channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, newTag, connection.getFrameMax())), newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields); if(!nowait) client.consumeOk(newTag, context.getRequestId()); @@ -336,7 +340,8 @@ void BrokerAdapter::BasicHandlerImpl::publish( void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool noAck){ Queue::shared_ptr queue = getQueue(queueName); - if(!channel.get(queue, "", !noAck)){ + GetAdapter out(adapter, queue, "", connection.getFrameMax()); + if(!channel.get(out, queue, !noAck)){ string clusterId;//not used, part of an imatix hack client.getEmpty(clusterId, context.getRequestId()); diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index c66bdb3a31..795744aa9a 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -56,7 +56,7 @@ class MessageHandlerImpl; class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations { public: - BrokerAdapter(Channel& ch, Connection& c, Broker& b); + BrokerAdapter(Channel& ch, Connection& c, Broker& b, framing::ChannelAdapter& a); framing::ProtocolVersion getVersion() const; ChannelHandler* getChannelHandler() { return &channelHandler; } @@ -172,8 +172,10 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations public BasicHandler, public HandlerImpl<framing::AMQP_ClientProxy::Basic> { + NameGenerator tagGenerator; + public: - BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} + BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent), tagGenerator("sgen") {} void qos(const framing::MethodContext& context, uint32_t prefetchSize, uint16_t prefetchCount, bool global); diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index c81e73aba1..523a834715 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -28,7 +28,6 @@ #include <boost/bind.hpp> #include <boost/format.hpp> -#include "qpid/framing/ChannelAdapter.h" #include "qpid/QpidError.h" #include "BrokerAdapter.h" @@ -50,8 +49,8 @@ using namespace qpid::framing; using namespace qpid::sys; -Channel::Channel(Connection& con, ChannelId id, MessageStore* const _store) : - ChannelAdapter(), +Channel::Channel(Connection& con, ChannelId _id, MessageStore* const _store) : + id(_id), connection(con), currentDeliveryTag(1), prefetchSize(0), @@ -62,10 +61,8 @@ Channel::Channel(Connection& con, ChannelId id, MessageStore* const _store) : store(_store), messageBuilder(this, _store, connection.getStagingThreshold()), opened(id == 0),//channel 0 is automatically open, other must be explicitly opened - flowActive(true), - adapter(new BrokerAdapter(*this, con, con.broker)) + flowActive(true) { - init(id, con.getOutput(), con.getVersion()); outstanding.reset(); } @@ -79,14 +76,15 @@ bool Channel::exists(const string& consumerTag){ // TODO aconway 2007-02-12: Why is connection token passed in instead // of using the channel's parent connection? -void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks, +void Channel::consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, + Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*) { if(tagInOut.empty()) tagInOut = tagGenerator.generate(); std::auto_ptr<ConsumerImpl> c( - new ConsumerImpl(this, tagInOut, queue, connection, acks)); + new ConsumerImpl(this, adapter, tagInOut, queue, connection, acks)); queue->consume(c.get(), exclusive);//may throw exception consumers.insert(tagInOut, c.release()); } @@ -195,22 +193,10 @@ void Channel::checkDtxTimeout() } } -void Channel::deliver( - Message::shared_ptr& msg, const string& consumerTag, - Queue::shared_ptr& queue, bool ackExpected) +void Channel::record(const DeliveryRecord& delivery) { - Mutex::ScopedLock locker(deliveryLock); - - // Key the delivered messages to the id of the request in which they're sent - uint64_t deliveryTag = getNextSendRequestId(); - - if(ackExpected){ - unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag)); - outstanding.size += msg->contentSize(); - outstanding.count++; - } - //send deliver method, header and content(s) - msg->deliver(*this, consumerTag, deliveryTag, connection.getFrameMax()); + unacked.push_back(delivery); + delivery.addTo(&outstanding); } bool Channel::checkPrefetch(Message::shared_ptr& msg){ @@ -220,11 +206,11 @@ bool Channel::checkPrefetch(Message::shared_ptr& msg){ return countOk && sizeOk; } -Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag, - Queue::shared_ptr _queue, - ConnectionToken* const _connection, bool ack -) : parent(_parent), tag(_tag), queue(_queue), connection(_connection), - ackExpected(ack), blocked(false) {} +Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, std::auto_ptr<DeliveryAdapter> _adapter, + const string& _tag, Queue::shared_ptr _queue, + ConnectionToken* const _connection, bool ack + ) : parent(_parent), adapter(_adapter), tag(_tag), queue(_queue), connection(_connection), + ackExpected(ack), blocked(false) {} bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ if(!connection || connection != msg->getPublisher()){//check for no_local @@ -232,13 +218,25 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ blocked = true; }else{ blocked = false; - parent->deliver(msg, tag, queue, ackExpected); + Mutex::ScopedLock locker(parent->deliveryLock); + + uint64_t deliveryTag = adapter->getNextDeliveryTag(); + if(ackExpected){ + parent->record(DeliveryRecord(msg, queue, tag, deliveryTag)); + } + adapter->deliver(msg, deliveryTag); + return true; } } return false; } +void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, uint64_t deliveryTag) { + Mutex::ScopedLock locker(parent->deliveryLock); + adapter->deliver(msg, deliveryTag); +} + Channel::ConsumerImpl::~ConsumerImpl() { cancel(); } @@ -298,10 +296,6 @@ void Channel::complete(Message::shared_ptr msg) { } } -void Channel::ack(){ - ack(getFirstAckRequest(), getLastAckRequest()); -} - // Used by Basic void Channel::ack(uint64_t deliveryTag, bool multiple){ if (multiple) @@ -365,15 +359,12 @@ void Channel::recover(bool requeue){ } } -bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackExpected){ +bool Channel::get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected){ Message::shared_ptr msg = queue->dequeue(); if(msg){ Mutex::ScopedLock locker(deliveryLock); - uint64_t myDeliveryTag = getNextSendRequestId(); - msg->sendGetOk(MethodContext(this, msg->getRespondTo()), - destination, - queue->getMessageCount() + 1, myDeliveryTag, - connection.getFrameMax()); + uint64_t myDeliveryTag = adapter.getNextDeliveryTag(); + adapter.deliver(msg, myDeliveryTag); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); } @@ -386,33 +377,9 @@ bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackEx void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, uint64_t deliveryTag) { - msg->deliver(*this, consumerTag, deliveryTag, connection.getFrameMax()); -} - -void Channel::handleMethodInContext( - boost::shared_ptr<qpid::framing::AMQMethodBody> method, - const MethodContext& context -) -{ - try{ - if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) { - if (!method->isA<ChannelCloseOkBody>()) { - std::stringstream out; - out << "Attempt to use unopened channel: " << getId(); - throw ConnectionException(504, out.str()); - } - } else { - method->invoke(*adapter, context); - } - }catch(ChannelException& e){ - adapter->getProxy().getChannel().close( - e.code, e.toString(), - method->amqpClassId(), method->amqpMethodId()); - connection.closeChannel(getId()); - }catch(ConnectionException& e){ - connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); - }catch(std::exception& e){ - connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); + ConsumerImplMap::iterator i = consumers.find(consumerTag); + if (i != consumers.end()){ + i->redeliver(msg, deliveryTag); } } diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h index 9212e8f632..a2b6bd3ef9 100644 --- a/cpp/src/qpid/broker/BrokerChannel.h +++ b/cpp/src/qpid/broker/BrokerChannel.h @@ -23,6 +23,7 @@ */ #include <list> +#include <memory> #include <boost/scoped_ptr.hpp> #include <boost/shared_ptr.hpp> @@ -30,6 +31,7 @@ #include "AccumulatedAck.h" #include "Consumer.h" +#include "DeliveryAdapter.h" #include "DeliveryRecord.h" #include "DtxBuffer.h" #include "DtxManager.h" @@ -37,6 +39,7 @@ #include "NameGenerator.h" #include "Prefetch.h" #include "TxBuffer.h" +#include "qpid/framing/amqp_types.h" #include "qpid/framing/ChannelAdapter.h" #include "qpid/framing/ChannelOpenBody.h" #include "CompletionHandler.h" @@ -55,12 +58,12 @@ using framing::string; * Maintains state for an AMQP channel. Handles incoming and * outgoing messages for that channel. */ -class Channel : public framing::ChannelAdapter, - public CompletionHandler +class Channel : public CompletionHandler { class ConsumerImpl : public Consumer { Channel* parent; + std::auto_ptr<DeliveryAdapter> adapter; const string tag; Queue::shared_ptr queue; ConnectionToken* const connection; @@ -68,17 +71,19 @@ class Channel : public framing::ChannelAdapter, bool blocked; public: - ConsumerImpl(Channel* parent, const string& tag, - Queue::shared_ptr queue, + ConsumerImpl(Channel* parent, std::auto_ptr<DeliveryAdapter> adapter, + const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack); ~ConsumerImpl(); - virtual bool deliver(Message::shared_ptr& msg); + bool deliver(Message::shared_ptr& msg); + void redeliver(Message::shared_ptr& msg, uint64_t deliveryTag); void cancel(); void requestDispatch(); }; typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap; + framing::ChannelId id; Connection& connection; uint64_t currentDeliveryTag; Queue::shared_ptr defaultQueue; @@ -97,15 +102,10 @@ class Channel : public framing::ChannelAdapter, MessageBuilder messageBuilder;//builder for in-progress message bool opened; bool flowActive; - boost::scoped_ptr<BrokerAdapter> adapter; - - // completion handler for MessageBuilder - void complete(Message::shared_ptr msg); - - void deliver(Message::shared_ptr& msg, const string& tag, - Queue::shared_ptr& queue, bool ackExpected); + + void complete(Message::shared_ptr msg);// completion handler for MessageBuilder + void record(const DeliveryRecord& delivery); bool checkPrefetch(Message::shared_ptr& msg); - void checkDtxTimeout(); public: @@ -113,7 +113,7 @@ class Channel : public framing::ChannelAdapter, ~Channel(); bool isOpen() const { return opened; } - BrokerAdapter& getAdapter() { return *adapter; } + framing::ChannelId getId() const { return id; } void open() { opened = true; } void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } @@ -126,11 +126,11 @@ class Channel : public framing::ChannelAdapter, /** *@param tagInOut - if empty it is updated with the generated token. */ - void consume(string& tagInOut, Queue::shared_ptr queue, bool acks, + void consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0, const framing::FieldTable* = 0); void cancel(const string& tag); - bool get(Queue::shared_ptr queue, const std::string& destination, bool ackExpected); + bool get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected); void close(); void startTx(); void commit(); @@ -140,7 +140,6 @@ class Channel : public framing::ChannelAdapter, void endDtx(const std::string& xid, bool fail); void suspendDtx(const std::string& xid); void resumeDtx(const std::string& xid); - void ack(); void ack(uint64_t deliveryTag, bool multiple); void ack(uint64_t deliveryTag, uint64_t endTag); void recover(bool requeue); @@ -152,11 +151,6 @@ class Channel : public framing::ChannelAdapter, void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>); void handleInlineTransfer(Message::shared_ptr msg); - - // For ChannelAdapter - void handleMethodInContext( - boost::shared_ptr<framing::AMQMethodBody> method, - const framing::MethodContext& context); }; }} // namespace broker diff --git a/cpp/src/qpid/broker/BrokerMessageBase.h b/cpp/src/qpid/broker/BrokerMessageBase.h index a254986fd9..048b1c80e2 100644 --- a/cpp/src/qpid/broker/BrokerMessageBase.h +++ b/cpp/src/qpid/broker/BrokerMessageBase.h @@ -103,7 +103,7 @@ class Message : public PersistableMessage{ * Used to return a message in response to a get from a queue */ virtual void sendGetOk(const framing::MethodContext& context, - const std::string& destination, + const std::string& destination, uint32_t messageCount, uint64_t deliveryTag, uint32_t framesize) = 0; diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index cdbcee1c69..dfe2101bc0 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -26,6 +26,7 @@ #include "BrokerChannel.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "BrokerAdapter.h" +#include "SemanticHandler.h" using namespace boost; using namespace qpid::sys; @@ -55,7 +56,7 @@ void Connection::received(framing::AMQFrame& frame){ if (frame.getChannel() == 0) { adapter.handle(frame); } else { - getChannel((frame.getChannel())).getHandlers().in->handle(frame); + getChannel((frame.getChannel())).in->handle(frame); } } @@ -92,17 +93,17 @@ void Connection::closed(){ void Connection::closeChannel(uint16_t id) { ChannelMap::iterator i = channels.find(id); - if (i != channels.end()) - i->close(); + if (i != channels.end()) channels.erase(i); } -Channel& Connection::getChannel(ChannelId id) { +FrameHandler::Chains& Connection::getChannel(ChannelId id) { ChannelMap::iterator i = channels.find(id); if (i == channels.end()) { - i = channels.insert(id, new Channel(*this, id, &broker.getStore())).first; + FrameHandler::Chains chains(new SemanticHandler(id, *this), new OutputHandlerFrameHandler(*out)); + i = channels.insert(ChannelMap::value_type(id, chains)).first; } - return *i; + return i->second; } diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index a885ac4065..e38f88c2e9 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -51,7 +51,7 @@ class Connection : public sys::ConnectionInputHandler, Connection(sys::ConnectionOutputHandler* out, Broker& broker); /** Get a channel. Create if it does not already exist */ - Channel& getChannel(framing::ChannelId channel); + framing::FrameHandler::Chains& getChannel(framing::ChannelId channel); /** Close a channel */ void closeChannel(framing::ChannelId channel); @@ -82,7 +82,7 @@ class Connection : public sys::ConnectionInputHandler, void closed(); private: - typedef boost::ptr_map<framing::ChannelId, Channel> ChannelMap; + typedef std::map<framing::ChannelId, framing::FrameHandler::Chains> ChannelMap; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; Exchange::shared_ptr findExchange(const string& name); diff --git a/cpp/src/qpid/broker/ConsumeAdapter.cpp b/cpp/src/qpid/broker/ConsumeAdapter.cpp new file mode 100644 index 0000000000..59b6795a77 --- /dev/null +++ b/cpp/src/qpid/broker/ConsumeAdapter.cpp @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConsumeAdapter.h" + +using namespace qpid::broker; +using qpid::framing::ChannelAdapter; +using qpid::framing::RequestId; + +ConsumeAdapter::ConsumeAdapter(ChannelAdapter& a, const std::string t, uint32_t f) : adapter(a), tag(t), framesize(f) {} + +RequestId ConsumeAdapter::getNextDeliveryTag() +{ + return adapter.getNextSendRequestId(); +} + +void ConsumeAdapter::deliver(Message::shared_ptr& msg, RequestId deliveryTag) +{ + msg->deliver(adapter, tag, deliveryTag, framesize); +} diff --git a/cpp/src/qpid/broker/ConsumeAdapter.h b/cpp/src/qpid/broker/ConsumeAdapter.h new file mode 100644 index 0000000000..43cda7753e --- /dev/null +++ b/cpp/src/qpid/broker/ConsumeAdapter.h @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ConsumeAdapter_ +#define _ConsumeAdapter_ + +#include "DeliveryAdapter.h" +#include "qpid/framing/ChannelAdapter.h" + +namespace qpid { +namespace broker { + class ConsumeAdapter : public DeliveryAdapter + { + framing::ChannelAdapter& adapter; + const std::string tag; + const uint32_t framesize; + public: + ConsumeAdapter(framing::ChannelAdapter& adapter, const std::string tag, uint32_t framesize); + framing::RequestId getNextDeliveryTag(); + void deliver(Message::shared_ptr& msg, framing::RequestId tag); + }; + +}} + + +#endif diff --git a/cpp/src/qpid/broker/DeliveryAdapter.h b/cpp/src/qpid/broker/DeliveryAdapter.h new file mode 100644 index 0000000000..45b103bd68 --- /dev/null +++ b/cpp/src/qpid/broker/DeliveryAdapter.h @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _DeliveryAdapter_ +#define _DeliveryAdapter_ + +#include "BrokerMessageBase.h" +#include "qpid/framing/amqp_types.h" + +namespace qpid { +namespace broker { + + /** + * The intention behind this interface is to separate the generic + * handling of some form of message delivery to clients that is + * contained in the version independent Channel class from the + * details required for a particular situation or + * version. i.e. where the existing adapters allow (through + * supporting the generated interface for a version of the + * protocol) inputs of a channel to be adapted to the version + * independent part, this does the same for the outputs. + */ + class DeliveryAdapter + { + public: + virtual framing::RequestId getNextDeliveryTag() = 0; + virtual void deliver(Message::shared_ptr& msg, framing::RequestId tag) = 0; + virtual ~DeliveryAdapter(){} + }; + +}} + + +#endif diff --git a/cpp/src/qpid/broker/GetAdapter.cpp b/cpp/src/qpid/broker/GetAdapter.cpp new file mode 100644 index 0000000000..4a2f6d34d4 --- /dev/null +++ b/cpp/src/qpid/broker/GetAdapter.cpp @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "GetAdapter.h" +#include "qpid/framing/MethodContext.h" + +using namespace qpid::broker; +using qpid::framing::ChannelAdapter; +using qpid::framing::RequestId; +using qpid::framing::MethodContext; + +GetAdapter::GetAdapter(ChannelAdapter& a, Queue::shared_ptr q, const std::string d, uint32_t f) + : adapter(a), queue(q), destination(d), framesize(f) {} + +RequestId GetAdapter::getNextDeliveryTag() +{ + return adapter.getNextSendRequestId(); +} + +void GetAdapter::deliver(Message::shared_ptr& msg, framing::RequestId deliveryTag) +{ + msg->sendGetOk(MethodContext(&adapter, msg->getRespondTo()), destination, + queue->getMessageCount(), deliveryTag, framesize); +} diff --git a/cpp/src/qpid/broker/GetAdapter.h b/cpp/src/qpid/broker/GetAdapter.h new file mode 100644 index 0000000000..e90619a5f3 --- /dev/null +++ b/cpp/src/qpid/broker/GetAdapter.h @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _GetAdapter_ +#define _GetAdapter_ + +#include "BrokerQueue.h" +#include "DeliveryAdapter.h" +#include "qpid/framing/ChannelAdapter.h" + +namespace qpid { +namespace broker { + + class GetAdapter : public DeliveryAdapter + { + framing::ChannelAdapter& adapter; + Queue::shared_ptr queue; + const std::string destination; + const uint32_t framesize; + public: + GetAdapter(framing::ChannelAdapter& adapter, Queue::shared_ptr queue, const std::string destination, uint32_t framesize); + ~GetAdapter(){} + framing::RequestId getNextDeliveryTag(); + void deliver(Message::shared_ptr& msg, framing::RequestId tag); + }; + +}} + + +#endif diff --git a/cpp/src/qpid/broker/HandlerImpl.h b/cpp/src/qpid/broker/HandlerImpl.h index 008be10867..96bf065062 100644 --- a/cpp/src/qpid/broker/HandlerImpl.h +++ b/cpp/src/qpid/broker/HandlerImpl.h @@ -40,12 +40,13 @@ class Connection; */ struct CoreRefs { - CoreRefs(Channel& ch, Connection& c, Broker& b) - : channel(ch), connection(c), broker(b), proxy(ch) {} + CoreRefs(Channel& ch, Connection& c, Broker& b, framing::ChannelAdapter& a) + : channel(ch), connection(c), broker(b), adapter(a), proxy(a) {} Channel& channel; Connection& connection; Broker& broker; + framing::ChannelAdapter& adapter; framing::AMQP_ClientProxy proxy; /** diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index bbfcf209ad..f586ea92fc 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -21,6 +21,8 @@ #include "BrokerChannel.h" #include "qpid/framing/FramingContent.h" #include "Connection.h" +#include "ConsumeAdapter.h" +#include "GetAdapter.h" #include "Broker.h" #include "BrokerMessageMessage.h" #include "qpid/framing/MessageAppendBody.h" @@ -127,7 +129,7 @@ MessageHandlerImpl::consume(const MethodContext& context, if(!destination.empty() && channel.exists(destination)) throw ConnectionException(530, "Consumer tags must be unique"); string tag = destination; - channel.consume( + channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())), tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); client.ok(context.getRequestId()); @@ -144,7 +146,8 @@ MessageHandlerImpl::get( const MethodContext& context, { Queue::shared_ptr queue = getQueue(queueName); - if(channel.get(queue, destination, !noAck)) + GetAdapter out(adapter, queue, destination, connection.getFrameMax()); + if(channel.get(out, queue, !noAck)) client.ok(context.getRequestId()); else client.empty(context.getRequestId()); @@ -162,7 +165,7 @@ MessageHandlerImpl::empty( const MethodContext& ) void MessageHandlerImpl::ok(const MethodContext& /*context*/) { - channel.ack(); + channel.ack(adapter.getFirstAckRequest(), adapter.getLastAckRequest()); } void @@ -190,7 +193,7 @@ MessageHandlerImpl::reject(const MethodContext& /*context*/, uint16_t /*code*/, const string& /*text*/ ) { - channel.ack(); + //channel.ack(); // channel.requeue(); } diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp new file mode 100644 index 0000000000..df92f74b14 --- /dev/null +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -0,0 +1,89 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SemanticHandler.h" +#include "BrokerAdapter.h" +#include "qpid/framing/ChannelAdapter.h" + +using namespace qpid::broker; +using namespace qpid::framing; + +SemanticHandler::SemanticHandler(ChannelId id, Connection& c) : + connection(c), + channel(c, id, &c.broker.getStore()) +{ + init(id, connection.getOutput(), connection.getVersion()); + adapter = std::auto_ptr<BrokerAdapter>(new BrokerAdapter(channel, connection, connection.broker, *this)); +} + + +void SemanticHandler::handle(framing::AMQFrame& frame) +{ + handleBody(frame.getBody()); +} + +//ChannelAdapter virtual methods: +void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method, + const qpid::framing::MethodContext& context) +{ + try{ + if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) { + if (!method->isA<ChannelCloseOkBody>()) { + std::stringstream out; + out << "Attempt to use unopened channel: " << getId(); + throw ConnectionException(504, out.str()); + } + } else { + method->invoke(*adapter, context); + } + }catch(ChannelException& e){ + adapter->getProxy().getChannel().close( + e.code, e.toString(), + method->amqpClassId(), method->amqpMethodId()); + connection.closeChannel(getId()); + }catch(ConnectionException& e){ + connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); + }catch(std::exception& e){ + connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); + } + +} + +bool SemanticHandler::isOpen() const +{ + return channel.isOpen(); +} + +void SemanticHandler::handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody> body) +{ + channel.handleHeader(body); +} + +void SemanticHandler::handleContent(boost::shared_ptr<qpid::framing::AMQContentBody> body) +{ + channel.handleContent(body); +} + +void SemanticHandler::handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody> body) +{ + channel.handleHeartbeat(body); +} + diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h new file mode 100644 index 0000000000..a179969ece --- /dev/null +++ b/cpp/src/qpid/broker/SemanticHandler.h @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _SemanticHandler_ +#define _SemanticHandler_ + +#include <memory> +#include "BrokerChannel.h" +#include "Connection.h" +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/FrameHandler.h" + +namespace qpid { +namespace broker { + +class BrokerAdapter; +class framing::ChannelAdapter; + +class SemanticHandler : private framing::ChannelAdapter, public framing::FrameHandler { + Connection& connection; + Channel channel; + std::auto_ptr<BrokerAdapter> adapter; + + //ChannelAdapter virtual methods: + void handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method, + const qpid::framing::MethodContext& context); + bool isOpen() const; + void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>); + void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>); + void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>); +public: + SemanticHandler(framing::ChannelId id, Connection& c); + void handle(framing::AMQFrame& frame); +}; + +}} + +#endif diff --git a/cpp/src/qpid/framing/ChannelAdapter.h b/cpp/src/qpid/framing/ChannelAdapter.h index 5f92383ee3..1c3f29d762 100644 --- a/cpp/src/qpid/framing/ChannelAdapter.h +++ b/cpp/src/qpid/framing/ChannelAdapter.h @@ -52,7 +52,7 @@ class MethodContext; * Thread safety: OBJECT UNSAFE. Instances must not be called * concurrently. AMQP defines channels to be serialized. */ -class ChannelAdapter : private BodyHandler { +class ChannelAdapter : protected BodyHandler { public: /** *@param output Processed frames are forwarded to this handler. @@ -84,6 +84,10 @@ class ChannelAdapter : private BodyHandler { virtual bool isOpen() const = 0; + RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); } + RequestId getLastAckRequest() { return requester.getLastAckRequest(); } + RequestId getNextSendRequestId() { return requester.getNextId(); } + protected: void assertMethodOk(AMQMethodBody& method) const; void assertChannelOpen() const; @@ -93,13 +97,9 @@ class ChannelAdapter : private BodyHandler { shared_ptr<AMQMethodBody> method, const MethodContext& context) = 0; - RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); } - RequestId getLastAckRequest() { return requester.getLastAckRequest(); } - RequestId getNextSendRequestId() { return requester.getNextId(); } - private: class ChannelAdapterHandler; - friend class ChannelAdapterHandler; + friend class ChannelAdapterHandler; void handleMethod(shared_ptr<AMQMethodBody>); void handleRequest(shared_ptr<AMQRequestBody>); diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp index 929105f6e3..251ac624ab 100644 --- a/cpp/src/tests/BrokerChannelTest.cpp +++ b/cpp/src/tests/BrokerChannelTest.cpp @@ -48,13 +48,38 @@ struct MockHandler : ConnectionOutputHandler{ void close() {}; }; +struct DeliveryRecorder +{ + typedef std::pair<Message::shared_ptr, RequestId> Delivery; + std::vector<Delivery> delivered; + + struct Adapter : DeliveryAdapter + { + RequestId id; + DeliveryRecorder& recorder; + + Adapter(DeliveryRecorder& r) : recorder(r) {} + + RequestId getNextDeliveryTag() { return id + 1; } + void deliver(Message::shared_ptr& msg, RequestId tag) + { + recorder.delivered.push_back(Delivery(msg, tag)); + id++; + } + + }; + + std::auto_ptr<DeliveryAdapter> createAdapter() + { + return std::auto_ptr<DeliveryAdapter>(new Adapter(*this)); + } +}; class BrokerChannelTest : public CppUnit::TestCase { CPPUNIT_TEST_SUITE(BrokerChannelTest); - CPPUNIT_TEST(testConsumerMgmt); + CPPUNIT_TEST(testConsumerMgmt);; CPPUNIT_TEST(testDeliveryNoAck); - CPPUNIT_TEST(testDeliveryAndRecovery); CPPUNIT_TEST(testStaging); CPPUNIT_TEST(testQueuePolicy); CPPUNIT_TEST(testFlow); @@ -160,11 +185,12 @@ class BrokerChannelTest : public CppUnit::TestCase ConnectionToken* owner = 0; string tag("my_consumer"); - channel.consume(tag, queue, false, false, owner); + std::auto_ptr<DeliveryAdapter> unused; + channel.consume(unused, tag, queue, false, false, owner); string tagA; string tagB; - channel.consume(tagA, queue, false, false, owner); - channel.consume(tagB, queue, false, false, owner); + channel.consume(unused, tagA, queue, false, false, owner); + channel.consume(unused, tagB, queue, false, false, owner); CPPUNIT_ASSERT_EQUAL((uint32_t) 3, queue->getConsumerCount()); CPPUNIT_ASSERT(channel.exists("my_consumer")); CPPUNIT_ASSERT(channel.exists(tagA)); @@ -178,65 +204,17 @@ class BrokerChannelTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL((uint32_t) 0, queue->getConsumerCount()); } - void testDeliveryNoAck(){ + void testDeliveryNoAck(){ Channel channel(connection, 7); - channel.open(); - const string data("abcdefghijklmn"); - Message::shared_ptr msg( - createMessage("test", "my_routing_key", "my_message_id", 14)); - addContent(msg, data); - Queue::shared_ptr queue(new Queue("my_queue")); - ConnectionToken* owner(0); - string tag("no_ack"); - channel.consume(tag, queue, false, false, owner); - - queue->deliver(msg); - CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3].getChannel()); - CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>( - handler.frames[0].getBody().get())); - CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>( - handler.frames[1].getBody().get())); - CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>( - handler.frames[2].getBody().get())); - AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>( - handler.frames[3].getBody().get()); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); - } - - void testDeliveryAndRecovery(){ - Channel channel(connection, 7); - channel.open(); - const string data("abcdefghijklmn"); - Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); - addContent(msg, data); - Queue::shared_ptr queue(new Queue("my_queue")); - ConnectionToken* owner(0); - string tag("ack"); - channel.consume(tag, queue, true, false, owner); - + DeliveryRecorder recorder; + string tag("test"); + channel.consume(recorder.createAdapter(), tag, queue, false, false, 0); queue->deliver(msg); - CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3].getChannel()); - CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>( - handler.frames[0].getBody().get())); - CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>( - handler.frames[1].getBody().get())); - CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>( - handler.frames[2].getBody().get())); - AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>( - handler.frames[3].getBody().get()); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); + + CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size()); + CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first); } void testStaging(){ @@ -349,26 +327,18 @@ class BrokerChannelTest : public CppUnit::TestCase Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); addContent(msg, data); Queue::shared_ptr queue(new Queue("my_queue")); - ConnectionToken* owner(0); - string tag("no_ack"); - channel.consume(tag, queue, false, false, owner); + DeliveryRecorder recorder; + string tag("test"); + channel.consume(recorder.createAdapter(), tag, queue, false, false, 0); channel.flow(false); queue->deliver(msg); - //ensure no more frames have been delivered - CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue->getMessageCount()); + //ensure no messages have been delivered + CPPUNIT_ASSERT_EQUAL((size_t) 0, recorder.delivered.size()); + channel.flow(true); - CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1].getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2].getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[3].getChannel()); - BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[1].getBody())); - AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[2].getBody())); - AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[3].getBody())); - CPPUNIT_ASSERT(deliver); - CPPUNIT_ASSERT(contentHeader); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); + //ensure no messages have been delivered + CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size()); + CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first); } Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize) |