summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/Makefile.am7
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp15
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h6
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp99
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.h38
-rw-r--r--cpp/src/qpid/broker/BrokerMessageBase.h2
-rw-r--r--cpp/src/qpid/broker/Connection.cpp13
-rw-r--r--cpp/src/qpid/broker/Connection.h4
-rw-r--r--cpp/src/qpid/broker/ConsumeAdapter.cpp37
-rw-r--r--cpp/src/qpid/broker/ConsumeAdapter.h43
-rw-r--r--cpp/src/qpid/broker/DeliveryAdapter.h51
-rw-r--r--cpp/src/qpid/broker/GetAdapter.cpp41
-rw-r--r--cpp/src/qpid/broker/GetAdapter.h47
-rw-r--r--cpp/src/qpid/broker/HandlerImpl.h5
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp11
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp89
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.h55
-rw-r--r--cpp/src/qpid/framing/ChannelAdapter.h12
-rw-r--r--cpp/src/tests/BrokerChannelTest.cpp124
19 files changed, 506 insertions, 193 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 09d3f6185d..3399d861f2 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -197,6 +197,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/Connection.cpp \
qpid/broker/ConnectionAdapter.cpp \
qpid/broker/ConnectionFactory.cpp \
+ qpid/broker/ConsumeAdapter.cpp \
qpid/broker/Daemon.cpp \
qpid/broker/DeliverableMessage.cpp \
qpid/broker/DeliveryRecord.cpp \
@@ -209,6 +210,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/DtxWorkRecord.cpp \
qpid/broker/ExchangeRegistry.cpp \
qpid/broker/FanOutExchange.cpp \
+ qpid/broker/GetAdapter.cpp \
qpid/broker/HeadersExchange.cpp \
qpid/broker/InMemoryContent.cpp \
qpid/broker/LazyLoadedContent.cpp \
@@ -224,6 +226,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/RecoveredEnqueue.cpp \
qpid/broker/RecoveredDequeue.cpp \
qpid/broker/Reference.cpp \
+ qpid/broker/SemanticHandler.cpp \
qpid/broker/Timer.cpp \
qpid/broker/TopicExchange.cpp \
qpid/broker/TxAck.cpp \
@@ -253,9 +256,11 @@ nobase_include_HEADERS = \
qpid/broker/BrokerMessageBase.h \
qpid/broker/BrokerQueue.h \
qpid/broker/CompletionHandler.h \
+ qpid/broker/ConsumeAdapter.h \
qpid/broker/Consumer.h \
qpid/broker/Deliverable.h \
qpid/broker/DeliverableMessage.h \
+ qpid/broker/DeliveryAdapter.h \
qpid/broker/DirectExchange.h \
qpid/broker/DtxAck.h \
qpid/broker/DtxBuffer.h \
@@ -265,6 +270,7 @@ nobase_include_HEADERS = \
qpid/broker/DtxWorkRecord.h \
qpid/broker/ExchangeRegistry.h \
qpid/broker/FanOutExchange.h \
+ qpid/broker/GetAdapter.h \
qpid/broker/HandlerImpl.h \
qpid/broker/InMemoryContent.h \
qpid/broker/MessageBuilder.h \
@@ -306,6 +312,7 @@ nobase_include_HEADERS = \
qpid/broker/PersistableQueue.h \
qpid/broker/QueuePolicy.h \
qpid/broker/RecoveryManagerImpl.h \
+ qpid/broker/SemanticHandler.h \
qpid/broker/Timer.h \
qpid/broker/TopicExchange.h \
qpid/broker/TransactionalStore.h \
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index bbf6686a6c..f0dc159752 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -20,6 +20,8 @@
#include "BrokerAdapter.h"
#include "BrokerChannel.h"
#include "Connection.h"
+#include "ConsumeAdapter.h"
+#include "GetAdapter.h"
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/Exception.h"
@@ -33,8 +35,8 @@ using namespace qpid::framing;
typedef std::vector<Queue::shared_ptr> QueueVector;
-BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) :
- CoreRefs(ch, c, b),
+ BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b, ChannelAdapter& a) :
+ CoreRefs(ch, c, b, a),
connection(c),
basicHandler(*this),
channelHandler(*this),
@@ -299,9 +301,11 @@ void BrokerAdapter::BasicHandlerImpl::consume(
if(!consumerTag.empty() && channel.exists(consumerTag)){
throw ConnectionException(530, "Consumer tags must be unique");
}
-
string newTag = consumerTag;
- channel.consume(
+ //need to generate name here, so we have it for the adapter (it is
+ //also version specific behaviour now)
+ if (newTag.empty()) newTag = tagGenerator.generate();
+ channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, newTag, connection.getFrameMax())),
newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
if(!nowait) client.consumeOk(newTag, context.getRequestId());
@@ -336,7 +340,8 @@ void BrokerAdapter::BasicHandlerImpl::publish(
void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = getQueue(queueName);
- if(!channel.get(queue, "", !noAck)){
+ GetAdapter out(adapter, queue, "", connection.getFrameMax());
+ if(!channel.get(out, queue, !noAck)){
string clusterId;//not used, part of an imatix hack
client.getEmpty(clusterId, context.getRequestId());
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index c66bdb3a31..795744aa9a 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -56,7 +56,7 @@ class MessageHandlerImpl;
class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
{
public:
- BrokerAdapter(Channel& ch, Connection& c, Broker& b);
+ BrokerAdapter(Channel& ch, Connection& c, Broker& b, framing::ChannelAdapter& a);
framing::ProtocolVersion getVersion() const;
ChannelHandler* getChannelHandler() { return &channelHandler; }
@@ -172,8 +172,10 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
public BasicHandler,
public HandlerImpl<framing::AMQP_ClientProxy::Basic>
{
+ NameGenerator tagGenerator;
+
public:
- BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+ BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent), tagGenerator("sgen") {}
void qos(const framing::MethodContext& context, uint32_t prefetchSize,
uint16_t prefetchCount, bool global);
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index c81e73aba1..523a834715 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -28,7 +28,6 @@
#include <boost/bind.hpp>
#include <boost/format.hpp>
-#include "qpid/framing/ChannelAdapter.h"
#include "qpid/QpidError.h"
#include "BrokerAdapter.h"
@@ -50,8 +49,8 @@ using namespace qpid::framing;
using namespace qpid::sys;
-Channel::Channel(Connection& con, ChannelId id, MessageStore* const _store) :
- ChannelAdapter(),
+Channel::Channel(Connection& con, ChannelId _id, MessageStore* const _store) :
+ id(_id),
connection(con),
currentDeliveryTag(1),
prefetchSize(0),
@@ -62,10 +61,8 @@ Channel::Channel(Connection& con, ChannelId id, MessageStore* const _store) :
store(_store),
messageBuilder(this, _store, connection.getStagingThreshold()),
opened(id == 0),//channel 0 is automatically open, other must be explicitly opened
- flowActive(true),
- adapter(new BrokerAdapter(*this, con, con.broker))
+ flowActive(true)
{
- init(id, con.getOutput(), con.getVersion());
outstanding.reset();
}
@@ -79,14 +76,15 @@ bool Channel::exists(const string& consumerTag){
// TODO aconway 2007-02-12: Why is connection token passed in instead
// of using the channel's parent connection?
-void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks,
+void Channel::consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut,
+ Queue::shared_ptr queue, bool acks,
bool exclusive, ConnectionToken* const connection,
const FieldTable*)
{
if(tagInOut.empty())
tagInOut = tagGenerator.generate();
std::auto_ptr<ConsumerImpl> c(
- new ConsumerImpl(this, tagInOut, queue, connection, acks));
+ new ConsumerImpl(this, adapter, tagInOut, queue, connection, acks));
queue->consume(c.get(), exclusive);//may throw exception
consumers.insert(tagInOut, c.release());
}
@@ -195,22 +193,10 @@ void Channel::checkDtxTimeout()
}
}
-void Channel::deliver(
- Message::shared_ptr& msg, const string& consumerTag,
- Queue::shared_ptr& queue, bool ackExpected)
+void Channel::record(const DeliveryRecord& delivery)
{
- Mutex::ScopedLock locker(deliveryLock);
-
- // Key the delivered messages to the id of the request in which they're sent
- uint64_t deliveryTag = getNextSendRequestId();
-
- if(ackExpected){
- unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag));
- outstanding.size += msg->contentSize();
- outstanding.count++;
- }
- //send deliver method, header and content(s)
- msg->deliver(*this, consumerTag, deliveryTag, connection.getFrameMax());
+ unacked.push_back(delivery);
+ delivery.addTo(&outstanding);
}
bool Channel::checkPrefetch(Message::shared_ptr& msg){
@@ -220,11 +206,11 @@ bool Channel::checkPrefetch(Message::shared_ptr& msg){
return countOk && sizeOk;
}
-Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag,
- Queue::shared_ptr _queue,
- ConnectionToken* const _connection, bool ack
-) : parent(_parent), tag(_tag), queue(_queue), connection(_connection),
- ackExpected(ack), blocked(false) {}
+Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, std::auto_ptr<DeliveryAdapter> _adapter,
+ const string& _tag, Queue::shared_ptr _queue,
+ ConnectionToken* const _connection, bool ack
+ ) : parent(_parent), adapter(_adapter), tag(_tag), queue(_queue), connection(_connection),
+ ackExpected(ack), blocked(false) {}
bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
if(!connection || connection != msg->getPublisher()){//check for no_local
@@ -232,13 +218,25 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
blocked = true;
}else{
blocked = false;
- parent->deliver(msg, tag, queue, ackExpected);
+ Mutex::ScopedLock locker(parent->deliveryLock);
+
+ uint64_t deliveryTag = adapter->getNextDeliveryTag();
+ if(ackExpected){
+ parent->record(DeliveryRecord(msg, queue, tag, deliveryTag));
+ }
+ adapter->deliver(msg, deliveryTag);
+
return true;
}
}
return false;
}
+void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, uint64_t deliveryTag) {
+ Mutex::ScopedLock locker(parent->deliveryLock);
+ adapter->deliver(msg, deliveryTag);
+}
+
Channel::ConsumerImpl::~ConsumerImpl() {
cancel();
}
@@ -298,10 +296,6 @@ void Channel::complete(Message::shared_ptr msg) {
}
}
-void Channel::ack(){
- ack(getFirstAckRequest(), getLastAckRequest());
-}
-
// Used by Basic
void Channel::ack(uint64_t deliveryTag, bool multiple){
if (multiple)
@@ -365,15 +359,12 @@ void Channel::recover(bool requeue){
}
}
-bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackExpected){
+bool Channel::get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected){
Message::shared_ptr msg = queue->dequeue();
if(msg){
Mutex::ScopedLock locker(deliveryLock);
- uint64_t myDeliveryTag = getNextSendRequestId();
- msg->sendGetOk(MethodContext(this, msg->getRespondTo()),
- destination,
- queue->getMessageCount() + 1, myDeliveryTag,
- connection.getFrameMax());
+ uint64_t myDeliveryTag = adapter.getNextDeliveryTag();
+ adapter.deliver(msg, myDeliveryTag);
if(ackExpected){
unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
}
@@ -386,33 +377,9 @@ bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackEx
void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag,
uint64_t deliveryTag)
{
- msg->deliver(*this, consumerTag, deliveryTag, connection.getFrameMax());
-}
-
-void Channel::handleMethodInContext(
- boost::shared_ptr<qpid::framing::AMQMethodBody> method,
- const MethodContext& context
-)
-{
- try{
- if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
- if (!method->isA<ChannelCloseOkBody>()) {
- std::stringstream out;
- out << "Attempt to use unopened channel: " << getId();
- throw ConnectionException(504, out.str());
- }
- } else {
- method->invoke(*adapter, context);
- }
- }catch(ChannelException& e){
- adapter->getProxy().getChannel().close(
- e.code, e.toString(),
- method->amqpClassId(), method->amqpMethodId());
- connection.closeChannel(getId());
- }catch(ConnectionException& e){
- connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
- }catch(std::exception& e){
- connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
+ ConsumerImplMap::iterator i = consumers.find(consumerTag);
+ if (i != consumers.end()){
+ i->redeliver(msg, deliveryTag);
}
}
diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h
index 9212e8f632..a2b6bd3ef9 100644
--- a/cpp/src/qpid/broker/BrokerChannel.h
+++ b/cpp/src/qpid/broker/BrokerChannel.h
@@ -23,6 +23,7 @@
*/
#include <list>
+#include <memory>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
@@ -30,6 +31,7 @@
#include "AccumulatedAck.h"
#include "Consumer.h"
+#include "DeliveryAdapter.h"
#include "DeliveryRecord.h"
#include "DtxBuffer.h"
#include "DtxManager.h"
@@ -37,6 +39,7 @@
#include "NameGenerator.h"
#include "Prefetch.h"
#include "TxBuffer.h"
+#include "qpid/framing/amqp_types.h"
#include "qpid/framing/ChannelAdapter.h"
#include "qpid/framing/ChannelOpenBody.h"
#include "CompletionHandler.h"
@@ -55,12 +58,12 @@ using framing::string;
* Maintains state for an AMQP channel. Handles incoming and
* outgoing messages for that channel.
*/
-class Channel : public framing::ChannelAdapter,
- public CompletionHandler
+class Channel : public CompletionHandler
{
class ConsumerImpl : public Consumer
{
Channel* parent;
+ std::auto_ptr<DeliveryAdapter> adapter;
const string tag;
Queue::shared_ptr queue;
ConnectionToken* const connection;
@@ -68,17 +71,19 @@ class Channel : public framing::ChannelAdapter,
bool blocked;
public:
- ConsumerImpl(Channel* parent, const string& tag,
- Queue::shared_ptr queue,
+ ConsumerImpl(Channel* parent, std::auto_ptr<DeliveryAdapter> adapter,
+ const string& tag, Queue::shared_ptr queue,
ConnectionToken* const connection, bool ack);
~ConsumerImpl();
- virtual bool deliver(Message::shared_ptr& msg);
+ bool deliver(Message::shared_ptr& msg);
+ void redeliver(Message::shared_ptr& msg, uint64_t deliveryTag);
void cancel();
void requestDispatch();
};
typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
+ framing::ChannelId id;
Connection& connection;
uint64_t currentDeliveryTag;
Queue::shared_ptr defaultQueue;
@@ -97,15 +102,10 @@ class Channel : public framing::ChannelAdapter,
MessageBuilder messageBuilder;//builder for in-progress message
bool opened;
bool flowActive;
- boost::scoped_ptr<BrokerAdapter> adapter;
-
- // completion handler for MessageBuilder
- void complete(Message::shared_ptr msg);
-
- void deliver(Message::shared_ptr& msg, const string& tag,
- Queue::shared_ptr& queue, bool ackExpected);
+
+ void complete(Message::shared_ptr msg);// completion handler for MessageBuilder
+ void record(const DeliveryRecord& delivery);
bool checkPrefetch(Message::shared_ptr& msg);
-
void checkDtxTimeout();
public:
@@ -113,7 +113,7 @@ class Channel : public framing::ChannelAdapter,
~Channel();
bool isOpen() const { return opened; }
- BrokerAdapter& getAdapter() { return *adapter; }
+ framing::ChannelId getId() const { return id; }
void open() { opened = true; }
void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
@@ -126,11 +126,11 @@ class Channel : public framing::ChannelAdapter,
/**
*@param tagInOut - if empty it is updated with the generated token.
*/
- void consume(string& tagInOut, Queue::shared_ptr queue, bool acks,
+ void consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, Queue::shared_ptr queue, bool acks,
bool exclusive, ConnectionToken* const connection = 0,
const framing::FieldTable* = 0);
void cancel(const string& tag);
- bool get(Queue::shared_ptr queue, const std::string& destination, bool ackExpected);
+ bool get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected);
void close();
void startTx();
void commit();
@@ -140,7 +140,6 @@ class Channel : public framing::ChannelAdapter,
void endDtx(const std::string& xid, bool fail);
void suspendDtx(const std::string& xid);
void resumeDtx(const std::string& xid);
- void ack();
void ack(uint64_t deliveryTag, bool multiple);
void ack(uint64_t deliveryTag, uint64_t endTag);
void recover(bool requeue);
@@ -152,11 +151,6 @@ class Channel : public framing::ChannelAdapter,
void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
void handleInlineTransfer(Message::shared_ptr msg);
-
- // For ChannelAdapter
- void handleMethodInContext(
- boost::shared_ptr<framing::AMQMethodBody> method,
- const framing::MethodContext& context);
};
}} // namespace broker
diff --git a/cpp/src/qpid/broker/BrokerMessageBase.h b/cpp/src/qpid/broker/BrokerMessageBase.h
index a254986fd9..048b1c80e2 100644
--- a/cpp/src/qpid/broker/BrokerMessageBase.h
+++ b/cpp/src/qpid/broker/BrokerMessageBase.h
@@ -103,7 +103,7 @@ class Message : public PersistableMessage{
* Used to return a message in response to a get from a queue
*/
virtual void sendGetOk(const framing::MethodContext& context,
- const std::string& destination,
+ const std::string& destination,
uint32_t messageCount,
uint64_t deliveryTag,
uint32_t framesize) = 0;
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index cdbcee1c69..dfe2101bc0 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -26,6 +26,7 @@
#include "BrokerChannel.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "BrokerAdapter.h"
+#include "SemanticHandler.h"
using namespace boost;
using namespace qpid::sys;
@@ -55,7 +56,7 @@ void Connection::received(framing::AMQFrame& frame){
if (frame.getChannel() == 0) {
adapter.handle(frame);
} else {
- getChannel((frame.getChannel())).getHandlers().in->handle(frame);
+ getChannel((frame.getChannel())).in->handle(frame);
}
}
@@ -92,17 +93,17 @@ void Connection::closed(){
void Connection::closeChannel(uint16_t id) {
ChannelMap::iterator i = channels.find(id);
- if (i != channels.end())
- i->close();
+ if (i != channels.end()) channels.erase(i);
}
-Channel& Connection::getChannel(ChannelId id) {
+FrameHandler::Chains& Connection::getChannel(ChannelId id) {
ChannelMap::iterator i = channels.find(id);
if (i == channels.end()) {
- i = channels.insert(id, new Channel(*this, id, &broker.getStore())).first;
+ FrameHandler::Chains chains(new SemanticHandler(id, *this), new OutputHandlerFrameHandler(*out));
+ i = channels.insert(ChannelMap::value_type(id, chains)).first;
}
- return *i;
+ return i->second;
}
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index a885ac4065..e38f88c2e9 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -51,7 +51,7 @@ class Connection : public sys::ConnectionInputHandler,
Connection(sys::ConnectionOutputHandler* out, Broker& broker);
/** Get a channel. Create if it does not already exist */
- Channel& getChannel(framing::ChannelId channel);
+ framing::FrameHandler::Chains& getChannel(framing::ChannelId channel);
/** Close a channel */
void closeChannel(framing::ChannelId channel);
@@ -82,7 +82,7 @@ class Connection : public sys::ConnectionInputHandler,
void closed();
private:
- typedef boost::ptr_map<framing::ChannelId, Channel> ChannelMap;
+ typedef std::map<framing::ChannelId, framing::FrameHandler::Chains> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
Exchange::shared_ptr findExchange(const string& name);
diff --git a/cpp/src/qpid/broker/ConsumeAdapter.cpp b/cpp/src/qpid/broker/ConsumeAdapter.cpp
new file mode 100644
index 0000000000..59b6795a77
--- /dev/null
+++ b/cpp/src/qpid/broker/ConsumeAdapter.cpp
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConsumeAdapter.h"
+
+using namespace qpid::broker;
+using qpid::framing::ChannelAdapter;
+using qpid::framing::RequestId;
+
+ConsumeAdapter::ConsumeAdapter(ChannelAdapter& a, const std::string t, uint32_t f) : adapter(a), tag(t), framesize(f) {}
+
+RequestId ConsumeAdapter::getNextDeliveryTag()
+{
+ return adapter.getNextSendRequestId();
+}
+
+void ConsumeAdapter::deliver(Message::shared_ptr& msg, RequestId deliveryTag)
+{
+ msg->deliver(adapter, tag, deliveryTag, framesize);
+}
diff --git a/cpp/src/qpid/broker/ConsumeAdapter.h b/cpp/src/qpid/broker/ConsumeAdapter.h
new file mode 100644
index 0000000000..43cda7753e
--- /dev/null
+++ b/cpp/src/qpid/broker/ConsumeAdapter.h
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ConsumeAdapter_
+#define _ConsumeAdapter_
+
+#include "DeliveryAdapter.h"
+#include "qpid/framing/ChannelAdapter.h"
+
+namespace qpid {
+namespace broker {
+ class ConsumeAdapter : public DeliveryAdapter
+ {
+ framing::ChannelAdapter& adapter;
+ const std::string tag;
+ const uint32_t framesize;
+ public:
+ ConsumeAdapter(framing::ChannelAdapter& adapter, const std::string tag, uint32_t framesize);
+ framing::RequestId getNextDeliveryTag();
+ void deliver(Message::shared_ptr& msg, framing::RequestId tag);
+ };
+
+}}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/DeliveryAdapter.h b/cpp/src/qpid/broker/DeliveryAdapter.h
new file mode 100644
index 0000000000..45b103bd68
--- /dev/null
+++ b/cpp/src/qpid/broker/DeliveryAdapter.h
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DeliveryAdapter_
+#define _DeliveryAdapter_
+
+#include "BrokerMessageBase.h"
+#include "qpid/framing/amqp_types.h"
+
+namespace qpid {
+namespace broker {
+
+ /**
+ * The intention behind this interface is to separate the generic
+ * handling of some form of message delivery to clients that is
+ * contained in the version independent Channel class from the
+ * details required for a particular situation or
+ * version. i.e. where the existing adapters allow (through
+ * supporting the generated interface for a version of the
+ * protocol) inputs of a channel to be adapted to the version
+ * independent part, this does the same for the outputs.
+ */
+ class DeliveryAdapter
+ {
+ public:
+ virtual framing::RequestId getNextDeliveryTag() = 0;
+ virtual void deliver(Message::shared_ptr& msg, framing::RequestId tag) = 0;
+ virtual ~DeliveryAdapter(){}
+ };
+
+}}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/GetAdapter.cpp b/cpp/src/qpid/broker/GetAdapter.cpp
new file mode 100644
index 0000000000..4a2f6d34d4
--- /dev/null
+++ b/cpp/src/qpid/broker/GetAdapter.cpp
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "GetAdapter.h"
+#include "qpid/framing/MethodContext.h"
+
+using namespace qpid::broker;
+using qpid::framing::ChannelAdapter;
+using qpid::framing::RequestId;
+using qpid::framing::MethodContext;
+
+GetAdapter::GetAdapter(ChannelAdapter& a, Queue::shared_ptr q, const std::string d, uint32_t f)
+ : adapter(a), queue(q), destination(d), framesize(f) {}
+
+RequestId GetAdapter::getNextDeliveryTag()
+{
+ return adapter.getNextSendRequestId();
+}
+
+void GetAdapter::deliver(Message::shared_ptr& msg, framing::RequestId deliveryTag)
+{
+ msg->sendGetOk(MethodContext(&adapter, msg->getRespondTo()), destination,
+ queue->getMessageCount(), deliveryTag, framesize);
+}
diff --git a/cpp/src/qpid/broker/GetAdapter.h b/cpp/src/qpid/broker/GetAdapter.h
new file mode 100644
index 0000000000..e90619a5f3
--- /dev/null
+++ b/cpp/src/qpid/broker/GetAdapter.h
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _GetAdapter_
+#define _GetAdapter_
+
+#include "BrokerQueue.h"
+#include "DeliveryAdapter.h"
+#include "qpid/framing/ChannelAdapter.h"
+
+namespace qpid {
+namespace broker {
+
+ class GetAdapter : public DeliveryAdapter
+ {
+ framing::ChannelAdapter& adapter;
+ Queue::shared_ptr queue;
+ const std::string destination;
+ const uint32_t framesize;
+ public:
+ GetAdapter(framing::ChannelAdapter& adapter, Queue::shared_ptr queue, const std::string destination, uint32_t framesize);
+ ~GetAdapter(){}
+ framing::RequestId getNextDeliveryTag();
+ void deliver(Message::shared_ptr& msg, framing::RequestId tag);
+ };
+
+}}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/HandlerImpl.h b/cpp/src/qpid/broker/HandlerImpl.h
index 008be10867..96bf065062 100644
--- a/cpp/src/qpid/broker/HandlerImpl.h
+++ b/cpp/src/qpid/broker/HandlerImpl.h
@@ -40,12 +40,13 @@ class Connection;
*/
struct CoreRefs
{
- CoreRefs(Channel& ch, Connection& c, Broker& b)
- : channel(ch), connection(c), broker(b), proxy(ch) {}
+ CoreRefs(Channel& ch, Connection& c, Broker& b, framing::ChannelAdapter& a)
+ : channel(ch), connection(c), broker(b), adapter(a), proxy(a) {}
Channel& channel;
Connection& connection;
Broker& broker;
+ framing::ChannelAdapter& adapter;
framing::AMQP_ClientProxy proxy;
/**
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index bbfcf209ad..f586ea92fc 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -21,6 +21,8 @@
#include "BrokerChannel.h"
#include "qpid/framing/FramingContent.h"
#include "Connection.h"
+#include "ConsumeAdapter.h"
+#include "GetAdapter.h"
#include "Broker.h"
#include "BrokerMessageMessage.h"
#include "qpid/framing/MessageAppendBody.h"
@@ -127,7 +129,7 @@ MessageHandlerImpl::consume(const MethodContext& context,
if(!destination.empty() && channel.exists(destination))
throw ConnectionException(530, "Consumer tags must be unique");
string tag = destination;
- channel.consume(
+ channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())),
tag, queue, !noAck, exclusive,
noLocal ? &connection : 0, &filter);
client.ok(context.getRequestId());
@@ -144,7 +146,8 @@ MessageHandlerImpl::get( const MethodContext& context,
{
Queue::shared_ptr queue = getQueue(queueName);
- if(channel.get(queue, destination, !noAck))
+ GetAdapter out(adapter, queue, destination, connection.getFrameMax());
+ if(channel.get(out, queue, !noAck))
client.ok(context.getRequestId());
else
client.empty(context.getRequestId());
@@ -162,7 +165,7 @@ MessageHandlerImpl::empty( const MethodContext& )
void
MessageHandlerImpl::ok(const MethodContext& /*context*/)
{
- channel.ack();
+ channel.ack(adapter.getFirstAckRequest(), adapter.getLastAckRequest());
}
void
@@ -190,7 +193,7 @@ MessageHandlerImpl::reject(const MethodContext& /*context*/,
uint16_t /*code*/,
const string& /*text*/ )
{
- channel.ack();
+ //channel.ack();
// channel.requeue();
}
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
new file mode 100644
index 0000000000..df92f74b14
--- /dev/null
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SemanticHandler.h"
+#include "BrokerAdapter.h"
+#include "qpid/framing/ChannelAdapter.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+SemanticHandler::SemanticHandler(ChannelId id, Connection& c) :
+ connection(c),
+ channel(c, id, &c.broker.getStore())
+{
+ init(id, connection.getOutput(), connection.getVersion());
+ adapter = std::auto_ptr<BrokerAdapter>(new BrokerAdapter(channel, connection, connection.broker, *this));
+}
+
+
+void SemanticHandler::handle(framing::AMQFrame& frame)
+{
+ handleBody(frame.getBody());
+}
+
+//ChannelAdapter virtual methods:
+void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const qpid::framing::MethodContext& context)
+{
+ try{
+ if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
+ if (!method->isA<ChannelCloseOkBody>()) {
+ std::stringstream out;
+ out << "Attempt to use unopened channel: " << getId();
+ throw ConnectionException(504, out.str());
+ }
+ } else {
+ method->invoke(*adapter, context);
+ }
+ }catch(ChannelException& e){
+ adapter->getProxy().getChannel().close(
+ e.code, e.toString(),
+ method->amqpClassId(), method->amqpMethodId());
+ connection.closeChannel(getId());
+ }catch(ConnectionException& e){
+ connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
+ }catch(std::exception& e){
+ connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
+ }
+
+}
+
+bool SemanticHandler::isOpen() const
+{
+ return channel.isOpen();
+}
+
+void SemanticHandler::handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody> body)
+{
+ channel.handleHeader(body);
+}
+
+void SemanticHandler::handleContent(boost::shared_ptr<qpid::framing::AMQContentBody> body)
+{
+ channel.handleContent(body);
+}
+
+void SemanticHandler::handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody> body)
+{
+ channel.handleHeartbeat(body);
+}
+
diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h
new file mode 100644
index 0000000000..a179969ece
--- /dev/null
+++ b/cpp/src/qpid/broker/SemanticHandler.h
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _SemanticHandler_
+#define _SemanticHandler_
+
+#include <memory>
+#include "BrokerChannel.h"
+#include "Connection.h"
+#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/FrameHandler.h"
+
+namespace qpid {
+namespace broker {
+
+class BrokerAdapter;
+class framing::ChannelAdapter;
+
+class SemanticHandler : private framing::ChannelAdapter, public framing::FrameHandler {
+ Connection& connection;
+ Channel channel;
+ std::auto_ptr<BrokerAdapter> adapter;
+
+ //ChannelAdapter virtual methods:
+ void handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const qpid::framing::MethodContext& context);
+ bool isOpen() const;
+ void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>);
+ void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>);
+ void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>);
+public:
+ SemanticHandler(framing::ChannelId id, Connection& c);
+ void handle(framing::AMQFrame& frame);
+};
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/framing/ChannelAdapter.h b/cpp/src/qpid/framing/ChannelAdapter.h
index 5f92383ee3..1c3f29d762 100644
--- a/cpp/src/qpid/framing/ChannelAdapter.h
+++ b/cpp/src/qpid/framing/ChannelAdapter.h
@@ -52,7 +52,7 @@ class MethodContext;
* Thread safety: OBJECT UNSAFE. Instances must not be called
* concurrently. AMQP defines channels to be serialized.
*/
-class ChannelAdapter : private BodyHandler {
+class ChannelAdapter : protected BodyHandler {
public:
/**
*@param output Processed frames are forwarded to this handler.
@@ -84,6 +84,10 @@ class ChannelAdapter : private BodyHandler {
virtual bool isOpen() const = 0;
+ RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); }
+ RequestId getLastAckRequest() { return requester.getLastAckRequest(); }
+ RequestId getNextSendRequestId() { return requester.getNextId(); }
+
protected:
void assertMethodOk(AMQMethodBody& method) const;
void assertChannelOpen() const;
@@ -93,13 +97,9 @@ class ChannelAdapter : private BodyHandler {
shared_ptr<AMQMethodBody> method,
const MethodContext& context) = 0;
- RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); }
- RequestId getLastAckRequest() { return requester.getLastAckRequest(); }
- RequestId getNextSendRequestId() { return requester.getNextId(); }
-
private:
class ChannelAdapterHandler;
- friend class ChannelAdapterHandler;
+ friend class ChannelAdapterHandler;
void handleMethod(shared_ptr<AMQMethodBody>);
void handleRequest(shared_ptr<AMQRequestBody>);
diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp
index 929105f6e3..251ac624ab 100644
--- a/cpp/src/tests/BrokerChannelTest.cpp
+++ b/cpp/src/tests/BrokerChannelTest.cpp
@@ -48,13 +48,38 @@ struct MockHandler : ConnectionOutputHandler{
void close() {};
};
+struct DeliveryRecorder
+{
+ typedef std::pair<Message::shared_ptr, RequestId> Delivery;
+ std::vector<Delivery> delivered;
+
+ struct Adapter : DeliveryAdapter
+ {
+ RequestId id;
+ DeliveryRecorder& recorder;
+
+ Adapter(DeliveryRecorder& r) : recorder(r) {}
+
+ RequestId getNextDeliveryTag() { return id + 1; }
+ void deliver(Message::shared_ptr& msg, RequestId tag)
+ {
+ recorder.delivered.push_back(Delivery(msg, tag));
+ id++;
+ }
+
+ };
+
+ std::auto_ptr<DeliveryAdapter> createAdapter()
+ {
+ return std::auto_ptr<DeliveryAdapter>(new Adapter(*this));
+ }
+};
class BrokerChannelTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(BrokerChannelTest);
- CPPUNIT_TEST(testConsumerMgmt);
+ CPPUNIT_TEST(testConsumerMgmt);;
CPPUNIT_TEST(testDeliveryNoAck);
- CPPUNIT_TEST(testDeliveryAndRecovery);
CPPUNIT_TEST(testStaging);
CPPUNIT_TEST(testQueuePolicy);
CPPUNIT_TEST(testFlow);
@@ -160,11 +185,12 @@ class BrokerChannelTest : public CppUnit::TestCase
ConnectionToken* owner = 0;
string tag("my_consumer");
- channel.consume(tag, queue, false, false, owner);
+ std::auto_ptr<DeliveryAdapter> unused;
+ channel.consume(unused, tag, queue, false, false, owner);
string tagA;
string tagB;
- channel.consume(tagA, queue, false, false, owner);
- channel.consume(tagB, queue, false, false, owner);
+ channel.consume(unused, tagA, queue, false, false, owner);
+ channel.consume(unused, tagB, queue, false, false, owner);
CPPUNIT_ASSERT_EQUAL((uint32_t) 3, queue->getConsumerCount());
CPPUNIT_ASSERT(channel.exists("my_consumer"));
CPPUNIT_ASSERT(channel.exists(tagA));
@@ -178,65 +204,17 @@ class BrokerChannelTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL((uint32_t) 0, queue->getConsumerCount());
}
- void testDeliveryNoAck(){
+ void testDeliveryNoAck(){
Channel channel(connection, 7);
- channel.open();
- const string data("abcdefghijklmn");
- Message::shared_ptr msg(
- createMessage("test", "my_routing_key", "my_message_id", 14));
- addContent(msg, data);
- Queue::shared_ptr queue(new Queue("my_queue"));
- ConnectionToken* owner(0);
- string tag("no_ack");
- channel.consume(tag, queue, false, false, owner);
-
- queue->deliver(msg);
- CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
- CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel());
- CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1].getChannel());
- CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2].getChannel());
- CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3].getChannel());
- CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(
- handler.frames[0].getBody().get()));
- CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>(
- handler.frames[1].getBody().get()));
- CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>(
- handler.frames[2].getBody().get()));
- AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>(
- handler.frames[3].getBody().get());
- CPPUNIT_ASSERT(contentBody);
- CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
- }
-
- void testDeliveryAndRecovery(){
- Channel channel(connection, 7);
- channel.open();
- const string data("abcdefghijklmn");
-
Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
- addContent(msg, data);
-
Queue::shared_ptr queue(new Queue("my_queue"));
- ConnectionToken* owner(0);
- string tag("ack");
- channel.consume(tag, queue, true, false, owner);
-
+ DeliveryRecorder recorder;
+ string tag("test");
+ channel.consume(recorder.createAdapter(), tag, queue, false, false, 0);
queue->deliver(msg);
- CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
- CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel());
- CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1].getChannel());
- CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2].getChannel());
- CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3].getChannel());
- CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(
- handler.frames[0].getBody().get()));
- CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>(
- handler.frames[1].getBody().get()));
- CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>(
- handler.frames[2].getBody().get()));
- AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>(
- handler.frames[3].getBody().get());
- CPPUNIT_ASSERT(contentBody);
- CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
+
+ CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size());
+ CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first);
}
void testStaging(){
@@ -349,26 +327,18 @@ class BrokerChannelTest : public CppUnit::TestCase
Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
addContent(msg, data);
Queue::shared_ptr queue(new Queue("my_queue"));
- ConnectionToken* owner(0);
- string tag("no_ack");
- channel.consume(tag, queue, false, false, owner);
+ DeliveryRecorder recorder;
+ string tag("test");
+ channel.consume(recorder.createAdapter(), tag, queue, false, false, 0);
channel.flow(false);
queue->deliver(msg);
- //ensure no more frames have been delivered
- CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size());
- CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue->getMessageCount());
+ //ensure no messages have been delivered
+ CPPUNIT_ASSERT_EQUAL((size_t) 0, recorder.delivered.size());
+
channel.flow(true);
- CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1].getChannel());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2].getChannel());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[3].getChannel());
- BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[1].getBody()));
- AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[2].getBody()));
- AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[3].getBody()));
- CPPUNIT_ASSERT(deliver);
- CPPUNIT_ASSERT(contentHeader);
- CPPUNIT_ASSERT(contentBody);
- CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
+ //ensure no messages have been delivered
+ CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size());
+ CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first);
}
Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)