diff options
author | Gordon Sim <gsim@apache.org> | 2007-07-17 08:28:48 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-07-17 08:28:48 +0000 |
commit | ce9743f8f1640d42af5fe7aaa8fe7e3ca82a914d (patch) | |
tree | ea71b96a92eb5402b71a4c08312fbe1d8b835bbc | |
parent | 54b8fe305e87f623bbeb2c50bea20a332f71a983 (diff) | |
download | qpid-python-ce9743f8f1640d42af5fe7aaa8fe7e3ca82a914d.tar.gz |
Some refactoring towards a more decoupled handler chain structure:
* Connection no longer depends on Channel; it contains a map of
FrameHandler::Chains. (The construction of the chains still refers
to specific handlers).
* Channel is no longer tied to ChannelAdapter through inheritance. The
former is independent of any particular handler chain or protocol
version, the latter is still used by ConnectionAdapter and
SemanticHandler in the 0-9 chain.
* A DeliveryAdapter interface has been introduced as part of the
separation of ChannelAdapter from Channel. This is intended to adapt
from a version independent core to version specific mechanisms for
sending messages. i.e. it fulfills the same role for outputs that
e.g. BrokerAdapter does for inputs. (Its not perfect yet by any
means but is a step on the way to the correct model I think).
* The connection related methods sent over channel zero are
implemented in their own adapter (ConnectionAdapter), and are
entirely separate from the semantic layer. The channel control
methods are still bundled with the proper semantic layer methods;
they too can be separated but would have to share the request id
with the semantic method handler due to the nature of the 0-9 WIP.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@556846 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/Makefile.am | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 99 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.h | 38 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerMessageBase.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConsumeAdapter.cpp | 37 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConsumeAdapter.h | 43 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryAdapter.h | 51 | ||||
-rw-r--r-- | cpp/src/qpid/broker/GetAdapter.cpp | 41 | ||||
-rw-r--r-- | cpp/src/qpid/broker/GetAdapter.h | 47 | ||||
-rw-r--r-- | cpp/src/qpid/broker/HandlerImpl.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.cpp | 89 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.h | 55 | ||||
-rw-r--r-- | cpp/src/qpid/framing/ChannelAdapter.h | 12 | ||||
-rw-r--r-- | cpp/src/tests/BrokerChannelTest.cpp | 124 |
19 files changed, 506 insertions, 193 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 09d3f6185d..3399d861f2 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -197,6 +197,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/Connection.cpp \ qpid/broker/ConnectionAdapter.cpp \ qpid/broker/ConnectionFactory.cpp \ + qpid/broker/ConsumeAdapter.cpp \ qpid/broker/Daemon.cpp \ qpid/broker/DeliverableMessage.cpp \ qpid/broker/DeliveryRecord.cpp \ @@ -209,6 +210,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/DtxWorkRecord.cpp \ qpid/broker/ExchangeRegistry.cpp \ qpid/broker/FanOutExchange.cpp \ + qpid/broker/GetAdapter.cpp \ qpid/broker/HeadersExchange.cpp \ qpid/broker/InMemoryContent.cpp \ qpid/broker/LazyLoadedContent.cpp \ @@ -224,6 +226,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/RecoveredEnqueue.cpp \ qpid/broker/RecoveredDequeue.cpp \ qpid/broker/Reference.cpp \ + qpid/broker/SemanticHandler.cpp \ qpid/broker/Timer.cpp \ qpid/broker/TopicExchange.cpp \ qpid/broker/TxAck.cpp \ @@ -253,9 +256,11 @@ nobase_include_HEADERS = \ qpid/broker/BrokerMessageBase.h \ qpid/broker/BrokerQueue.h \ qpid/broker/CompletionHandler.h \ + qpid/broker/ConsumeAdapter.h \ qpid/broker/Consumer.h \ qpid/broker/Deliverable.h \ qpid/broker/DeliverableMessage.h \ + qpid/broker/DeliveryAdapter.h \ qpid/broker/DirectExchange.h \ qpid/broker/DtxAck.h \ qpid/broker/DtxBuffer.h \ @@ -265,6 +270,7 @@ nobase_include_HEADERS = \ qpid/broker/DtxWorkRecord.h \ qpid/broker/ExchangeRegistry.h \ qpid/broker/FanOutExchange.h \ + qpid/broker/GetAdapter.h \ qpid/broker/HandlerImpl.h \ qpid/broker/InMemoryContent.h \ qpid/broker/MessageBuilder.h \ @@ -306,6 +312,7 @@ nobase_include_HEADERS = \ qpid/broker/PersistableQueue.h \ qpid/broker/QueuePolicy.h \ qpid/broker/RecoveryManagerImpl.h \ + qpid/broker/SemanticHandler.h \ qpid/broker/Timer.h \ qpid/broker/TopicExchange.h \ qpid/broker/TransactionalStore.h \ diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index bbf6686a6c..f0dc159752 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -20,6 +20,8 @@ #include "BrokerAdapter.h" #include "BrokerChannel.h" #include "Connection.h" +#include "ConsumeAdapter.h" +#include "GetAdapter.h" #include "qpid/framing/AMQMethodBody.h" #include "qpid/Exception.h" @@ -33,8 +35,8 @@ using namespace qpid::framing; typedef std::vector<Queue::shared_ptr> QueueVector; -BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) : - CoreRefs(ch, c, b), + BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b, ChannelAdapter& a) : + CoreRefs(ch, c, b, a), connection(c), basicHandler(*this), channelHandler(*this), @@ -299,9 +301,11 @@ void BrokerAdapter::BasicHandlerImpl::consume( if(!consumerTag.empty() && channel.exists(consumerTag)){ throw ConnectionException(530, "Consumer tags must be unique"); } - string newTag = consumerTag; - channel.consume( + //need to generate name here, so we have it for the adapter (it is + //also version specific behaviour now) + if (newTag.empty()) newTag = tagGenerator.generate(); + channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, newTag, connection.getFrameMax())), newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields); if(!nowait) client.consumeOk(newTag, context.getRequestId()); @@ -336,7 +340,8 @@ void BrokerAdapter::BasicHandlerImpl::publish( void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool noAck){ Queue::shared_ptr queue = getQueue(queueName); - if(!channel.get(queue, "", !noAck)){ + GetAdapter out(adapter, queue, "", connection.getFrameMax()); + if(!channel.get(out, queue, !noAck)){ string clusterId;//not used, part of an imatix hack client.getEmpty(clusterId, context.getRequestId()); diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index c66bdb3a31..795744aa9a 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -56,7 +56,7 @@ class MessageHandlerImpl; class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations { public: - BrokerAdapter(Channel& ch, Connection& c, Broker& b); + BrokerAdapter(Channel& ch, Connection& c, Broker& b, framing::ChannelAdapter& a); framing::ProtocolVersion getVersion() const; ChannelHandler* getChannelHandler() { return &channelHandler; } @@ -172,8 +172,10 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations public BasicHandler, public HandlerImpl<framing::AMQP_ClientProxy::Basic> { + NameGenerator tagGenerator; + public: - BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} + BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent), tagGenerator("sgen") {} void qos(const framing::MethodContext& context, uint32_t prefetchSize, uint16_t prefetchCount, bool global); diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index c81e73aba1..523a834715 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -28,7 +28,6 @@ #include <boost/bind.hpp> #include <boost/format.hpp> -#include "qpid/framing/ChannelAdapter.h" #include "qpid/QpidError.h" #include "BrokerAdapter.h" @@ -50,8 +49,8 @@ using namespace qpid::framing; using namespace qpid::sys; -Channel::Channel(Connection& con, ChannelId id, MessageStore* const _store) : - ChannelAdapter(), +Channel::Channel(Connection& con, ChannelId _id, MessageStore* const _store) : + id(_id), connection(con), currentDeliveryTag(1), prefetchSize(0), @@ -62,10 +61,8 @@ Channel::Channel(Connection& con, ChannelId id, MessageStore* const _store) : store(_store), messageBuilder(this, _store, connection.getStagingThreshold()), opened(id == 0),//channel 0 is automatically open, other must be explicitly opened - flowActive(true), - adapter(new BrokerAdapter(*this, con, con.broker)) + flowActive(true) { - init(id, con.getOutput(), con.getVersion()); outstanding.reset(); } @@ -79,14 +76,15 @@ bool Channel::exists(const string& consumerTag){ // TODO aconway 2007-02-12: Why is connection token passed in instead // of using the channel's parent connection? -void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks, +void Channel::consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, + Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*) { if(tagInOut.empty()) tagInOut = tagGenerator.generate(); std::auto_ptr<ConsumerImpl> c( - new ConsumerImpl(this, tagInOut, queue, connection, acks)); + new ConsumerImpl(this, adapter, tagInOut, queue, connection, acks)); queue->consume(c.get(), exclusive);//may throw exception consumers.insert(tagInOut, c.release()); } @@ -195,22 +193,10 @@ void Channel::checkDtxTimeout() } } -void Channel::deliver( - Message::shared_ptr& msg, const string& consumerTag, - Queue::shared_ptr& queue, bool ackExpected) +void Channel::record(const DeliveryRecord& delivery) { - Mutex::ScopedLock locker(deliveryLock); - - // Key the delivered messages to the id of the request in which they're sent - uint64_t deliveryTag = getNextSendRequestId(); - - if(ackExpected){ - unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag)); - outstanding.size += msg->contentSize(); - outstanding.count++; - } - //send deliver method, header and content(s) - msg->deliver(*this, consumerTag, deliveryTag, connection.getFrameMax()); + unacked.push_back(delivery); + delivery.addTo(&outstanding); } bool Channel::checkPrefetch(Message::shared_ptr& msg){ @@ -220,11 +206,11 @@ bool Channel::checkPrefetch(Message::shared_ptr& msg){ return countOk && sizeOk; } -Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag, - Queue::shared_ptr _queue, - ConnectionToken* const _connection, bool ack -) : parent(_parent), tag(_tag), queue(_queue), connection(_connection), - ackExpected(ack), blocked(false) {} +Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, std::auto_ptr<DeliveryAdapter> _adapter, + const string& _tag, Queue::shared_ptr _queue, + ConnectionToken* const _connection, bool ack + ) : parent(_parent), adapter(_adapter), tag(_tag), queue(_queue), connection(_connection), + ackExpected(ack), blocked(false) {} bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ if(!connection || connection != msg->getPublisher()){//check for no_local @@ -232,13 +218,25 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ blocked = true; }else{ blocked = false; - parent->deliver(msg, tag, queue, ackExpected); + Mutex::ScopedLock locker(parent->deliveryLock); + + uint64_t deliveryTag = adapter->getNextDeliveryTag(); + if(ackExpected){ + parent->record(DeliveryRecord(msg, queue, tag, deliveryTag)); + } + adapter->deliver(msg, deliveryTag); + return true; } } return false; } +void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, uint64_t deliveryTag) { + Mutex::ScopedLock locker(parent->deliveryLock); + adapter->deliver(msg, deliveryTag); +} + Channel::ConsumerImpl::~ConsumerImpl() { cancel(); } @@ -298,10 +296,6 @@ void Channel::complete(Message::shared_ptr msg) { } } -void Channel::ack(){ - ack(getFirstAckRequest(), getLastAckRequest()); -} - // Used by Basic void Channel::ack(uint64_t deliveryTag, bool multiple){ if (multiple) @@ -365,15 +359,12 @@ void Channel::recover(bool requeue){ } } -bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackExpected){ +bool Channel::get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected){ Message::shared_ptr msg = queue->dequeue(); if(msg){ Mutex::ScopedLock locker(deliveryLock); - uint64_t myDeliveryTag = getNextSendRequestId(); - msg->sendGetOk(MethodContext(this, msg->getRespondTo()), - destination, - queue->getMessageCount() + 1, myDeliveryTag, - connection.getFrameMax()); + uint64_t myDeliveryTag = adapter.getNextDeliveryTag(); + adapter.deliver(msg, myDeliveryTag); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); } @@ -386,33 +377,9 @@ bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackEx void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, uint64_t deliveryTag) { - msg->deliver(*this, consumerTag, deliveryTag, connection.getFrameMax()); -} - -void Channel::handleMethodInContext( - boost::shared_ptr<qpid::framing::AMQMethodBody> method, - const MethodContext& context -) -{ - try{ - if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) { - if (!method->isA<ChannelCloseOkBody>()) { - std::stringstream out; - out << "Attempt to use unopened channel: " << getId(); - throw ConnectionException(504, out.str()); - } - } else { - method->invoke(*adapter, context); - } - }catch(ChannelException& e){ - adapter->getProxy().getChannel().close( - e.code, e.toString(), - method->amqpClassId(), method->amqpMethodId()); - connection.closeChannel(getId()); - }catch(ConnectionException& e){ - connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); - }catch(std::exception& e){ - connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); + ConsumerImplMap::iterator i = consumers.find(consumerTag); + if (i != consumers.end()){ + i->redeliver(msg, deliveryTag); } } diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h index 9212e8f632..a2b6bd3ef9 100644 --- a/cpp/src/qpid/broker/BrokerChannel.h +++ b/cpp/src/qpid/broker/BrokerChannel.h @@ -23,6 +23,7 @@ */ #include <list> +#include <memory> #include <boost/scoped_ptr.hpp> #include <boost/shared_ptr.hpp> @@ -30,6 +31,7 @@ #include "AccumulatedAck.h" #include "Consumer.h" +#include "DeliveryAdapter.h" #include "DeliveryRecord.h" #include "DtxBuffer.h" #include "DtxManager.h" @@ -37,6 +39,7 @@ #include "NameGenerator.h" #include "Prefetch.h" #include "TxBuffer.h" +#include "qpid/framing/amqp_types.h" #include "qpid/framing/ChannelAdapter.h" #include "qpid/framing/ChannelOpenBody.h" #include "CompletionHandler.h" @@ -55,12 +58,12 @@ using framing::string; * Maintains state for an AMQP channel. Handles incoming and * outgoing messages for that channel. */ -class Channel : public framing::ChannelAdapter, - public CompletionHandler +class Channel : public CompletionHandler { class ConsumerImpl : public Consumer { Channel* parent; + std::auto_ptr<DeliveryAdapter> adapter; const string tag; Queue::shared_ptr queue; ConnectionToken* const connection; @@ -68,17 +71,19 @@ class Channel : public framing::ChannelAdapter, bool blocked; public: - ConsumerImpl(Channel* parent, const string& tag, - Queue::shared_ptr queue, + ConsumerImpl(Channel* parent, std::auto_ptr<DeliveryAdapter> adapter, + const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack); ~ConsumerImpl(); - virtual bool deliver(Message::shared_ptr& msg); + bool deliver(Message::shared_ptr& msg); + void redeliver(Message::shared_ptr& msg, uint64_t deliveryTag); void cancel(); void requestDispatch(); }; typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap; + framing::ChannelId id; Connection& connection; uint64_t currentDeliveryTag; Queue::shared_ptr defaultQueue; @@ -97,15 +102,10 @@ class Channel : public framing::ChannelAdapter, MessageBuilder messageBuilder;//builder for in-progress message bool opened; bool flowActive; - boost::scoped_ptr<BrokerAdapter> adapter; - - // completion handler for MessageBuilder - void complete(Message::shared_ptr msg); - - void deliver(Message::shared_ptr& msg, const string& tag, - Queue::shared_ptr& queue, bool ackExpected); + + void complete(Message::shared_ptr msg);// completion handler for MessageBuilder + void record(const DeliveryRecord& delivery); bool checkPrefetch(Message::shared_ptr& msg); - void checkDtxTimeout(); public: @@ -113,7 +113,7 @@ class Channel : public framing::ChannelAdapter, ~Channel(); bool isOpen() const { return opened; } - BrokerAdapter& getAdapter() { return *adapter; } + framing::ChannelId getId() const { return id; } void open() { opened = true; } void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } @@ -126,11 +126,11 @@ class Channel : public framing::ChannelAdapter, /** *@param tagInOut - if empty it is updated with the generated token. */ - void consume(string& tagInOut, Queue::shared_ptr queue, bool acks, + void consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0, const framing::FieldTable* = 0); void cancel(const string& tag); - bool get(Queue::shared_ptr queue, const std::string& destination, bool ackExpected); + bool get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected); void close(); void startTx(); void commit(); @@ -140,7 +140,6 @@ class Channel : public framing::ChannelAdapter, void endDtx(const std::string& xid, bool fail); void suspendDtx(const std::string& xid); void resumeDtx(const std::string& xid); - void ack(); void ack(uint64_t deliveryTag, bool multiple); void ack(uint64_t deliveryTag, uint64_t endTag); void recover(bool requeue); @@ -152,11 +151,6 @@ class Channel : public framing::ChannelAdapter, void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>); void handleInlineTransfer(Message::shared_ptr msg); - - // For ChannelAdapter - void handleMethodInContext( - boost::shared_ptr<framing::AMQMethodBody> method, - const framing::MethodContext& context); }; }} // namespace broker diff --git a/cpp/src/qpid/broker/BrokerMessageBase.h b/cpp/src/qpid/broker/BrokerMessageBase.h index a254986fd9..048b1c80e2 100644 --- a/cpp/src/qpid/broker/BrokerMessageBase.h +++ b/cpp/src/qpid/broker/BrokerMessageBase.h @@ -103,7 +103,7 @@ class Message : public PersistableMessage{ * Used to return a message in response to a get from a queue */ virtual void sendGetOk(const framing::MethodContext& context, - const std::string& destination, + const std::string& destination, uint32_t messageCount, uint64_t deliveryTag, uint32_t framesize) = 0; diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index cdbcee1c69..dfe2101bc0 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -26,6 +26,7 @@ #include "BrokerChannel.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "BrokerAdapter.h" +#include "SemanticHandler.h" using namespace boost; using namespace qpid::sys; @@ -55,7 +56,7 @@ void Connection::received(framing::AMQFrame& frame){ if (frame.getChannel() == 0) { adapter.handle(frame); } else { - getChannel((frame.getChannel())).getHandlers().in->handle(frame); + getChannel((frame.getChannel())).in->handle(frame); } } @@ -92,17 +93,17 @@ void Connection::closed(){ void Connection::closeChannel(uint16_t id) { ChannelMap::iterator i = channels.find(id); - if (i != channels.end()) - i->close(); + if (i != channels.end()) channels.erase(i); } -Channel& Connection::getChannel(ChannelId id) { +FrameHandler::Chains& Connection::getChannel(ChannelId id) { ChannelMap::iterator i = channels.find(id); if (i == channels.end()) { - i = channels.insert(id, new Channel(*this, id, &broker.getStore())).first; + FrameHandler::Chains chains(new SemanticHandler(id, *this), new OutputHandlerFrameHandler(*out)); + i = channels.insert(ChannelMap::value_type(id, chains)).first; } - return *i; + return i->second; } diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index a885ac4065..e38f88c2e9 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -51,7 +51,7 @@ class Connection : public sys::ConnectionInputHandler, Connection(sys::ConnectionOutputHandler* out, Broker& broker); /** Get a channel. Create if it does not already exist */ - Channel& getChannel(framing::ChannelId channel); + framing::FrameHandler::Chains& getChannel(framing::ChannelId channel); /** Close a channel */ void closeChannel(framing::ChannelId channel); @@ -82,7 +82,7 @@ class Connection : public sys::ConnectionInputHandler, void closed(); private: - typedef boost::ptr_map<framing::ChannelId, Channel> ChannelMap; + typedef std::map<framing::ChannelId, framing::FrameHandler::Chains> ChannelMap; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; Exchange::shared_ptr findExchange(const string& name); diff --git a/cpp/src/qpid/broker/ConsumeAdapter.cpp b/cpp/src/qpid/broker/ConsumeAdapter.cpp new file mode 100644 index 0000000000..59b6795a77 --- /dev/null +++ b/cpp/src/qpid/broker/ConsumeAdapter.cpp @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConsumeAdapter.h" + +using namespace qpid::broker; +using qpid::framing::ChannelAdapter; +using qpid::framing::RequestId; + +ConsumeAdapter::ConsumeAdapter(ChannelAdapter& a, const std::string t, uint32_t f) : adapter(a), tag(t), framesize(f) {} + +RequestId ConsumeAdapter::getNextDeliveryTag() +{ + return adapter.getNextSendRequestId(); +} + +void ConsumeAdapter::deliver(Message::shared_ptr& msg, RequestId deliveryTag) +{ + msg->deliver(adapter, tag, deliveryTag, framesize); +} diff --git a/cpp/src/qpid/broker/ConsumeAdapter.h b/cpp/src/qpid/broker/ConsumeAdapter.h new file mode 100644 index 0000000000..43cda7753e --- /dev/null +++ b/cpp/src/qpid/broker/ConsumeAdapter.h @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ConsumeAdapter_ +#define _ConsumeAdapter_ + +#include "DeliveryAdapter.h" +#include "qpid/framing/ChannelAdapter.h" + +namespace qpid { +namespace broker { + class ConsumeAdapter : public DeliveryAdapter + { + framing::ChannelAdapter& adapter; + const std::string tag; + const uint32_t framesize; + public: + ConsumeAdapter(framing::ChannelAdapter& adapter, const std::string tag, uint32_t framesize); + framing::RequestId getNextDeliveryTag(); + void deliver(Message::shared_ptr& msg, framing::RequestId tag); + }; + +}} + + +#endif diff --git a/cpp/src/qpid/broker/DeliveryAdapter.h b/cpp/src/qpid/broker/DeliveryAdapter.h new file mode 100644 index 0000000000..45b103bd68 --- /dev/null +++ b/cpp/src/qpid/broker/DeliveryAdapter.h @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _DeliveryAdapter_ +#define _DeliveryAdapter_ + +#include "BrokerMessageBase.h" +#include "qpid/framing/amqp_types.h" + +namespace qpid { +namespace broker { + + /** + * The intention behind this interface is to separate the generic + * handling of some form of message delivery to clients that is + * contained in the version independent Channel class from the + * details required for a particular situation or + * version. i.e. where the existing adapters allow (through + * supporting the generated interface for a version of the + * protocol) inputs of a channel to be adapted to the version + * independent part, this does the same for the outputs. + */ + class DeliveryAdapter + { + public: + virtual framing::RequestId getNextDeliveryTag() = 0; + virtual void deliver(Message::shared_ptr& msg, framing::RequestId tag) = 0; + virtual ~DeliveryAdapter(){} + }; + +}} + + +#endif diff --git a/cpp/src/qpid/broker/GetAdapter.cpp b/cpp/src/qpid/broker/GetAdapter.cpp new file mode 100644 index 0000000000..4a2f6d34d4 --- /dev/null +++ b/cpp/src/qpid/broker/GetAdapter.cpp @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "GetAdapter.h" +#include "qpid/framing/MethodContext.h" + +using namespace qpid::broker; +using qpid::framing::ChannelAdapter; +using qpid::framing::RequestId; +using qpid::framing::MethodContext; + +GetAdapter::GetAdapter(ChannelAdapter& a, Queue::shared_ptr q, const std::string d, uint32_t f) + : adapter(a), queue(q), destination(d), framesize(f) {} + +RequestId GetAdapter::getNextDeliveryTag() +{ + return adapter.getNextSendRequestId(); +} + +void GetAdapter::deliver(Message::shared_ptr& msg, framing::RequestId deliveryTag) +{ + msg->sendGetOk(MethodContext(&adapter, msg->getRespondTo()), destination, + queue->getMessageCount(), deliveryTag, framesize); +} diff --git a/cpp/src/qpid/broker/GetAdapter.h b/cpp/src/qpid/broker/GetAdapter.h new file mode 100644 index 0000000000..e90619a5f3 --- /dev/null +++ b/cpp/src/qpid/broker/GetAdapter.h @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _GetAdapter_ +#define _GetAdapter_ + +#include "BrokerQueue.h" +#include "DeliveryAdapter.h" +#include "qpid/framing/ChannelAdapter.h" + +namespace qpid { +namespace broker { + + class GetAdapter : public DeliveryAdapter + { + framing::ChannelAdapter& adapter; + Queue::shared_ptr queue; + const std::string destination; + const uint32_t framesize; + public: + GetAdapter(framing::ChannelAdapter& adapter, Queue::shared_ptr queue, const std::string destination, uint32_t framesize); + ~GetAdapter(){} + framing::RequestId getNextDeliveryTag(); + void deliver(Message::shared_ptr& msg, framing::RequestId tag); + }; + +}} + + +#endif diff --git a/cpp/src/qpid/broker/HandlerImpl.h b/cpp/src/qpid/broker/HandlerImpl.h index 008be10867..96bf065062 100644 --- a/cpp/src/qpid/broker/HandlerImpl.h +++ b/cpp/src/qpid/broker/HandlerImpl.h @@ -40,12 +40,13 @@ class Connection; */ struct CoreRefs { - CoreRefs(Channel& ch, Connection& c, Broker& b) - : channel(ch), connection(c), broker(b), proxy(ch) {} + CoreRefs(Channel& ch, Connection& c, Broker& b, framing::ChannelAdapter& a) + : channel(ch), connection(c), broker(b), adapter(a), proxy(a) {} Channel& channel; Connection& connection; Broker& broker; + framing::ChannelAdapter& adapter; framing::AMQP_ClientProxy proxy; /** diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index bbfcf209ad..f586ea92fc 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -21,6 +21,8 @@ #include "BrokerChannel.h" #include "qpid/framing/FramingContent.h" #include "Connection.h" +#include "ConsumeAdapter.h" +#include "GetAdapter.h" #include "Broker.h" #include "BrokerMessageMessage.h" #include "qpid/framing/MessageAppendBody.h" @@ -127,7 +129,7 @@ MessageHandlerImpl::consume(const MethodContext& context, if(!destination.empty() && channel.exists(destination)) throw ConnectionException(530, "Consumer tags must be unique"); string tag = destination; - channel.consume( + channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())), tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); client.ok(context.getRequestId()); @@ -144,7 +146,8 @@ MessageHandlerImpl::get( const MethodContext& context, { Queue::shared_ptr queue = getQueue(queueName); - if(channel.get(queue, destination, !noAck)) + GetAdapter out(adapter, queue, destination, connection.getFrameMax()); + if(channel.get(out, queue, !noAck)) client.ok(context.getRequestId()); else client.empty(context.getRequestId()); @@ -162,7 +165,7 @@ MessageHandlerImpl::empty( const MethodContext& ) void MessageHandlerImpl::ok(const MethodContext& /*context*/) { - channel.ack(); + channel.ack(adapter.getFirstAckRequest(), adapter.getLastAckRequest()); } void @@ -190,7 +193,7 @@ MessageHandlerImpl::reject(const MethodContext& /*context*/, uint16_t /*code*/, const string& /*text*/ ) { - channel.ack(); + //channel.ack(); // channel.requeue(); } diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp new file mode 100644 index 0000000000..df92f74b14 --- /dev/null +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -0,0 +1,89 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SemanticHandler.h" +#include "BrokerAdapter.h" +#include "qpid/framing/ChannelAdapter.h" + +using namespace qpid::broker; +using namespace qpid::framing; + +SemanticHandler::SemanticHandler(ChannelId id, Connection& c) : + connection(c), + channel(c, id, &c.broker.getStore()) +{ + init(id, connection.getOutput(), connection.getVersion()); + adapter = std::auto_ptr<BrokerAdapter>(new BrokerAdapter(channel, connection, connection.broker, *this)); +} + + +void SemanticHandler::handle(framing::AMQFrame& frame) +{ + handleBody(frame.getBody()); +} + +//ChannelAdapter virtual methods: +void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method, + const qpid::framing::MethodContext& context) +{ + try{ + if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) { + if (!method->isA<ChannelCloseOkBody>()) { + std::stringstream out; + out << "Attempt to use unopened channel: " << getId(); + throw ConnectionException(504, out.str()); + } + } else { + method->invoke(*adapter, context); + } + }catch(ChannelException& e){ + adapter->getProxy().getChannel().close( + e.code, e.toString(), + method->amqpClassId(), method->amqpMethodId()); + connection.closeChannel(getId()); + }catch(ConnectionException& e){ + connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); + }catch(std::exception& e){ + connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); + } + +} + +bool SemanticHandler::isOpen() const +{ + return channel.isOpen(); +} + +void SemanticHandler::handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody> body) +{ + channel.handleHeader(body); +} + +void SemanticHandler::handleContent(boost::shared_ptr<qpid::framing::AMQContentBody> body) +{ + channel.handleContent(body); +} + +void SemanticHandler::handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody> body) +{ + channel.handleHeartbeat(body); +} + diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h new file mode 100644 index 0000000000..a179969ece --- /dev/null +++ b/cpp/src/qpid/broker/SemanticHandler.h @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _SemanticHandler_ +#define _SemanticHandler_ + +#include <memory> +#include "BrokerChannel.h" +#include "Connection.h" +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/FrameHandler.h" + +namespace qpid { +namespace broker { + +class BrokerAdapter; +class framing::ChannelAdapter; + +class SemanticHandler : private framing::ChannelAdapter, public framing::FrameHandler { + Connection& connection; + Channel channel; + std::auto_ptr<BrokerAdapter> adapter; + + //ChannelAdapter virtual methods: + void handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method, + const qpid::framing::MethodContext& context); + bool isOpen() const; + void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>); + void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>); + void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>); +public: + SemanticHandler(framing::ChannelId id, Connection& c); + void handle(framing::AMQFrame& frame); +}; + +}} + +#endif diff --git a/cpp/src/qpid/framing/ChannelAdapter.h b/cpp/src/qpid/framing/ChannelAdapter.h index 5f92383ee3..1c3f29d762 100644 --- a/cpp/src/qpid/framing/ChannelAdapter.h +++ b/cpp/src/qpid/framing/ChannelAdapter.h @@ -52,7 +52,7 @@ class MethodContext; * Thread safety: OBJECT UNSAFE. Instances must not be called * concurrently. AMQP defines channels to be serialized. */ -class ChannelAdapter : private BodyHandler { +class ChannelAdapter : protected BodyHandler { public: /** *@param output Processed frames are forwarded to this handler. @@ -84,6 +84,10 @@ class ChannelAdapter : private BodyHandler { virtual bool isOpen() const = 0; + RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); } + RequestId getLastAckRequest() { return requester.getLastAckRequest(); } + RequestId getNextSendRequestId() { return requester.getNextId(); } + protected: void assertMethodOk(AMQMethodBody& method) const; void assertChannelOpen() const; @@ -93,13 +97,9 @@ class ChannelAdapter : private BodyHandler { shared_ptr<AMQMethodBody> method, const MethodContext& context) = 0; - RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); } - RequestId getLastAckRequest() { return requester.getLastAckRequest(); } - RequestId getNextSendRequestId() { return requester.getNextId(); } - private: class ChannelAdapterHandler; - friend class ChannelAdapterHandler; + friend class ChannelAdapterHandler; void handleMethod(shared_ptr<AMQMethodBody>); void handleRequest(shared_ptr<AMQRequestBody>); diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp index 929105f6e3..251ac624ab 100644 --- a/cpp/src/tests/BrokerChannelTest.cpp +++ b/cpp/src/tests/BrokerChannelTest.cpp @@ -48,13 +48,38 @@ struct MockHandler : ConnectionOutputHandler{ void close() {}; }; +struct DeliveryRecorder +{ + typedef std::pair<Message::shared_ptr, RequestId> Delivery; + std::vector<Delivery> delivered; + + struct Adapter : DeliveryAdapter + { + RequestId id; + DeliveryRecorder& recorder; + + Adapter(DeliveryRecorder& r) : recorder(r) {} + + RequestId getNextDeliveryTag() { return id + 1; } + void deliver(Message::shared_ptr& msg, RequestId tag) + { + recorder.delivered.push_back(Delivery(msg, tag)); + id++; + } + + }; + + std::auto_ptr<DeliveryAdapter> createAdapter() + { + return std::auto_ptr<DeliveryAdapter>(new Adapter(*this)); + } +}; class BrokerChannelTest : public CppUnit::TestCase { CPPUNIT_TEST_SUITE(BrokerChannelTest); - CPPUNIT_TEST(testConsumerMgmt); + CPPUNIT_TEST(testConsumerMgmt);; CPPUNIT_TEST(testDeliveryNoAck); - CPPUNIT_TEST(testDeliveryAndRecovery); CPPUNIT_TEST(testStaging); CPPUNIT_TEST(testQueuePolicy); CPPUNIT_TEST(testFlow); @@ -160,11 +185,12 @@ class BrokerChannelTest : public CppUnit::TestCase ConnectionToken* owner = 0; string tag("my_consumer"); - channel.consume(tag, queue, false, false, owner); + std::auto_ptr<DeliveryAdapter> unused; + channel.consume(unused, tag, queue, false, false, owner); string tagA; string tagB; - channel.consume(tagA, queue, false, false, owner); - channel.consume(tagB, queue, false, false, owner); + channel.consume(unused, tagA, queue, false, false, owner); + channel.consume(unused, tagB, queue, false, false, owner); CPPUNIT_ASSERT_EQUAL((uint32_t) 3, queue->getConsumerCount()); CPPUNIT_ASSERT(channel.exists("my_consumer")); CPPUNIT_ASSERT(channel.exists(tagA)); @@ -178,65 +204,17 @@ class BrokerChannelTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL((uint32_t) 0, queue->getConsumerCount()); } - void testDeliveryNoAck(){ + void testDeliveryNoAck(){ Channel channel(connection, 7); - channel.open(); - const string data("abcdefghijklmn"); - Message::shared_ptr msg( - createMessage("test", "my_routing_key", "my_message_id", 14)); - addContent(msg, data); - Queue::shared_ptr queue(new Queue("my_queue")); - ConnectionToken* owner(0); - string tag("no_ack"); - channel.consume(tag, queue, false, false, owner); - - queue->deliver(msg); - CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3].getChannel()); - CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>( - handler.frames[0].getBody().get())); - CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>( - handler.frames[1].getBody().get())); - CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>( - handler.frames[2].getBody().get())); - AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>( - handler.frames[3].getBody().get()); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); - } - - void testDeliveryAndRecovery(){ - Channel channel(connection, 7); - channel.open(); - const string data("abcdefghijklmn"); - Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); - addContent(msg, data); - Queue::shared_ptr queue(new Queue("my_queue")); - ConnectionToken* owner(0); - string tag("ack"); - channel.consume(tag, queue, true, false, owner); - + DeliveryRecorder recorder; + string tag("test"); + channel.consume(recorder.createAdapter(), tag, queue, false, false, 0); queue->deliver(msg); - CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3].getChannel()); - CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>( - handler.frames[0].getBody().get())); - CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>( - handler.frames[1].getBody().get())); - CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>( - handler.frames[2].getBody().get())); - AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>( - handler.frames[3].getBody().get()); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); + + CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size()); + CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first); } void testStaging(){ @@ -349,26 +327,18 @@ class BrokerChannelTest : public CppUnit::TestCase Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); addContent(msg, data); Queue::shared_ptr queue(new Queue("my_queue")); - ConnectionToken* owner(0); - string tag("no_ack"); - channel.consume(tag, queue, false, false, owner); + DeliveryRecorder recorder; + string tag("test"); + channel.consume(recorder.createAdapter(), tag, queue, false, false, 0); channel.flow(false); queue->deliver(msg); - //ensure no more frames have been delivered - CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue->getMessageCount()); + //ensure no messages have been delivered + CPPUNIT_ASSERT_EQUAL((size_t) 0, recorder.delivered.size()); + channel.flow(true); - CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1].getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2].getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[3].getChannel()); - BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[1].getBody())); - AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[2].getBody())); - AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[3].getBody())); - CPPUNIT_ASSERT(deliver); - CPPUNIT_ASSERT(contentHeader); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); + //ensure no messages have been delivered + CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size()); + CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first); } Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize) |