summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-07-17 08:28:48 +0000
committerGordon Sim <gsim@apache.org>2007-07-17 08:28:48 +0000
commitce9743f8f1640d42af5fe7aaa8fe7e3ca82a914d (patch)
treeea71b96a92eb5402b71a4c08312fbe1d8b835bbc
parent54b8fe305e87f623bbeb2c50bea20a332f71a983 (diff)
downloadqpid-python-ce9743f8f1640d42af5fe7aaa8fe7e3ca82a914d.tar.gz
Some refactoring towards a more decoupled handler chain structure:
* Connection no longer depends on Channel; it contains a map of FrameHandler::Chains. (The construction of the chains still refers to specific handlers). * Channel is no longer tied to ChannelAdapter through inheritance. The former is independent of any particular handler chain or protocol version, the latter is still used by ConnectionAdapter and SemanticHandler in the 0-9 chain. * A DeliveryAdapter interface has been introduced as part of the separation of ChannelAdapter from Channel. This is intended to adapt from a version independent core to version specific mechanisms for sending messages. i.e. it fulfills the same role for outputs that e.g. BrokerAdapter does for inputs. (Its not perfect yet by any means but is a step on the way to the correct model I think). * The connection related methods sent over channel zero are implemented in their own adapter (ConnectionAdapter), and are entirely separate from the semantic layer. The channel control methods are still bundled with the proper semantic layer methods; they too can be separated but would have to share the request id with the semantic method handler due to the nature of the 0-9 WIP. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@556846 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/Makefile.am7
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp15
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h6
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp99
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.h38
-rw-r--r--cpp/src/qpid/broker/BrokerMessageBase.h2
-rw-r--r--cpp/src/qpid/broker/Connection.cpp13
-rw-r--r--cpp/src/qpid/broker/Connection.h4
-rw-r--r--cpp/src/qpid/broker/ConsumeAdapter.cpp37
-rw-r--r--cpp/src/qpid/broker/ConsumeAdapter.h43
-rw-r--r--cpp/src/qpid/broker/DeliveryAdapter.h51
-rw-r--r--cpp/src/qpid/broker/GetAdapter.cpp41
-rw-r--r--cpp/src/qpid/broker/GetAdapter.h47
-rw-r--r--cpp/src/qpid/broker/HandlerImpl.h5
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp11
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp89
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.h55
-rw-r--r--cpp/src/qpid/framing/ChannelAdapter.h12
-rw-r--r--cpp/src/tests/BrokerChannelTest.cpp124
19 files changed, 506 insertions, 193 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 09d3f6185d..3399d861f2 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -197,6 +197,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/Connection.cpp \
qpid/broker/ConnectionAdapter.cpp \
qpid/broker/ConnectionFactory.cpp \
+ qpid/broker/ConsumeAdapter.cpp \
qpid/broker/Daemon.cpp \
qpid/broker/DeliverableMessage.cpp \
qpid/broker/DeliveryRecord.cpp \
@@ -209,6 +210,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/DtxWorkRecord.cpp \
qpid/broker/ExchangeRegistry.cpp \
qpid/broker/FanOutExchange.cpp \
+ qpid/broker/GetAdapter.cpp \
qpid/broker/HeadersExchange.cpp \
qpid/broker/InMemoryContent.cpp \
qpid/broker/LazyLoadedContent.cpp \
@@ -224,6 +226,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/RecoveredEnqueue.cpp \
qpid/broker/RecoveredDequeue.cpp \
qpid/broker/Reference.cpp \
+ qpid/broker/SemanticHandler.cpp \
qpid/broker/Timer.cpp \
qpid/broker/TopicExchange.cpp \
qpid/broker/TxAck.cpp \
@@ -253,9 +256,11 @@ nobase_include_HEADERS = \
qpid/broker/BrokerMessageBase.h \
qpid/broker/BrokerQueue.h \
qpid/broker/CompletionHandler.h \
+ qpid/broker/ConsumeAdapter.h \
qpid/broker/Consumer.h \
qpid/broker/Deliverable.h \
qpid/broker/DeliverableMessage.h \
+ qpid/broker/DeliveryAdapter.h \
qpid/broker/DirectExchange.h \
qpid/broker/DtxAck.h \
qpid/broker/DtxBuffer.h \
@@ -265,6 +270,7 @@ nobase_include_HEADERS = \
qpid/broker/DtxWorkRecord.h \
qpid/broker/ExchangeRegistry.h \
qpid/broker/FanOutExchange.h \
+ qpid/broker/GetAdapter.h \
qpid/broker/HandlerImpl.h \
qpid/broker/InMemoryContent.h \
qpid/broker/MessageBuilder.h \
@@ -306,6 +312,7 @@ nobase_include_HEADERS = \
qpid/broker/PersistableQueue.h \
qpid/broker/QueuePolicy.h \
qpid/broker/RecoveryManagerImpl.h \
+ qpid/broker/SemanticHandler.h \
qpid/broker/Timer.h \
qpid/broker/TopicExchange.h \
qpid/broker/TransactionalStore.h \
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index bbf6686a6c..f0dc159752 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -20,6 +20,8 @@
#include "BrokerAdapter.h"
#include "BrokerChannel.h"
#include "Connection.h"
+#include "ConsumeAdapter.h"
+#include "GetAdapter.h"
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/Exception.h"
@@ -33,8 +35,8 @@ using namespace qpid::framing;
typedef std::vector<Queue::shared_ptr> QueueVector;
-BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) :
- CoreRefs(ch, c, b),
+ BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b, ChannelAdapter& a) :
+ CoreRefs(ch, c, b, a),
connection(c),
basicHandler(*this),
channelHandler(*this),
@@ -299,9 +301,11 @@ void BrokerAdapter::BasicHandlerImpl::consume(
if(!consumerTag.empty() && channel.exists(consumerTag)){
throw ConnectionException(530, "Consumer tags must be unique");
}
-
string newTag = consumerTag;
- channel.consume(
+ //need to generate name here, so we have it for the adapter (it is
+ //also version specific behaviour now)
+ if (newTag.empty()) newTag = tagGenerator.generate();
+ channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, newTag, connection.getFrameMax())),
newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
if(!nowait) client.consumeOk(newTag, context.getRequestId());
@@ -336,7 +340,8 @@ void BrokerAdapter::BasicHandlerImpl::publish(
void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = getQueue(queueName);
- if(!channel.get(queue, "", !noAck)){
+ GetAdapter out(adapter, queue, "", connection.getFrameMax());
+ if(!channel.get(out, queue, !noAck)){
string clusterId;//not used, part of an imatix hack
client.getEmpty(clusterId, context.getRequestId());
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index c66bdb3a31..795744aa9a 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -56,7 +56,7 @@ class MessageHandlerImpl;
class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
{
public:
- BrokerAdapter(Channel& ch, Connection& c, Broker& b);
+ BrokerAdapter(Channel& ch, Connection& c, Broker& b, framing::ChannelAdapter& a);
framing::ProtocolVersion getVersion() const;
ChannelHandler* getChannelHandler() { return &channelHandler; }
@@ -172,8 +172,10 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
public BasicHandler,
public HandlerImpl<framing::AMQP_ClientProxy::Basic>
{
+ NameGenerator tagGenerator;
+
public:
- BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+ BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent), tagGenerator("sgen") {}
void qos(const framing::MethodContext& context, uint32_t prefetchSize,
uint16_t prefetchCount, bool global);
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index c81e73aba1..523a834715 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -28,7 +28,6 @@
#include <boost/bind.hpp>
#include <boost/format.hpp>
-#include "qpid/framing/ChannelAdapter.h"
#include "qpid/QpidError.h"
#include "BrokerAdapter.h"
@@ -50,8 +49,8 @@ using namespace qpid::framing;
using namespace qpid::sys;
-Channel::Channel(Connection& con, ChannelId id, MessageStore* const _store) :
- ChannelAdapter(),
+Channel::Channel(Connection& con, ChannelId _id, MessageStore* const _store) :
+ id(_id),
connection(con),
currentDeliveryTag(1),
prefetchSize(0),
@@ -62,10 +61,8 @@ Channel::Channel(Connection& con, ChannelId id, MessageStore* const _store) :
store(_store),
messageBuilder(this, _store, connection.getStagingThreshold()),
opened(id == 0),//channel 0 is automatically open, other must be explicitly opened
- flowActive(true),
- adapter(new BrokerAdapter(*this, con, con.broker))
+ flowActive(true)
{
- init(id, con.getOutput(), con.getVersion());
outstanding.reset();
}
@@ -79,14 +76,15 @@ bool Channel::exists(const string& consumerTag){
// TODO aconway 2007-02-12: Why is connection token passed in instead
// of using the channel's parent connection?
-void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks,
+void Channel::consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut,
+ Queue::shared_ptr queue, bool acks,
bool exclusive, ConnectionToken* const connection,
const FieldTable*)
{
if(tagInOut.empty())
tagInOut = tagGenerator.generate();
std::auto_ptr<ConsumerImpl> c(
- new ConsumerImpl(this, tagInOut, queue, connection, acks));
+ new ConsumerImpl(this, adapter, tagInOut, queue, connection, acks));
queue->consume(c.get(), exclusive);//may throw exception
consumers.insert(tagInOut, c.release());
}
@@ -195,22 +193,10 @@ void Channel::checkDtxTimeout()
}
}
-void Channel::deliver(
- Message::shared_ptr& msg, const string& consumerTag,
- Queue::shared_ptr& queue, bool ackExpected)
+void Channel::record(const DeliveryRecord& delivery)
{
- Mutex::ScopedLock locker(deliveryLock);
-
- // Key the delivered messages to the id of the request in which they're sent
- uint64_t deliveryTag = getNextSendRequestId();
-
- if(ackExpected){
- unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag));
- outstanding.size += msg->contentSize();
- outstanding.count++;
- }
- //send deliver method, header and content(s)
- msg->deliver(*this, consumerTag, deliveryTag, connection.getFrameMax());
+ unacked.push_back(delivery);
+ delivery.addTo(&outstanding);
}
bool Channel::checkPrefetch(Message::shared_ptr& msg){
@@ -220,11 +206,11 @@ bool Channel::checkPrefetch(Message::shared_ptr& msg){
return countOk && sizeOk;
}
-Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag,
- Queue::shared_ptr _queue,
- ConnectionToken* const _connection, bool ack
-) : parent(_parent), tag(_tag), queue(_queue), connection(_connection),
- ackExpected(ack), blocked(false) {}
+Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, std::auto_ptr<DeliveryAdapter> _adapter,
+ const string& _tag, Queue::shared_ptr _queue,
+ ConnectionToken* const _connection, bool ack
+ ) : parent(_parent), adapter(_adapter), tag(_tag), queue(_queue), connection(_connection),
+ ackExpected(ack), blocked(false) {}
bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
if(!connection || connection != msg->getPublisher()){//check for no_local
@@ -232,13 +218,25 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
blocked = true;
}else{
blocked = false;
- parent->deliver(msg, tag, queue, ackExpected);
+ Mutex::ScopedLock locker(parent->deliveryLock);
+
+ uint64_t deliveryTag = adapter->getNextDeliveryTag();
+ if(ackExpected){
+ parent->record(DeliveryRecord(msg, queue, tag, deliveryTag));
+ }
+ adapter->deliver(msg, deliveryTag);
+
return true;
}
}
return false;
}
+void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, uint64_t deliveryTag) {
+ Mutex::ScopedLock locker(parent->deliveryLock);
+ adapter->deliver(msg, deliveryTag);
+}
+
Channel::ConsumerImpl::~ConsumerImpl() {
cancel();
}
@@ -298,10 +296,6 @@ void Channel::complete(Message::shared_ptr msg) {
}
}
-void Channel::ack(){
- ack(getFirstAckRequest(), getLastAckRequest());
-}
-
// Used by Basic
void Channel::ack(uint64_t deliveryTag, bool multiple){
if (multiple)
@@ -365,15 +359,12 @@ void Channel::recover(bool requeue){
}
}
-bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackExpected){
+bool Channel::get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected){
Message::shared_ptr msg = queue->dequeue();
if(msg){
Mutex::ScopedLock locker(deliveryLock);
- uint64_t myDeliveryTag = getNextSendRequestId();
- msg->sendGetOk(MethodContext(this, msg->getRespondTo()),
- destination,
- queue->getMessageCount() + 1, myDeliveryTag,
- connection.getFrameMax());
+ uint64_t myDeliveryTag = adapter.getNextDeliveryTag();
+ adapter.deliver(msg, myDeliveryTag);
if(ackExpected){
unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
}
@@ -386,33 +377,9 @@ bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackEx
void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag,
uint64_t deliveryTag)
{
- msg->deliver(*this, consumerTag, deliveryTag, connection.getFrameMax());
-}
-
-void Channel::handleMethodInContext(
- boost::shared_ptr<qpid::framing::AMQMethodBody> method,
- const MethodContext& context
-)
-{
- try{
- if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
- if (!method->isA<ChannelCloseOkBody>()) {
- std::stringstream out;
- out << "Attempt to use unopened channel: " << getId();
- throw ConnectionException(504, out.str());
- }
- } else {
- method->invoke(*adapter, context);
- }
- }catch(ChannelException& e){
- adapter->getProxy().getChannel().close(
- e.code, e.toString(),
- method->amqpClassId(), method->amqpMethodId());
- connection.closeChannel(getId());
- }catch(ConnectionException& e){
- connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
- }catch(std::exception& e){
- connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
+ ConsumerImplMap::iterator i = consumers.find(consumerTag);
+ if (i != consumers.end()){
+ i->redeliver(msg, deliveryTag);
}
}
diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h
index 9212e8f632..a2b6bd3ef9 100644
--- a/cpp/src/qpid/broker/BrokerChannel.h
+++ b/cpp/src/qpid/broker/BrokerChannel.h
@@ -23,6 +23,7 @@
*/
#include <list>
+#include <memory>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
@@ -30,6 +31,7 @@
#include "AccumulatedAck.h"
#include "Consumer.h"
+#include "DeliveryAdapter.h"
#include "DeliveryRecord.h"
#include "DtxBuffer.h"
#include "DtxManager.h"
@@ -37,6 +39,7 @@
#include "NameGenerator.h"
#include "Prefetch.h"
#include "TxBuffer.h"
+#include "qpid/framing/amqp_types.h"
#include "qpid/framing/ChannelAdapter.h"
#include "qpid/framing/ChannelOpenBody.h"
#include "CompletionHandler.h"
@@ -55,12 +58,12 @@ using framing::string;
* Maintains state for an AMQP channel. Handles incoming and
* outgoing messages for that channel.
*/
-class Channel : public framing::ChannelAdapter,
- public CompletionHandler
+class Channel : public CompletionHandler
{
class ConsumerImpl : public Consumer
{
Channel* parent;
+ std::auto_ptr<DeliveryAdapter> adapter;
const string tag;
Queue::shared_ptr queue;
ConnectionToken* const connection;
@@ -68,17 +71,19 @@ class Channel : public framing::ChannelAdapter,
bool blocked;
public:
- ConsumerImpl(Channel* parent, const string& tag,
- Queue::shared_ptr queue,
+ ConsumerImpl(Channel* parent, std::auto_ptr<DeliveryAdapter> adapter,
+ const string& tag, Queue::shared_ptr queue,
ConnectionToken* const connection, bool ack);
~ConsumerImpl();
- virtual bool deliver(Message::shared_ptr& msg);
+ bool deliver(Message::shared_ptr& msg);
+ void redeliver(Message::shared_ptr& msg, uint64_t deliveryTag);
void cancel();
void requestDispatch();
};
typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
+ framing::ChannelId id;
Connection& connection;
uint64_t currentDeliveryTag;
Queue::shared_ptr defaultQueue;
@@ -97,15 +102,10 @@ class Channel : public framing::ChannelAdapter,
MessageBuilder messageBuilder;//builder for in-progress message
bool opened;
bool flowActive;
- boost::scoped_ptr<BrokerAdapter> adapter;
-
- // completion handler for MessageBuilder
- void complete(Message::shared_ptr msg);
-
- void deliver(Message::shared_ptr& msg, const string& tag,
- Queue::shared_ptr& queue, bool ackExpected);
+
+ void complete(Message::shared_ptr msg);// completion handler for MessageBuilder
+ void record(const DeliveryRecord& delivery);
bool checkPrefetch(Message::shared_ptr& msg);
-
void checkDtxTimeout();
public:
@@ -113,7 +113,7 @@ class Channel : public framing::ChannelAdapter,
~Channel();
bool isOpen() const { return opened; }
- BrokerAdapter& getAdapter() { return *adapter; }
+ framing::ChannelId getId() const { return id; }
void open() { opened = true; }
void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
@@ -126,11 +126,11 @@ class Channel : public framing::ChannelAdapter,
/**
*@param tagInOut - if empty it is updated with the generated token.
*/
- void consume(string& tagInOut, Queue::shared_ptr queue, bool acks,
+ void consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, Queue::shared_ptr queue, bool acks,
bool exclusive, ConnectionToken* const connection = 0,
const framing::FieldTable* = 0);
void cancel(const string& tag);
- bool get(Queue::shared_ptr queue, const std::string& destination, bool ackExpected);
+ bool get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected);
void close();
void startTx();
void commit();
@@ -140,7 +140,6 @@ class Channel : public framing::ChannelAdapter,
void endDtx(const std::string& xid, bool fail);
void suspendDtx(const std::string& xid);
void resumeDtx(const std::string& xid);
- void ack();
void ack(uint64_t deliveryTag, bool multiple);
void ack(uint64_t deliveryTag, uint64_t endTag);
void recover(bool requeue);
@@ -152,11 +151,6 @@ class Channel : public framing::ChannelAdapter,
void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
void handleInlineTransfer(Message::shared_ptr msg);
-
- // For ChannelAdapter
- void handleMethodInContext(
- boost::shared_ptr<framing::AMQMethodBody> method,
- const framing::MethodContext& context);
};
}} // namespace broker
diff --git a/cpp/src/qpid/broker/BrokerMessageBase.h b/cpp/src/qpid/broker/BrokerMessageBase.h
index a254986fd9..048b1c80e2 100644
--- a/cpp/src/qpid/broker/BrokerMessageBase.h
+++ b/cpp/src/qpid/broker/BrokerMessageBase.h
@@ -103,7 +103,7 @@ class Message : public PersistableMessage{
* Used to return a message in response to a get from a queue
*/
virtual void sendGetOk(const framing::MethodContext& context,
- const std::string& destination,
+ const std::string& destination,
uint32_t messageCount,
uint64_t deliveryTag,
uint32_t framesize) = 0;
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index cdbcee1c69..dfe2101bc0 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -26,6 +26,7 @@
#include "BrokerChannel.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "BrokerAdapter.h"
+#include "SemanticHandler.h"
using namespace boost;
using namespace qpid::sys;
@@ -55,7 +56,7 @@ void Connection::received(framing::AMQFrame& frame){
if (frame.getChannel() == 0) {
adapter.handle(frame);
} else {
- getChannel((frame.getChannel())).getHandlers().in->handle(frame);
+ getChannel((frame.getChannel())).in->handle(frame);
}
}
@@ -92,17 +93,17 @@ void Connection::closed(){
void Connection::closeChannel(uint16_t id) {
ChannelMap::iterator i = channels.find(id);
- if (i != channels.end())
- i->close();
+ if (i != channels.end()) channels.erase(i);
}
-Channel& Connection::getChannel(ChannelId id) {
+FrameHandler::Chains& Connection::getChannel(ChannelId id) {
ChannelMap::iterator i = channels.find(id);
if (i == channels.end()) {
- i = channels.insert(id, new Channel(*this, id, &broker.getStore())).first;
+ FrameHandler::Chains chains(new SemanticHandler(id, *this), new OutputHandlerFrameHandler(*out));
+ i = channels.insert(ChannelMap::value_type(id, chains)).first;
}
- return *i;
+ return i->second;
}
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index a885ac4065..e38f88c2e9 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -51,7 +51,7 @@ class Connection : public sys::ConnectionInputHandler,
Connection(sys::ConnectionOutputHandler* out, Broker& broker);
/** Get a channel. Create if it does not already exist */
- Channel& getChannel(framing::ChannelId channel);
+ framing::FrameHandler::Chains& getChannel(framing::ChannelId channel);
/** Close a channel */
void closeChannel(framing::ChannelId channel);
@@ -82,7 +82,7 @@ class Connection : public sys::ConnectionInputHandler,
void closed();
private:
- typedef boost::ptr_map<framing::ChannelId, Channel> ChannelMap;
+ typedef std::map<framing::ChannelId, framing::FrameHandler::Chains> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
Exchange::shared_ptr findExchange(const string& name);
diff --git a/cpp/src/qpid/broker/ConsumeAdapter.cpp b/cpp/src/qpid/broker/ConsumeAdapter.cpp
new file mode 100644
index 0000000000..59b6795a77
--- /dev/null
+++ b/cpp/src/qpid/broker/ConsumeAdapter.cpp
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConsumeAdapter.h"
+
+using namespace qpid::broker;
+using qpid::framing::ChannelAdapter;
+using qpid::framing::RequestId;
+
+ConsumeAdapter::ConsumeAdapter(ChannelAdapter& a, const std::string t, uint32_t f) : adapter(a), tag(t), framesize(f) {}
+
+RequestId ConsumeAdapter::getNextDeliveryTag()
+{
+ return adapter.getNextSendRequestId();
+}
+
+void ConsumeAdapter::deliver(Message::shared_ptr& msg, RequestId deliveryTag)
+{
+ msg->deliver(adapter, tag, deliveryTag, framesize);
+}
diff --git a/cpp/src/qpid/broker/ConsumeAdapter.h b/cpp/src/qpid/broker/ConsumeAdapter.h
new file mode 100644
index 0000000000..43cda7753e
--- /dev/null
+++ b/cpp/src/qpid/broker/ConsumeAdapter.h
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ConsumeAdapter_
+#define _ConsumeAdapter_
+
+#include "DeliveryAdapter.h"
+#include "qpid/framing/ChannelAdapter.h"
+
+namespace qpid {
+namespace broker {
+ class ConsumeAdapter : public DeliveryAdapter
+ {
+ framing::ChannelAdapter& adapter;
+ const std::string tag;
+ const uint32_t framesize;
+ public:
+ ConsumeAdapter(framing::ChannelAdapter& adapter, const std::string tag, uint32_t framesize);
+ framing::RequestId getNextDeliveryTag();
+ void deliver(Message::shared_ptr& msg, framing::RequestId tag);
+ };
+
+}}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/DeliveryAdapter.h b/cpp/src/qpid/broker/DeliveryAdapter.h
new file mode 100644
index 0000000000..45b103bd68
--- /dev/null
+++ b/cpp/src/qpid/broker/DeliveryAdapter.h
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DeliveryAdapter_
+#define _DeliveryAdapter_
+
+#include "BrokerMessageBase.h"
+#include "qpid/framing/amqp_types.h"
+
+namespace qpid {
+namespace broker {
+
+ /**
+ * The intention behind this interface is to separate the generic
+ * handling of some form of message delivery to clients that is
+ * contained in the version independent Channel class from the
+ * details required for a particular situation or
+ * version. i.e. where the existing adapters allow (through
+ * supporting the generated interface for a version of the
+ * protocol) inputs of a channel to be adapted to the version
+ * independent part, this does the same for the outputs.
+ */
+ class DeliveryAdapter
+ {
+ public:
+ virtual framing::RequestId getNextDeliveryTag() = 0;
+ virtual void deliver(Message::shared_ptr& msg, framing::RequestId tag) = 0;
+ virtual ~DeliveryAdapter(){}
+ };
+
+}}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/GetAdapter.cpp b/cpp/src/qpid/broker/GetAdapter.cpp
new file mode 100644
index 0000000000..4a2f6d34d4
--- /dev/null
+++ b/cpp/src/qpid/broker/GetAdapter.cpp
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "GetAdapter.h"
+#include "qpid/framing/MethodContext.h"
+
+using namespace qpid::broker;
+using qpid::framing::ChannelAdapter;
+using qpid::framing::RequestId;
+using qpid::framing::MethodContext;
+
+GetAdapter::GetAdapter(ChannelAdapter& a, Queue::shared_ptr q, const std::string d, uint32_t f)
+ : adapter(a), queue(q), destination(d), framesize(f) {}
+
+RequestId GetAdapter::getNextDeliveryTag()
+{
+ return adapter.getNextSendRequestId();
+}
+
+void GetAdapter::deliver(Message::shared_ptr& msg, framing::RequestId deliveryTag)
+{
+ msg->sendGetOk(MethodContext(&adapter, msg->getRespondTo()), destination,
+ queue->getMessageCount(), deliveryTag, framesize);
+}
diff --git a/cpp/src/qpid/broker/GetAdapter.h b/cpp/src/qpid/broker/GetAdapter.h
new file mode 100644
index 0000000000..e90619a5f3
--- /dev/null
+++ b/cpp/src/qpid/broker/GetAdapter.h
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _GetAdapter_
+#define _GetAdapter_
+
+#include "BrokerQueue.h"
+#include "DeliveryAdapter.h"
+#include "qpid/framing/ChannelAdapter.h"
+
+namespace qpid {
+namespace broker {
+
+ class GetAdapter : public DeliveryAdapter
+ {
+ framing::ChannelAdapter& adapter;
+ Queue::shared_ptr queue;
+ const std::string destination;
+ const uint32_t framesize;
+ public:
+ GetAdapter(framing::ChannelAdapter& adapter, Queue::shared_ptr queue, const std::string destination, uint32_t framesize);
+ ~GetAdapter(){}
+ framing::RequestId getNextDeliveryTag();
+ void deliver(Message::shared_ptr& msg, framing::RequestId tag);
+ };
+
+}}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/HandlerImpl.h b/cpp/src/qpid/broker/HandlerImpl.h
index 008be10867..96bf065062 100644
--- a/cpp/src/qpid/broker/HandlerImpl.h
+++ b/cpp/src/qpid/broker/HandlerImpl.h
@@ -40,12 +40,13 @@ class Connection;
*/
struct CoreRefs
{
- CoreRefs(Channel& ch, Connection& c, Broker& b)
- : channel(ch), connection(c), broker(b), proxy(ch) {}
+ CoreRefs(Channel& ch, Connection& c, Broker& b, framing::ChannelAdapter& a)
+ : channel(ch), connection(c), broker(b), adapter(a), proxy(a) {}
Channel& channel;
Connection& connection;
Broker& broker;
+ framing::ChannelAdapter& adapter;
framing::AMQP_ClientProxy proxy;
/**
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index bbfcf209ad..f586ea92fc 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -21,6 +21,8 @@
#include "BrokerChannel.h"
#include "qpid/framing/FramingContent.h"
#include "Connection.h"
+#include "ConsumeAdapter.h"
+#include "GetAdapter.h"
#include "Broker.h"
#include "BrokerMessageMessage.h"
#include "qpid/framing/MessageAppendBody.h"
@@ -127,7 +129,7 @@ MessageHandlerImpl::consume(const MethodContext& context,
if(!destination.empty() && channel.exists(destination))
throw ConnectionException(530, "Consumer tags must be unique");
string tag = destination;
- channel.consume(
+ channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())),
tag, queue, !noAck, exclusive,
noLocal ? &connection : 0, &filter);
client.ok(context.getRequestId());
@@ -144,7 +146,8 @@ MessageHandlerImpl::get( const MethodContext& context,
{
Queue::shared_ptr queue = getQueue(queueName);
- if(channel.get(queue, destination, !noAck))
+ GetAdapter out(adapter, queue, destination, connection.getFrameMax());
+ if(channel.get(out, queue, !noAck))
client.ok(context.getRequestId());
else
client.empty(context.getRequestId());
@@ -162,7 +165,7 @@ MessageHandlerImpl::empty( const MethodContext& )
void
MessageHandlerImpl::ok(const MethodContext& /*context*/)
{
- channel.ack();
+ channel.ack(adapter.getFirstAckRequest(), adapter.getLastAckRequest());
}
void
@@ -190,7 +193,7 @@ MessageHandlerImpl::reject(const MethodContext& /*context*/,
uint16_t /*code*/,
const string& /*text*/ )
{
- channel.ack();
+ //channel.ack();
// channel.requeue();
}
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
new file mode 100644
index 0000000000..df92f74b14
--- /dev/null
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SemanticHandler.h"
+#include "BrokerAdapter.h"
+#include "qpid/framing/ChannelAdapter.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+SemanticHandler::SemanticHandler(ChannelId id, Connection& c) :
+ connection(c),
+ channel(c, id, &c.broker.getStore())
+{
+ init(id, connection.getOutput(), connection.getVersion());
+ adapter = std::auto_ptr<BrokerAdapter>(new BrokerAdapter(channel, connection, connection.broker, *this));
+}
+
+
+void SemanticHandler::handle(framing::AMQFrame& frame)
+{
+ handleBody(frame.getBody());
+}
+
+//ChannelAdapter virtual methods:
+void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const qpid::framing::MethodContext& context)
+{
+ try{
+ if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
+ if (!method->isA<ChannelCloseOkBody>()) {
+ std::stringstream out;
+ out << "Attempt to use unopened channel: " << getId();
+ throw ConnectionException(504, out.str());
+ }
+ } else {
+ method->invoke(*adapter, context);
+ }
+ }catch(ChannelException& e){
+ adapter->getProxy().getChannel().close(
+ e.code, e.toString(),
+ method->amqpClassId(), method->amqpMethodId());
+ connection.closeChannel(getId());
+ }catch(ConnectionException& e){
+ connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
+ }catch(std::exception& e){
+ connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
+ }
+
+}
+
+bool SemanticHandler::isOpen() const
+{
+ return channel.isOpen();
+}
+
+void SemanticHandler::handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody> body)
+{
+ channel.handleHeader(body);
+}
+
+void SemanticHandler::handleContent(boost::shared_ptr<qpid::framing::AMQContentBody> body)
+{
+ channel.handleContent(body);
+}
+
+void SemanticHandler::handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody> body)
+{
+ channel.handleHeartbeat(body);
+}
+
diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h
new file mode 100644
index 0000000000..a179969ece
--- /dev/null
+++ b/cpp/src/qpid/broker/SemanticHandler.h
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _SemanticHandler_
+#define _SemanticHandler_
+
+#include <memory>
+#include "BrokerChannel.h"
+#include "Connection.h"
+#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/FrameHandler.h"
+
+namespace qpid {
+namespace broker {
+
+class BrokerAdapter;
+class framing::ChannelAdapter;
+
+class SemanticHandler : private framing::ChannelAdapter, public framing::FrameHandler {
+ Connection& connection;
+ Channel channel;
+ std::auto_ptr<BrokerAdapter> adapter;
+
+ //ChannelAdapter virtual methods:
+ void handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const qpid::framing::MethodContext& context);
+ bool isOpen() const;
+ void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>);
+ void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>);
+ void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>);
+public:
+ SemanticHandler(framing::ChannelId id, Connection& c);
+ void handle(framing::AMQFrame& frame);
+};
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/framing/ChannelAdapter.h b/cpp/src/qpid/framing/ChannelAdapter.h
index 5f92383ee3..1c3f29d762 100644
--- a/cpp/src/qpid/framing/ChannelAdapter.h
+++ b/cpp/src/qpid/framing/ChannelAdapter.h
@@ -52,7 +52,7 @@ class MethodContext;
* Thread safety: OBJECT UNSAFE. Instances must not be called
* concurrently. AMQP defines channels to be serialized.
*/
-class ChannelAdapter : private BodyHandler {
+class ChannelAdapter : protected BodyHandler {
public:
/**
*@param output Processed frames are forwarded to this handler.
@@ -84,6 +84,10 @@ class ChannelAdapter : private BodyHandler {
virtual bool isOpen() const = 0;
+ RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); }
+ RequestId getLastAckRequest() { return requester.getLastAckRequest(); }
+ RequestId getNextSendRequestId() { return requester.getNextId(); }
+
protected:
void assertMethodOk(AMQMethodBody& method) const;
void assertChannelOpen() const;
@@ -93,13 +97,9 @@ class ChannelAdapter : private BodyHandler {
shared_ptr<AMQMethodBody> method,
const MethodContext& context) = 0;
- RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); }
- RequestId getLastAckRequest() { return requester.getLastAckRequest(); }
- RequestId getNextSendRequestId() { return requester.getNextId(); }
-
private:
class ChannelAdapterHandler;
- friend class ChannelAdapterHandler;
+ friend class ChannelAdapterHandler;
void handleMethod(shared_ptr<AMQMethodBody>);
void handleRequest(shared_ptr<AMQRequestBody>);
diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp
index 929105f6e3..251ac624ab 100644
--- a/cpp/src/tests/BrokerChannelTest.cpp
+++ b/cpp/src/tests/BrokerChannelTest.cpp
@@ -48,13 +48,38 @@ struct MockHandler : ConnectionOutputHandler{
void close() {};
};
+struct DeliveryRecorder
+{
+ typedef std::pair<Message::shared_ptr, RequestId> Delivery;
+ std::vector<Delivery> delivered;
+
+ struct Adapter : DeliveryAdapter
+ {
+ RequestId id;
+ DeliveryRecorder& recorder;
+
+ Adapter(DeliveryRecorder& r) : recorder(r) {}
+
+ RequestId getNextDeliveryTag() { return id + 1; }
+ void deliver(Message::shared_ptr& msg, RequestId tag)
+ {
+ recorder.delivered.push_back(Delivery(msg, tag));
+ id++;
+ }
+
+ };
+
+ std::auto_ptr<DeliveryAdapter> createAdapter()
+ {
+ return std::auto_ptr<DeliveryAdapter>(new Adapter(*this));
+ }
+};
class BrokerChannelTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(BrokerChannelTest);
- CPPUNIT_TEST(testConsumerMgmt);
+ CPPUNIT_TEST(testConsumerMgmt);;
CPPUNIT_TEST(testDeliveryNoAck);
- CPPUNIT_TEST(testDeliveryAndRecovery);
CPPUNIT_TEST(testStaging);
CPPUNIT_TEST(testQueuePolicy);
CPPUNIT_TEST(testFlow);
@@ -160,11 +185,12 @@ class BrokerChannelTest : public CppUnit::TestCase
ConnectionToken* owner = 0;
string tag("my_consumer");
- channel.consume(tag, queue, false, false, owner);
+ std::auto_ptr<DeliveryAdapter> unused;
+ channel.consume(unused, tag, queue, false, false, owner);
string tagA;
string tagB;
- channel.consume(tagA, queue, false, false, owner);
- channel.consume(tagB, queue, false, false, owner);
+ channel.consume(unused, tagA, queue, false, false, owner);
+ channel.consume(unused, tagB, queue, false, false, owner);
CPPUNIT_ASSERT_EQUAL((uint32_t) 3, queue->getConsumerCount());
CPPUNIT_ASSERT(channel.exists("my_consumer"));
CPPUNIT_ASSERT(channel.exists(tagA));
@@ -178,65 +204,17 @@ class BrokerChannelTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL((uint32_t) 0, queue->getConsumerCount());
}
- void testDeliveryNoAck(){
+ void testDeliveryNoAck(){
Channel channel(connection, 7);
- channel.open();
- const string data("abcdefghijklmn");
- Message::shared_ptr msg(
- createMessage("test", "my_routing_key", "my_message_id", 14));
- addContent(msg, data);
- Queue::shared_ptr queue(new Queue("my_queue"));
- ConnectionToken* owner(0);
- string tag("no_ack");
- channel.consume(tag, queue, false, false, owner);
-
- queue->deliver(msg);
- CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
- CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel());
- CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1].getChannel());
- CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2].getChannel());
- CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3].getChannel());
- CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(
- handler.frames[0].getBody().get()));
- CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>(
- handler.frames[1].getBody().get()));
- CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>(
- handler.frames[2].getBody().get()));
- AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>(
- handler.frames[3].getBody().get());
- CPPUNIT_ASSERT(contentBody);
- CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
- }
-
- void testDeliveryAndRecovery(){
- Channel channel(connection, 7);
- channel.open();
- const string data("abcdefghijklmn");
-
Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
- addContent(msg, data);
-
Queue::shared_ptr queue(new Queue("my_queue"));
- ConnectionToken* owner(0);
- string tag("ack");
- channel.consume(tag, queue, true, false, owner);
-
+ DeliveryRecorder recorder;
+ string tag("test");
+ channel.consume(recorder.createAdapter(), tag, queue, false, false, 0);
queue->deliver(msg);
- CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
- CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel());
- CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1].getChannel());
- CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2].getChannel());
- CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3].getChannel());
- CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(
- handler.frames[0].getBody().get()));
- CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>(
- handler.frames[1].getBody().get()));
- CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>(
- handler.frames[2].getBody().get()));
- AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>(
- handler.frames[3].getBody().get());
- CPPUNIT_ASSERT(contentBody);
- CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
+
+ CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size());
+ CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first);
}
void testStaging(){
@@ -349,26 +327,18 @@ class BrokerChannelTest : public CppUnit::TestCase
Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
addContent(msg, data);
Queue::shared_ptr queue(new Queue("my_queue"));
- ConnectionToken* owner(0);
- string tag("no_ack");
- channel.consume(tag, queue, false, false, owner);
+ DeliveryRecorder recorder;
+ string tag("test");
+ channel.consume(recorder.createAdapter(), tag, queue, false, false, 0);
channel.flow(false);
queue->deliver(msg);
- //ensure no more frames have been delivered
- CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size());
- CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue->getMessageCount());
+ //ensure no messages have been delivered
+ CPPUNIT_ASSERT_EQUAL((size_t) 0, recorder.delivered.size());
+
channel.flow(true);
- CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1].getChannel());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2].getChannel());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[3].getChannel());
- BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[1].getBody()));
- AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[2].getBody()));
- AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[3].getBody()));
- CPPUNIT_ASSERT(deliver);
- CPPUNIT_ASSERT(contentHeader);
- CPPUNIT_ASSERT(contentBody);
- CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
+ //ensure no messages have been delivered
+ CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size());
+ CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first);
}
Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)