summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-07-27 15:44:52 +0000
committerGordon Sim <gsim@apache.org>2007-07-27 15:44:52 +0000
commit80406d0fb680239a0141b81fb0b9f20d20c9b1e1 (patch)
tree13677bf773bf25db03144aa72c97a49d2810240d /cpp/src
parenta9232d5a02a19f093f212cb0b76772a20b45cb1b (diff)
downloadqpid-python-80406d0fb680239a0141b81fb0b9f20d20c9b1e1.tar.gz
Use execution layer to acknowledge messages.
Turn off 0-9 framing of requests and responses. Some refactoring around message delivery. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560285 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am7
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp11
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp86
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.h12
-rw-r--r--cpp/src/qpid/broker/BrokerMessage.cpp61
-rw-r--r--cpp/src/qpid/broker/BrokerMessage.h6
-rw-r--r--cpp/src/qpid/broker/BrokerMessageBase.h21
-rw-r--r--cpp/src/qpid/broker/BrokerMessageMessage.cpp29
-rw-r--r--cpp/src/qpid/broker/BrokerMessageMessage.h15
-rw-r--r--cpp/src/qpid/broker/ConsumeAdapter.cpp37
-rw-r--r--cpp/src/qpid/broker/DeliveryAdapter.h6
-rw-r--r--cpp/src/qpid/broker/DeliveryToken.h (renamed from cpp/src/qpid/broker/ConsumeAdapter.h)24
-rw-r--r--cpp/src/qpid/broker/GetAdapter.cpp40
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp65
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp52
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.h15
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp10
-rw-r--r--cpp/src/qpid/cluster/SessionManager.cpp8
-rw-r--r--cpp/src/qpid/framing/AMQMethodBody.cpp5
-rw-r--r--cpp/src/qpid/framing/AMQMethodBody.h6
-rw-r--r--cpp/src/qpid/framing/SequenceNumberSet.cpp61
-rw-r--r--cpp/src/qpid/framing/SequenceNumberSet.h (renamed from cpp/src/qpid/broker/GetAdapter.h)44
-rw-r--r--cpp/src/qpid/framing/amqp_types.h1
-rw-r--r--cpp/src/tests/BrokerChannelTest.cpp48
-rw-r--r--cpp/src/tests/FramingTest.cpp38
25 files changed, 395 insertions, 313 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index f4e807cf66..cf1598bcca 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -155,6 +155,7 @@ libqpidcommon_la_SOURCES = \
qpid/framing/Requester.cpp \
qpid/framing/Responder.cpp \
qpid/framing/SequenceNumber.cpp \
+ qpid/framing/SequenceNumberSet.cpp \
qpid/framing/Correlator.cpp \
qpid/framing/Value.cpp \
qpid/framing/Proxy.cpp \
@@ -200,7 +201,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/Connection.cpp \
qpid/broker/ConnectionAdapter.cpp \
qpid/broker/ConnectionFactory.cpp \
- qpid/broker/ConsumeAdapter.cpp \
qpid/broker/Daemon.cpp \
qpid/broker/DeliverableMessage.cpp \
qpid/broker/DeliveryRecord.cpp \
@@ -213,7 +213,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/DtxWorkRecord.cpp \
qpid/broker/ExchangeRegistry.cpp \
qpid/broker/FanOutExchange.cpp \
- qpid/broker/GetAdapter.cpp \
qpid/broker/HeadersExchange.cpp \
qpid/broker/InMemoryContent.cpp \
qpid/broker/LazyLoadedContent.cpp \
@@ -258,11 +257,11 @@ nobase_include_HEADERS = \
qpid/broker/BrokerMessageBase.h \
qpid/broker/BrokerQueue.h \
qpid/broker/CompletionHandler.h \
- qpid/broker/ConsumeAdapter.h \
qpid/broker/Consumer.h \
qpid/broker/Deliverable.h \
qpid/broker/DeliverableMessage.h \
qpid/broker/DeliveryAdapter.h \
+ qpid/broker/DeliveryToken.h \
qpid/broker/DirectExchange.h \
qpid/broker/DtxAck.h \
qpid/broker/DtxBuffer.h \
@@ -272,7 +271,6 @@ nobase_include_HEADERS = \
qpid/broker/DtxWorkRecord.h \
qpid/broker/ExchangeRegistry.h \
qpid/broker/FanOutExchange.h \
- qpid/broker/GetAdapter.h \
qpid/broker/HandlerImpl.h \
qpid/broker/InMemoryContent.h \
qpid/broker/MessageBuilder.h \
@@ -362,6 +360,7 @@ nobase_include_HEADERS = \
qpid/framing/Responder.h \
qpid/framing/SerializeHandler.h \
qpid/framing/SequenceNumber.h \
+ qpid/framing/SequenceNumberSet.h \
qpid/framing/Value.h \
qpid/framing/Uuid.h \
qpid/framing/amqp_framing.h \
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index 376108193a..8edf448bc4 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -20,8 +20,7 @@
#include "BrokerAdapter.h"
#include "BrokerChannel.h"
#include "Connection.h"
-#include "ConsumeAdapter.h"
-#include "GetAdapter.h"
+#include "DeliveryToken.h"
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/Exception.h"
@@ -325,8 +324,8 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
//need to generate name here, so we have it for the adapter (it is
//also version specific behaviour now)
if (newTag.empty()) newTag = tagGenerator.generate();
- channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, newTag, connection.getFrameMax())),
- newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
+ DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken(newTag));
+ channel.consume(token, newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
if(!nowait) client.consumeOk(newTag);
@@ -357,8 +356,8 @@ void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/,
void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = getQueue(queueName);
- GetAdapter out(adapter, queue, "", connection.getFrameMax());
- if(!channel.get(out, queue, !noAck)){
+ DeliveryToken::shared_ptr token(BasicMessage::createGetToken(queue));
+ if(!channel.get(token, queue, !noAck)){
string clusterId;//not used, part of an imatix hack
client.getEmpty(clusterId);
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index a598717c5d..c50fbd5559 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -49,9 +49,10 @@ using namespace qpid::framing;
using namespace qpid::sys;
-Channel::Channel(Connection& con, ChannelId _id, MessageStore* const _store) :
+Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id, MessageStore* const _store) :
id(_id),
connection(con),
+ out(_out),
currentDeliveryTag(1),
prefetchSize(0),
prefetchCount(0),
@@ -76,7 +77,7 @@ bool Channel::exists(const string& consumerTag){
// TODO aconway 2007-02-12: Why is connection token passed in instead
// of using the channel's parent connection?
-void Channel::consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut,
+void Channel::consume(DeliveryToken::shared_ptr token, string& tagInOut,
Queue::shared_ptr queue, bool acks,
bool exclusive, ConnectionToken* const connection,
const FieldTable*)
@@ -84,7 +85,7 @@ void Channel::consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut,
if(tagInOut.empty())
tagInOut = tagGenerator.generate();
std::auto_ptr<ConsumerImpl> c(
- new ConsumerImpl(this, adapter, tagInOut, queue, connection, acks));
+ new ConsumerImpl(this, token, tagInOut, queue, connection, acks));
queue->consume(c.get(), exclusive);//may throw exception
consumers.insert(tagInOut, c.release());
}
@@ -97,7 +98,8 @@ void Channel::cancel(const string& tag){
consumers.erase(i);
}
-void Channel::close(){
+void Channel::close()
+{
opened = false;
consumers.clear();
if (dtxBuffer.get()) {
@@ -106,11 +108,15 @@ void Channel::close(){
recover(true);
}
-void Channel::startTx(){
+void Channel::startTx()
+{
txBuffer = TxBuffer::shared_ptr(new TxBuffer());
}
-void Channel::commit(){
+void Channel::commit()
+{
+ if (!txBuffer) throw ConnectionException(503, "Channel has not been selected for use with transactions");
+
TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
txBuffer->enlist(txAck);
if (txBuffer->commitLocal(store)) {
@@ -118,16 +124,21 @@ void Channel::commit(){
}
}
-void Channel::rollback(){
+void Channel::rollback()
+{
+ if (!txBuffer) throw ConnectionException(503, "Channel has not been selected for use with transactions");
+
txBuffer->rollback();
accumulatedAck.clear();
}
-void Channel::selectDtx(){
+void Channel::selectDtx()
+{
dtxSelected = true;
}
-void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join){
+void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join)
+{
if (!dtxSelected) {
throw ConnectionException(503, "Channel has not been selected for use with dtx");
}
@@ -140,7 +151,8 @@ void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join){
}
}
-void Channel::endDtx(const std::string& xid, bool fail){
+void Channel::endDtx(const std::string& xid, bool fail)
+{
if (!dtxBuffer) {
throw ConnectionException(503, boost::format("xid %1% not associated with this channel") % xid);
}
@@ -160,7 +172,8 @@ void Channel::endDtx(const std::string& xid, bool fail){
dtxBuffer.reset();
}
-void Channel::suspendDtx(const std::string& xid){
+void Channel::suspendDtx(const std::string& xid)
+{
if (dtxBuffer->getXid() != xid) {
throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend")
% dtxBuffer->getXid() % xid);
@@ -171,7 +184,8 @@ void Channel::suspendDtx(const std::string& xid){
dtxBuffer->setSuspended(true);
}
-void Channel::resumeDtx(const std::string& xid){
+void Channel::resumeDtx(const std::string& xid)
+{
if (dtxBuffer->getXid() != xid) {
throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume")
% dtxBuffer->getXid() % xid);
@@ -199,20 +213,22 @@ void Channel::record(const DeliveryRecord& delivery)
delivery.addTo(&outstanding);
}
-bool Channel::checkPrefetch(Message::shared_ptr& msg){
+bool Channel::checkPrefetch(Message::shared_ptr& msg)
+{
Mutex::ScopedLock locker(deliveryLock);
bool countOk = !prefetchCount || prefetchCount > unacked.size();
bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
return countOk && sizeOk;
}
-Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, std::auto_ptr<DeliveryAdapter> _adapter,
+Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, DeliveryToken::shared_ptr _token,
const string& _tag, Queue::shared_ptr _queue,
ConnectionToken* const _connection, bool ack
- ) : parent(_parent), adapter(_adapter), tag(_tag), queue(_queue), connection(_connection),
+ ) : parent(_parent), token(_token), tag(_tag), queue(_queue), connection(_connection),
ackExpected(ack), blocked(false) {}
-bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
+bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg)
+{
if(!connection || connection != msg->getPublisher()){//check for no_local
if(!parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))){
blocked = true;
@@ -220,11 +236,10 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
blocked = false;
Mutex::ScopedLock locker(parent->deliveryLock);
- uint64_t deliveryTag = adapter->getNextDeliveryTag();
+ uint64_t deliveryTag = parent->out.deliver(msg, token);
if(ackExpected){
parent->record(DeliveryRecord(msg, queue, tag, deliveryTag));
}
- adapter->deliver(msg, deliveryTag);
return true;
}
@@ -234,14 +249,15 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, uint64_t deliveryTag) {
Mutex::ScopedLock locker(parent->deliveryLock);
- adapter->deliver(msg, deliveryTag);
+ parent->out.redeliver(msg, token, deliveryTag);
}
Channel::ConsumerImpl::~ConsumerImpl() {
cancel();
}
-void Channel::ConsumerImpl::cancel(){
+void Channel::ConsumerImpl::cancel()
+{
if(queue) {
queue->cancel(this);
if (queue->canAutoDelete()) {
@@ -251,27 +267,32 @@ void Channel::ConsumerImpl::cancel(){
}
}
-void Channel::ConsumerImpl::requestDispatch(){
+void Channel::ConsumerImpl::requestDispatch()
+{
if(blocked)
queue->requestDispatch();
}
-void Channel::handleInlineTransfer(Message::shared_ptr msg){
+void Channel::handleInlineTransfer(Message::shared_ptr msg)
+{
complete(msg);
}
-void Channel::handlePublish(Message* _message){
+void Channel::handlePublish(Message* _message)
+{
Message::shared_ptr message(_message);
messageBuilder.initialise(message);
}
-void Channel::handleHeader(AMQHeaderBody::shared_ptr header){
+void Channel::handleHeader(AMQHeaderBody::shared_ptr header)
+{
messageBuilder.setHeader(header);
//at this point, decide based on the size of the message whether we want
//to stage it by saving content directly to disk as it arrives
}
-void Channel::handleContent(AMQContentBody::shared_ptr content){
+void Channel::handleContent(AMQContentBody::shared_ptr content)
+{
messageBuilder.addContent(content);
}
@@ -306,14 +327,16 @@ void Channel::route(Message::shared_ptr msg, Deliverable& strategy) {
}
// Used by Basic
-void Channel::ack(uint64_t deliveryTag, bool multiple){
+void Channel::ack(uint64_t deliveryTag, bool multiple)
+{
if (multiple)
ack(0, deliveryTag);
else
ack(deliveryTag, deliveryTag);
}
-void Channel::ack(uint64_t firstTag, uint64_t lastTag){
+void Channel::ack(uint64_t firstTag, uint64_t lastTag)
+{
if (txBuffer.get()) {
accumulatedAck.update(firstTag, lastTag);
//TODO: I think the outstanding prefetch size & count should be updated at this point...
@@ -355,7 +378,8 @@ void Channel::ack(uint64_t firstTag, uint64_t lastTag){
}
}
-void Channel::recover(bool requeue){
+void Channel::recover(bool requeue)
+{
Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
if(requeue){
@@ -368,12 +392,12 @@ void Channel::recover(bool requeue){
}
}
-bool Channel::get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected){
+bool Channel::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected)
+{
Message::shared_ptr msg = queue->dequeue();
if(msg){
Mutex::ScopedLock locker(deliveryLock);
- uint64_t myDeliveryTag = adapter.getNextDeliveryTag();
- adapter.deliver(msg, myDeliveryTag);
+ uint64_t myDeliveryTag = out.deliver(msg, token);
if(ackExpected){
unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
}
diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h
index a70dce0ce8..e9672c96d7 100644
--- a/cpp/src/qpid/broker/BrokerChannel.h
+++ b/cpp/src/qpid/broker/BrokerChannel.h
@@ -33,6 +33,7 @@
#include "Consumer.h"
#include "DeliveryAdapter.h"
#include "DeliveryRecord.h"
+#include "DeliveryToken.h"
#include "Deliverable.h"
#include "DtxBuffer.h"
#include "DtxManager.h"
@@ -64,7 +65,7 @@ class Channel : public CompletionHandler
class ConsumerImpl : public Consumer
{
Channel* parent;
- std::auto_ptr<DeliveryAdapter> adapter;
+ DeliveryToken::shared_ptr token;
const string tag;
Queue::shared_ptr queue;
ConnectionToken* const connection;
@@ -72,7 +73,7 @@ class Channel : public CompletionHandler
bool blocked;
public:
- ConsumerImpl(Channel* parent, std::auto_ptr<DeliveryAdapter> adapter,
+ ConsumerImpl(Channel* parent, DeliveryToken::shared_ptr token,
const string& tag, Queue::shared_ptr queue,
ConnectionToken* const connection, bool ack);
~ConsumerImpl();
@@ -86,6 +87,7 @@ class Channel : public CompletionHandler
framing::ChannelId id;
Connection& connection;
+ DeliveryAdapter& out;
uint64_t currentDeliveryTag;
Queue::shared_ptr defaultQueue;
ConsumerImplMap consumers;
@@ -110,7 +112,7 @@ class Channel : public CompletionHandler
void checkDtxTimeout();
public:
- Channel(Connection& parent, framing::ChannelId id, MessageStore* const store = 0);
+ Channel(Connection& parent, DeliveryAdapter& out, framing::ChannelId id, MessageStore* const store = 0);
~Channel();
bool isOpen() const { return opened; }
@@ -127,11 +129,11 @@ class Channel : public CompletionHandler
/**
*@param tagInOut - if empty it is updated with the generated token.
*/
- void consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, Queue::shared_ptr queue, bool acks,
+ void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, bool acks,
bool exclusive, ConnectionToken* const connection = 0,
const framing::FieldTable* = 0);
void cancel(const string& tag);
- bool get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected);
+ bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected);
void close();
void startTx();
void commit();
diff --git a/cpp/src/qpid/broker/BrokerMessage.cpp b/cpp/src/qpid/broker/BrokerMessage.cpp
index d192b09a63..bf0e37e8e3 100644
--- a/cpp/src/qpid/broker/BrokerMessage.cpp
+++ b/cpp/src/qpid/broker/BrokerMessage.cpp
@@ -26,6 +26,7 @@
#include "InMemoryContent.h"
#include "LazyLoadedContent.h"
#include "MessageStore.h"
+#include "BrokerQueue.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/BasicDeliverBody.h"
#include "qpid/framing/BasicGetOkBody.h"
@@ -37,6 +38,30 @@
#include "qpid/framing/ChannelAdapter.h"
#include "RecoveryManagerImpl.h"
+namespace qpid{
+namespace broker{
+
+struct BasicGetToken : DeliveryToken
+{
+ typedef boost::shared_ptr<BasicGetToken> shared_ptr;
+
+ Queue::shared_ptr queue;
+
+ BasicGetToken(Queue::shared_ptr q) : queue(q) {}
+};
+
+struct BasicConsumeToken : DeliveryToken
+{
+ typedef boost::shared_ptr<BasicConsumeToken> shared_ptr;
+
+ const string consumer;
+
+ BasicConsumeToken(const string c) : consumer(c) {}
+};
+
+}
+}
+
using namespace boost;
using namespace qpid::broker;
using namespace qpid::framing;
@@ -74,6 +99,16 @@ bool BasicMessage::isComplete(){
return header.get() && (header->getContentSize() == contentSize());
}
+DeliveryToken::shared_ptr BasicMessage::createGetToken(Queue::shared_ptr queue)
+{
+ return DeliveryToken::shared_ptr(new BasicGetToken(queue));
+}
+
+DeliveryToken::shared_ptr BasicMessage::createConsumeToken(const string& consumer)
+{
+ return DeliveryToken::shared_ptr(new BasicConsumeToken(consumer));
+}
+
void BasicMessage::deliver(ChannelAdapter& channel,
const string& consumerTag, uint64_t deliveryTag,
uint32_t framesize)
@@ -86,23 +121,39 @@ void BasicMessage::deliver(ChannelAdapter& channel,
}
void BasicMessage::sendGetOk(ChannelAdapter& channel,
- const std::string& /*destination*/,
uint32_t messageCount,
- uint64_t responseTo,
+ uint64_t /*responseTo*/,
uint64_t deliveryTag,
uint32_t framesize)
{
channel.send(make_shared_ptr(
new BasicGetOkBody(
channel.getVersion(),
- responseTo,
+ //responseTo,
deliveryTag, getRedelivered(), getExchange(),
getRoutingKey(), messageCount)));
sendContent(channel, framesize);
}
-void BasicMessage::sendContent(
- ChannelAdapter& channel, uint32_t framesize)
+void BasicMessage::deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize)
+{
+ BasicConsumeToken::shared_ptr consume = dynamic_pointer_cast<BasicConsumeToken>(token);
+ if (consume) {
+ deliver(channel, consume->consumer, deliveryTag, framesize);
+ } else {
+ BasicGetToken::shared_ptr get = dynamic_pointer_cast<BasicGetToken>(token);
+ if (get) {
+ uint64_t request(1/*actual value doesn't affect anything at present*/);
+ sendGetOk(channel, get->queue->getMessageCount(), request, deliveryTag, framesize);
+ } else {
+ //TODO:
+ //either need to be able to convert to a message transfer or
+ //throw error of some kind to allow this to be handled higher up
+ }
+ }
+}
+
+void BasicMessage::sendContent(ChannelAdapter& channel, uint32_t framesize)
{
channel.send(header);
Mutex::ScopedLock locker(contentLock);
diff --git a/cpp/src/qpid/broker/BrokerMessage.h b/cpp/src/qpid/broker/BrokerMessage.h
index 2e031d0bb2..e6483b4733 100644
--- a/cpp/src/qpid/broker/BrokerMessage.h
+++ b/cpp/src/qpid/broker/BrokerMessage.h
@@ -43,6 +43,7 @@ class AMQHeaderBody;
namespace broker {
class MessageStore;
+class Queue;
using framing::string;
/**
@@ -70,13 +71,16 @@ class BasicMessage : public Message {
void addContent(framing::AMQContentBody::shared_ptr data);
bool isComplete();
+ static DeliveryToken::shared_ptr createGetToken(boost::shared_ptr<Queue> queue);
+ static DeliveryToken::shared_ptr createConsumeToken(const string& consumer);
+ void deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize);
+
void deliver(framing::ChannelAdapter&,
const string& consumerTag,
uint64_t deliveryTag,
uint32_t framesize);
void sendGetOk(framing::ChannelAdapter& channel,
- const std::string& destination,
uint32_t messageCount,
uint64_t responseTo,
uint64_t deliveryTag,
diff --git a/cpp/src/qpid/broker/BrokerMessageBase.h b/cpp/src/qpid/broker/BrokerMessageBase.h
index 73af3935a8..d9269fa94f 100644
--- a/cpp/src/qpid/broker/BrokerMessageBase.h
+++ b/cpp/src/qpid/broker/BrokerMessageBase.h
@@ -25,6 +25,7 @@
#include <string>
#include <boost/shared_ptr.hpp>
#include "Content.h"
+#include "DeliveryToken.h"
#include "PersistableMessage.h"
#include "qpid/framing/amqp_types.h"
@@ -91,23 +92,9 @@ class Message : public PersistableMessage{
void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
void redeliver() { redelivered = true; }
- /**
- * Used to deliver the message from the queue
- */
- virtual void deliver(framing::ChannelAdapter& channel,
- const std::string& consumerTag,
- uint64_t deliveryTag,
- uint32_t framesize) = 0;
- /**
- * Used to return a message in response to a get from a queue
- */
- virtual void sendGetOk(framing::ChannelAdapter& channel,
- const std::string& destination,
- uint32_t messageCount,
- uint64_t responseTo,
- uint64_t deliveryTag,
- uint32_t framesize) = 0;
-
+ virtual void deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag/*only needed for basic class*/,
+ DeliveryToken::shared_ptr token, uint32_t framesize) = 0;
+
virtual bool isComplete() = 0;
virtual uint64_t contentSize() const = 0;
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
index efa295e44f..8e8eaf23f0 100644
--- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp
+++ b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
@@ -34,10 +34,18 @@
#include <algorithm>
using namespace std;
+using namespace boost;
using namespace qpid::framing;
namespace qpid {
namespace broker {
+
+struct MessageDeliveryToken : public DeliveryToken
+{
+ const std::string destination;
+
+ MessageDeliveryToken(const std::string& d) : destination(d) {}
+};
MessageMessage::MessageMessage(
ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_
@@ -179,22 +187,13 @@ void MessageMessage::transferMessage(
channel.send(make_shared_ptr(new MessageCloseBody(channel.getVersion(), ref->getId())));
}
-void MessageMessage::deliver(
- framing::ChannelAdapter& channel,
- const std::string& consumerTag,
- uint64_t /*deliveryTag*/,
- uint32_t framesize)
+
+void MessageMessage::deliver(ChannelAdapter& channel, uint64_t, DeliveryToken::shared_ptr token, uint32_t framesize)
{
- transferMessage(channel, consumerTag, framesize);
+ transferMessage(channel, shared_polymorphic_cast<MessageDeliveryToken>(token)->destination, framesize);
}
-void MessageMessage::sendGetOk(
- framing::ChannelAdapter& channel,
- const std::string& destination,
- uint32_t /*messageCount*/,
- uint64_t /*responseTo*/,
- uint64_t /*deliveryTag*/,
- uint32_t framesize)
+void MessageMessage::deliver(ChannelAdapter& channel, const std::string& destination, uint32_t framesize)
{
transferMessage(channel, destination, framesize);
}
@@ -321,6 +320,10 @@ MessageMessage::ReferencePtr MessageMessage::getReference() const {
return reference;
}
+DeliveryToken::shared_ptr MessageMessage::getToken(const std::string& destination)
+{
+ return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination));
+}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.h b/cpp/src/qpid/broker/BrokerMessageMessage.h
index c2d4b7f20b..612f457ae4 100644
--- a/cpp/src/qpid/broker/BrokerMessageMessage.h
+++ b/cpp/src/qpid/broker/BrokerMessageMessage.h
@@ -53,17 +53,8 @@ class MessageMessage: public Message{
TransferPtr getTransfer() const { return transfer; }
ReferencePtr getReference() const ;
- void deliver(framing::ChannelAdapter& channel,
- const std::string& consumerTag,
- uint64_t deliveryTag,
- uint32_t framesize);
-
- void sendGetOk(framing::ChannelAdapter& channel,
- const std::string& destination,
- uint32_t messageCount,
- uint64_t responseTo,
- uint64_t deliveryTag,
- uint32_t framesize);
+ void deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize);
+ void deliver(framing::ChannelAdapter&, const std::string& destination, uint32_t framesize);
bool isComplete();
@@ -81,6 +72,8 @@ class MessageMessage: public Message{
void decodeHeader(framing::Buffer& buffer);
void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0);
+ static DeliveryToken::shared_ptr getToken(const std::string& destination);
+
private:
void transferMessage(
framing::ChannelAdapter& channel,
diff --git a/cpp/src/qpid/broker/ConsumeAdapter.cpp b/cpp/src/qpid/broker/ConsumeAdapter.cpp
deleted file mode 100644
index 59b6795a77..0000000000
--- a/cpp/src/qpid/broker/ConsumeAdapter.cpp
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "ConsumeAdapter.h"
-
-using namespace qpid::broker;
-using qpid::framing::ChannelAdapter;
-using qpid::framing::RequestId;
-
-ConsumeAdapter::ConsumeAdapter(ChannelAdapter& a, const std::string t, uint32_t f) : adapter(a), tag(t), framesize(f) {}
-
-RequestId ConsumeAdapter::getNextDeliveryTag()
-{
- return adapter.getNextSendRequestId();
-}
-
-void ConsumeAdapter::deliver(Message::shared_ptr& msg, RequestId deliveryTag)
-{
- msg->deliver(adapter, tag, deliveryTag, framesize);
-}
diff --git a/cpp/src/qpid/broker/DeliveryAdapter.h b/cpp/src/qpid/broker/DeliveryAdapter.h
index 45b103bd68..971f4095cf 100644
--- a/cpp/src/qpid/broker/DeliveryAdapter.h
+++ b/cpp/src/qpid/broker/DeliveryAdapter.h
@@ -22,11 +22,13 @@
#define _DeliveryAdapter_
#include "BrokerMessageBase.h"
+#include "DeliveryToken.h"
#include "qpid/framing/amqp_types.h"
namespace qpid {
namespace broker {
+ typedef framing::RequestId DeliveryId;
/**
* The intention behind this interface is to separate the generic
* handling of some form of message delivery to clients that is
@@ -40,8 +42,8 @@ namespace broker {
class DeliveryAdapter
{
public:
- virtual framing::RequestId getNextDeliveryTag() = 0;
- virtual void deliver(Message::shared_ptr& msg, framing::RequestId tag) = 0;
+ virtual DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) = 0;
+ virtual void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) = 0;
virtual ~DeliveryAdapter(){}
};
diff --git a/cpp/src/qpid/broker/ConsumeAdapter.h b/cpp/src/qpid/broker/DeliveryToken.h
index 43cda7753e..8bdf5e6359 100644
--- a/cpp/src/qpid/broker/ConsumeAdapter.h
+++ b/cpp/src/qpid/broker/DeliveryToken.h
@@ -18,23 +18,25 @@
* under the License.
*
*/
-#ifndef _ConsumeAdapter_
-#define _ConsumeAdapter_
+#ifndef _DeliveryToken_
+#define _DeliveryToken_
-#include "DeliveryAdapter.h"
-#include "qpid/framing/ChannelAdapter.h"
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
- class ConsumeAdapter : public DeliveryAdapter
+
+ /**
+ * A DeliveryToken allows the delivery of a message to be
+ * associated with whatever mechanism caused it to be
+ * delivered. (i.e. its a form of Memento).
+ */
+ class DeliveryToken
{
- framing::ChannelAdapter& adapter;
- const std::string tag;
- const uint32_t framesize;
public:
- ConsumeAdapter(framing::ChannelAdapter& adapter, const std::string tag, uint32_t framesize);
- framing::RequestId getNextDeliveryTag();
- void deliver(Message::shared_ptr& msg, framing::RequestId tag);
+ typedef boost::shared_ptr<DeliveryToken> shared_ptr;
+
+ virtual ~DeliveryToken(){}
};
}}
diff --git a/cpp/src/qpid/broker/GetAdapter.cpp b/cpp/src/qpid/broker/GetAdapter.cpp
deleted file mode 100644
index bbffade712..0000000000
--- a/cpp/src/qpid/broker/GetAdapter.cpp
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "GetAdapter.h"
-#include "qpid/framing/MethodContext.h"
-
-using namespace qpid::broker;
-using qpid::framing::ChannelAdapter;
-using qpid::framing::RequestId;
-using qpid::framing::MethodContext;
-
-GetAdapter::GetAdapter(ChannelAdapter& a, Queue::shared_ptr q, const std::string d, uint32_t f)
- : adapter(a), queue(q), destination(d), framesize(f) {}
-
-RequestId GetAdapter::getNextDeliveryTag()
-{
- return adapter.getNextSendRequestId();
-}
-
-void GetAdapter::deliver(Message::shared_ptr& msg, framing::RequestId deliveryTag)
-{
- msg->sendGetOk(adapter, destination, queue->getMessageCount(), 1, deliveryTag, framesize);
-}
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index 41dd8cc145..da57439e21 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -21,8 +21,6 @@
#include "BrokerChannel.h"
#include "qpid/framing/FramingContent.h"
#include "Connection.h"
-#include "ConsumeAdapter.h"
-#include "GetAdapter.h"
#include "Broker.h"
#include "BrokerMessageMessage.h"
#include "qpid/framing/MessageAppendBody.h"
@@ -45,66 +43,44 @@ void
MessageHandlerImpl::cancel(const string& destination )
{
channel.cancel(destination);
- //client.ok();
}
void
-MessageHandlerImpl::open(const string& reference)
+MessageHandlerImpl::open(const string& /*reference*/)
{
- references.open(reference);
- //client.ok();
+ throw ConnectionException(540, "References no longer supported");
}
void
-MessageHandlerImpl::append(const framing::MethodContext& context)
+MessageHandlerImpl::append(const framing::MethodContext& /*context*/)
{
- MessageAppendBody::shared_ptr body(boost::shared_polymorphic_downcast<MessageAppendBody>(context.methodBody));
- references.get(body->getReference())->append(body);
- //client.ok();
+ throw ConnectionException(540, "References no longer supported");
}
void
-MessageHandlerImpl::close(const string& reference)
+MessageHandlerImpl::close(const string& /*reference*/)
{
- Reference::shared_ptr ref = references.get(reference);
- //client.ok();
-
- // Send any transfer messages to their correct exchanges and okay them
- const Reference::Messages& msgs = ref->getMessages();
- for (Reference::Messages::const_iterator m = msgs.begin(); m != msgs.end(); ++m) {
- channel.handleInlineTransfer(*m);
- client.setResponseTo((*m)->getRequestId());
- client.ok();
- }
- ref->close();
+ throw ConnectionException(540, "References no longer supported");
}
void
MessageHandlerImpl::checkpoint(const string& /*reference*/,
const string& /*identifier*/ )
{
- // Initial implementation (which is conforming) is to do nothing here
- // and return offset zero for the resume
- //client.ok();
+ throw ConnectionException(540, "References no longer supported");
}
void
-MessageHandlerImpl::resume(const string& reference,
+MessageHandlerImpl::resume(const string& /*reference*/,
const string& /*identifier*/ )
{
- // Initial (null) implementation
- // open reference and return 0 offset
- references.open(reference);
- client.offset(0);
+ throw ConnectionException(540, "References no longer supported");
}
void
MessageHandlerImpl::offset(uint64_t /*value*/ )
{
- // Shouldn't ever receive this as it is reponse to resume
- // which is never sent
- // TODO astitcher 2007-02-16 What is the correct exception to throw here?
- THROW_QPID_ERROR(INTERNAL_ERROR, "impossible");
+ throw ConnectionException(540, "References no longer supported");
}
void
@@ -120,14 +96,12 @@ MessageHandlerImpl::consume(uint16_t /*ticket*/,
if(!destination.empty() && channel.exists(destination))
throw ConnectionException(530, "Consumer tags must be unique");
string tag = destination;
- channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())),
- tag, queue, !noAck, exclusive,
- noLocal ? &connection : 0, &filter);
- //client.ok();
+ channel.consume(MessageMessage::getToken(destination), tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
}
+
void
MessageHandlerImpl::get(uint16_t /*ticket*/,
const string& queueName,
@@ -136,11 +110,11 @@ MessageHandlerImpl::get(uint16_t /*ticket*/,
{
Queue::shared_ptr queue = getQueue(queueName);
- GetAdapter out(adapter, queue, destination, connection.getFrameMax());
- if(channel.get(out, queue, !noAck)) {
- client.ok();
+ if (channel.get(MessageMessage::getToken(destination), queue, !noAck)){
+ //don't send any response... rely on execution completion
} else {
- client.empty();
+ //temporarily disabled:
+ //client.empty();
}
}
@@ -167,14 +141,12 @@ MessageHandlerImpl::qos(uint32_t prefetchSize,
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- //client.ok();
}
void
MessageHandlerImpl::recover(bool requeue)
{
channel.recover(requeue);
- //client.ok();
}
void
@@ -193,11 +165,8 @@ MessageHandlerImpl::transfer(const framing::MethodContext& context)
if (transfer->getBody().isInline()) {
MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer));
channel.handleInlineTransfer(message);
- client.ok();
} else {
- Reference::shared_ptr ref(references.get(transfer->getBody().getValue()));
- MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer, ref));
- ref->addMessage(message);
+ throw ConnectionException(540, "References no longer supported");
}
}
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index 2b1de1bbc0..e9ec698400 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -25,10 +25,11 @@
using namespace qpid::broker;
using namespace qpid::framing;
+using namespace qpid::sys;
SemanticHandler::SemanticHandler(ChannelId id, Connection& c) :
connection(c),
- channel(c, id, &c.broker.getStore())
+ channel(c, *this, id, &c.broker.getStore())
{
init(id, connection.getOutput(), connection.getVersion());
adapter = std::auto_ptr<BrokerAdapter>(new BrokerAdapter(channel, connection, connection.broker, *this));
@@ -75,10 +76,24 @@ void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQ
}
}
-void SemanticHandler::complete(uint32_t mark, uint16_t /*range- not decoded correctly yet*/)
+void SemanticHandler::complete(uint32_t cumulative, SequenceNumberSet range)
{
- //just record it for now (will eventually need to use it to ack messages):
- outgoing.lwm = SequenceNumber(mark);
+ //record:
+ SequenceNumber mark(cumulative);
+ if (outgoing.lwm < mark) {
+ outgoing.lwm = mark;
+ //ack messages:
+ channel.ack(mark.getValue(), true);
+ //std::cout << "[" << this << "] acknowledged: " << mark << std::endl;
+ }
+ if (range.size() % 2) { //must be even number
+ throw ConnectionException(530, "Received odd number of elements in ranged mark");
+ } else {
+ //TODO: need to keep a record of the full range previously acked
+ for (SequenceNumberSet::iterator i = range.begin(); i != range.end(); i++) {
+ channel.ack((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
+ }
+ }
}
void SemanticHandler::flush()
@@ -86,8 +101,8 @@ void SemanticHandler::flush()
//flush doubles as a sync to begin with - send an execution.complete
incoming.lwm = incoming.hwm;
if (isOpen()) {
- /*use dummy value for range which is not yet encoded correctly*/
- send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), 0)));
+ Mutex::ScopedLock l(outLock);
+ ChannelAdapter::send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet())));
}
}
@@ -140,3 +155,28 @@ void SemanticHandler::handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartb
channel.handleHeartbeat(body);
}
+DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
+{
+ Mutex::ScopedLock l(outLock);
+ SequenceNumber copy(outgoing.hwm);
+ ++copy;
+ msg->deliver(*this, copy.getValue(), token, connection.getFrameMax());
+ //std::cout << "[" << this << "] delivered: " << outgoing.hwm.getValue() << std::endl;
+ return outgoing.hwm.getValue();
+}
+
+void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)
+{
+ msg->deliver(*this, tag, token, connection.getFrameMax());
+}
+
+RequestId SemanticHandler::send(shared_ptr<AMQBody> body, Correlator::Action action)
+{
+ Mutex::ScopedLock l(outLock);
+ uint8_t type(body->type());
+ if (type == REQUEST_BODY || type == RESPONSE_BODY || type == METHOD_BODY) {
+ ++outgoing.hwm;
+ //std::cout << "[" << this << "] allocated: " << outgoing.hwm.getValue() << " to " << *body << std::endl;
+ }
+ return ChannelAdapter::send(body, action);
+}
diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h
index a57559d043..b863b3486e 100644
--- a/cpp/src/qpid/broker/SemanticHandler.h
+++ b/cpp/src/qpid/broker/SemanticHandler.h
@@ -24,6 +24,7 @@
#include <memory>
#include "BrokerChannel.h"
#include "Connection.h"
+#include "DeliveryAdapter.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/FrameHandler.h"
@@ -36,6 +37,7 @@ class BrokerAdapter;
class framing::ChannelAdapter;
class SemanticHandler : private framing::ChannelAdapter,
+ private DeliveryAdapter,
public framing::FrameHandler,
public framing::AMQP_ServerOperations::ExecutionHandler
{
@@ -44,6 +46,7 @@ class SemanticHandler : private framing::ChannelAdapter,
std::auto_ptr<BrokerAdapter> adapter;
framing::Window incoming;
framing::Window outgoing;
+ sys::Mutex outLock;
void handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
const qpid::framing::MethodContext& context);
@@ -55,12 +58,22 @@ class SemanticHandler : private framing::ChannelAdapter,
void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>);
void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>);
void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>);
+
+ framing::RequestId send(shared_ptr<framing::AMQBody> body, framing::Correlator::Action action=framing::Correlator::Action());
+
+
+ //delivery adapter methods:
+ DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token);
+ void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag);
+
public:
SemanticHandler(framing::ChannelId id, Connection& c);
+
+ //frame handler:
void handle(framing::AMQFrame& frame);
//execution class method handlers:
- void complete(uint32_t cumulativeExecutionMark, uint16_t);
+ void complete(uint32_t cumulativeExecutionMark, framing::SequenceNumberSet range);
void flush();
};
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index 0033cbdbe4..19b4726a72 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -77,7 +77,7 @@ void Channel::protocolInit(
connection->connector->init(); // Send ProtocolInit block.
ConnectionStartBody::shared_ptr connectionStart =
responses.receive<ConnectionStartBody>();
-
+
FieldTable props;
string mechanism("PLAIN");
string response = ((char)0) + uid + ((char)0) + pwd;
@@ -85,7 +85,7 @@ void Channel::protocolInit(
ConnectionTuneBody::shared_ptr proposal =
sendAndReceive<ConnectionTuneBody>(
make_shared_ptr(new ConnectionStartOkBody(
- version, connectionStart->getRequestId(),
+ version, //connectionStart->getRequestId(),
props, mechanism,
response, locale)));
@@ -98,7 +98,7 @@ void Channel::protocolInit(
**/
sendCommand(make_shared_ptr(new ConnectionTuneOkBody(
- version, proposal->getRequestId(),
+ version, //proposal->getRequestId(),
proposal->getChannelMax(), connection->getMaxFrameSize(),
proposal->getHeartbeat())));
@@ -222,10 +222,10 @@ AMQMethodBody::shared_ptr method, const MethodContext& ctxt)
}
}
-void Channel::handleChannel(AMQMethodBody::shared_ptr method, const MethodContext& ctxt) {
+void Channel::handleChannel(AMQMethodBody::shared_ptr method, const MethodContext& /*ctxt*/) {
switch (method->amqpMethodId()) {
case ChannelCloseBody::METHOD_ID:
- sendCommand(make_shared_ptr(new ChannelCloseOkBody(version, ctxt.getRequestId())));
+ sendCommand(make_shared_ptr(new ChannelCloseOkBody(version/*, ctxt.getRequestId()*/)));
peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method));
return;
case ChannelFlowBody::METHOD_ID:
diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp
index 9f6438cf92..88ddfe843f 100644
--- a/cpp/src/qpid/cluster/SessionManager.cpp
+++ b/cpp/src/qpid/cluster/SessionManager.cpp
@@ -36,7 +36,7 @@ using namespace sys;
using namespace broker;
/** Handler to send frames direct to local broker (bypass correlation etc.) */
-struct BrokerHandler : public FrameHandler, private ChannelAdapter {
+ struct BrokerHandler : public FrameHandler, private ChannelAdapter, private DeliveryAdapter {
Connection connection;
Channel channel;
BrokerAdapter adapter;
@@ -51,7 +51,7 @@ struct BrokerHandler : public FrameHandler, private ChannelAdapter {
//
BrokerHandler(Broker& broker) :
connection(0, broker),
- channel(connection, 1, 0),
+ channel(connection, *this, 1, 0),
adapter(channel, connection, broker, *this) {}
void handle(AMQFrame& frame) {
@@ -68,6 +68,10 @@ struct BrokerHandler : public FrameHandler, private ChannelAdapter {
virtual void handleMethodInContext(shared_ptr<AMQMethodBody>, const MethodContext&){}
// No-op send.
virtual RequestId send(shared_ptr<AMQBody>, Correlator::Action) { return 0; }
+
+ //delivery adapter methods, also no-ops:
+ virtual DeliveryId deliver(Message::shared_ptr&, DeliveryToken::shared_ptr) { return 0; }
+ virtual void redeliver(Message::shared_ptr&, DeliveryToken::shared_ptr, DeliveryId) {}
};
/** Wrap plain AMQFrames in SessionFrames */
diff --git a/cpp/src/qpid/framing/AMQMethodBody.cpp b/cpp/src/qpid/framing/AMQMethodBody.cpp
index 04941eaa58..eb34d48c5f 100644
--- a/cpp/src/qpid/framing/AMQMethodBody.cpp
+++ b/cpp/src/qpid/framing/AMQMethodBody.cpp
@@ -60,4 +60,9 @@ void AMQMethodBody::decode(Buffer& buffer, uint32_t /*size*/) {
decodeContent(buffer);
}
+void AMQMethodBody::encode(Buffer& buffer) const {
+ encodeId(buffer);
+ encodeContent(buffer);
+}
+
}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/AMQMethodBody.h b/cpp/src/qpid/framing/AMQMethodBody.h
index 55cf5cb864..2b46c6ea00 100644
--- a/cpp/src/qpid/framing/AMQMethodBody.h
+++ b/cpp/src/qpid/framing/AMQMethodBody.h
@@ -47,6 +47,7 @@ class AMQMethodBody : public AMQBody
AMQMethodBody(ProtocolVersion ver) : version(ver) {}
virtual ~AMQMethodBody() {}
void decode(Buffer&, uint32_t);
+ virtual void encode(Buffer& buffer) const;
virtual MethodId amqpMethodId() const = 0;
virtual ClassId amqpClassId() const = 0;
@@ -64,8 +65,8 @@ class AMQMethodBody : public AMQBody
virtual bool isRequest() const { return false; }
virtual bool isResponse() const { return false; }
- protected:
static uint32_t baseSize() { return 4; }
+ protected:
struct ClassMethodId {
uint16_t classId;
@@ -76,6 +77,9 @@ class AMQMethodBody : public AMQBody
void encodeId(Buffer& buffer) const;
virtual void encodeContent(Buffer& buffer) const = 0;
virtual void decodeContent(Buffer& buffer) = 0;
+
+ virtual void printPrefix(std::ostream&) const {}
+
};
diff --git a/cpp/src/qpid/framing/SequenceNumberSet.cpp b/cpp/src/qpid/framing/SequenceNumberSet.cpp
new file mode 100644
index 0000000000..357b5dabd7
--- /dev/null
+++ b/cpp/src/qpid/framing/SequenceNumberSet.cpp
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SequenceNumberSet.h"
+
+using namespace qpid::framing;
+
+void SequenceNumberSet::encode(Buffer& buffer) const
+{
+ buffer.putShort(size());
+ for (const_iterator i = begin(); i != end(); i++) {
+ buffer.putLong(i->getValue());
+ }
+}
+
+void SequenceNumberSet::decode(Buffer& buffer)
+{
+ uint16_t count = buffer.getShort();
+ for (uint16_t i = 0; i < count; i++) {
+ push_back(SequenceNumber(buffer.getLong()));
+ }
+}
+
+uint32_t SequenceNumberSet::encodedSize() const
+{
+ return 2 /*count*/ + (size() * 4);
+}
+
+namespace qpid{
+namespace framing{
+
+std::ostream& operator<<(std::ostream& out, const SequenceNumberSet& set) {
+ out << "{";
+ for (SequenceNumberSet::const_iterator i = set.begin(); i != set.end(); i++) {
+ if (i != set.begin()) out << ", ";
+ out << (i->getValue());
+ }
+ out << "}";
+ return out;
+}
+
+}
+}
diff --git a/cpp/src/qpid/broker/GetAdapter.h b/cpp/src/qpid/framing/SequenceNumberSet.h
index e90619a5f3..bcf78d4f22 100644
--- a/cpp/src/qpid/broker/GetAdapter.h
+++ b/cpp/src/qpid/framing/SequenceNumberSet.h
@@ -18,30 +18,32 @@
* under the License.
*
*/
-#ifndef _GetAdapter_
-#define _GetAdapter_
+#ifndef _framing_SequenceNumberSet_h
+#define _framing_SequenceNumberSet_h
-#include "BrokerQueue.h"
-#include "DeliveryAdapter.h"
-#include "qpid/framing/ChannelAdapter.h"
+#include <ostream>
+#include <vector>
+#include "amqp_types.h"
+#include "Buffer.h"
+#include "SequenceNumber.h"
namespace qpid {
-namespace broker {
-
- class GetAdapter : public DeliveryAdapter
- {
- framing::ChannelAdapter& adapter;
- Queue::shared_ptr queue;
- const std::string destination;
- const uint32_t framesize;
- public:
- GetAdapter(framing::ChannelAdapter& adapter, Queue::shared_ptr queue, const std::string destination, uint32_t framesize);
- ~GetAdapter(){}
- framing::RequestId getNextDeliveryTag();
- void deliver(Message::shared_ptr& msg, framing::RequestId tag);
- };
-
-}}
+namespace framing {
+
+class SequenceNumberSet : public std::vector<SequenceNumber>
+{
+public:
+ typedef std::vector<SequenceNumber>::const_iterator const_iterator;
+ typedef std::vector<SequenceNumber>::iterator iterator;
+
+ void encode(Buffer& buffer) const;
+ void decode(Buffer& buffer);
+ uint32_t encodedSize() const;
+
+ friend std::ostream& operator<<(std::ostream&, const SequenceNumberSet&);
+};
+
+}} // namespace qpid::framing
#endif
diff --git a/cpp/src/qpid/framing/amqp_types.h b/cpp/src/qpid/framing/amqp_types.h
index efb720f047..ff75b28468 100644
--- a/cpp/src/qpid/framing/amqp_types.h
+++ b/cpp/src/qpid/framing/amqp_types.h
@@ -53,6 +53,7 @@ typedef uint16_t ReplyCode;
// Types represented by classes.
class Content;
class FieldTable;
+class SequenceNumberSet;
// Useful constants
diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp
index 05bdb7b3f0..eb67601875 100644
--- a/cpp/src/tests/BrokerChannelTest.cpp
+++ b/cpp/src/tests/BrokerChannelTest.cpp
@@ -48,30 +48,21 @@ struct MockHandler : ConnectionOutputHandler{
void close() {};
};
-struct DeliveryRecorder
+struct DeliveryRecorder : DeliveryAdapter
{
- typedef std::pair<Message::shared_ptr, RequestId> Delivery;
+ DeliveryId id;
+ typedef std::pair<Message::shared_ptr, DeliveryToken::shared_ptr> Delivery;
std::vector<Delivery> delivered;
- struct Adapter : DeliveryAdapter
+ DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
{
- RequestId id;
- DeliveryRecorder& recorder;
-
- Adapter(DeliveryRecorder& r) : recorder(r) {}
-
- RequestId getNextDeliveryTag() { return id + 1; }
- void deliver(Message::shared_ptr& msg, RequestId tag)
- {
- recorder.delivered.push_back(Delivery(msg, tag));
- id++;
- }
-
- };
+ delivered.push_back(Delivery(msg, token));
+ return ++id;
+ }
- std::auto_ptr<DeliveryAdapter> createAdapter()
+ void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId /*tag*/)
{
- return std::auto_ptr<DeliveryAdapter>(new Adapter(*this));
+ delivered.push_back(Delivery(msg, token));
}
};
@@ -166,6 +157,7 @@ class BrokerChannelTest : public CppUnit::TestCase
}
};
+ DeliveryRecorder recorder;
public:
@@ -179,13 +171,13 @@ class BrokerChannelTest : public CppUnit::TestCase
void testConsumerMgmt(){
Queue::shared_ptr queue(new Queue("my_queue"));
- Channel channel(connection, 0, 0);
+ Channel channel(connection, recorder, 0, 0);
channel.open();
CPPUNIT_ASSERT(!channel.exists("my_consumer"));
ConnectionToken* owner = 0;
string tag("my_consumer");
- std::auto_ptr<DeliveryAdapter> unused;
+ DeliveryToken::shared_ptr unused;
channel.consume(unused, tag, queue, false, false, owner);
string tagA;
string tagB;
@@ -205,24 +197,25 @@ class BrokerChannelTest : public CppUnit::TestCase
}
void testDeliveryNoAck(){
- Channel channel(connection, 7);
+ Channel channel(connection, recorder, 7);
Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
Queue::shared_ptr queue(new Queue("my_queue"));
- DeliveryRecorder recorder;
string tag("test");
- channel.consume(recorder.createAdapter(), tag, queue, false, false, 0);
+ DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token"));
+ channel.consume(token, tag, queue, false, false, 0);
queue->deliver(msg);
sleep(2);
CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size());
CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first);
+ CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second);
}
void testStaging(){
MockMessageStore store;
connection.setFrameMax(1000);
connection.setStagingThreshold(10);
- Channel channel(connection, 1, &store);
+ Channel channel(connection, recorder, 1, &store);
const string data[] = {"abcde", "fghij", "klmno"};
Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false);
@@ -314,7 +307,7 @@ class BrokerChannelTest : public CppUnit::TestCase
}
void testFlow(){
- Channel channel(connection, 7);
+ Channel channel(connection, recorder, 7);
channel.open();
//there will always be a connection-start frame
CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size());
@@ -327,9 +320,9 @@ class BrokerChannelTest : public CppUnit::TestCase
Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
addContent(msg, data);
Queue::shared_ptr queue(new Queue("my_queue"));
- DeliveryRecorder recorder;
string tag("test");
- channel.consume(recorder.createAdapter(), tag, queue, false, false, 0);
+ DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token"));
+ channel.consume(token, tag, queue, false, false, 0);
channel.flow(false);
queue->deliver(msg);
//ensure no messages have been delivered
@@ -340,6 +333,7 @@ class BrokerChannelTest : public CppUnit::TestCase
//ensure no messages have been delivered
CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size());
CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first);
+ CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second);
}
Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)
diff --git a/cpp/src/tests/FramingTest.cpp b/cpp/src/tests/FramingTest.cpp
index 134acb94f9..98f89b59be 100644
--- a/cpp/src/tests/FramingTest.cpp
+++ b/cpp/src/tests/FramingTest.cpp
@@ -108,7 +108,7 @@ class FramingTest : public CppUnit::TestCase
{
std::string a = "hostA";
std::string b = "hostB";
- ConnectionRedirectBody in(version, 0, a, b);
+ ConnectionRedirectBody in(version, a, b);
in.encodeContent(buffer);
buffer.flip();
ConnectionRedirectBody out(version);
@@ -146,7 +146,7 @@ class FramingTest : public CppUnit::TestCase
std::string a = "hostA";
std::string b = "hostB";
AMQFrame in(version, 999,
- new ConnectionRedirectBody(version, 0, a, b));
+ new ConnectionRedirectBody(version, a, b));
in.encode(buffer);
buffer.flip();
AMQFrame out;
@@ -157,7 +157,7 @@ class FramingTest : public CppUnit::TestCase
void testBasicConsumeOkBodyFrame()
{
std::string s = "hostA";
- AMQFrame in(version, 999, new BasicConsumeOkBody(version, 0, s));
+ AMQFrame in(version, 999, new BasicConsumeOkBody(version, s));
in.encode(buffer);
buffer.flip();
AMQFrame out;
@@ -400,22 +400,22 @@ class FramingTest : public CppUnit::TestCase
c.declareQueue(queue);
c.bind(exchange, queue, "MyTopic", framing::FieldTable());
broker::InProcessBroker::Conversation::const_iterator i = ibroker.conversation.begin();
- ASSERT_FRAME("BROKER: Frame[channel=0; request(id=1,mark=0): ConnectionStart: versionMajor=0; versionMinor=10; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=0; request(id=2,mark=1): ConnectionTune: channelMax=32767; frameMax=65536; heartbeat=0]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=2,request=2,batch=0): ConnectionTuneOk: channelMax=32767; frameMax=65536; heartbeat=0]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=0; request(id=1,mark=0): ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionOpenOk: knownHosts=]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=1,mark=0): ChannelOpen: outOfBand=]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; response(id=1,request=1,batch=0): ChannelOpenOk: ]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=2,mark=1): ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; alternateExchange=; passive=0; durable=0; autoDelete=0; arguments={}]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=3,mark=1): ExecutionFlush: ]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; request(id=1,mark=0): ExecutionComplete: cumulativeExecutionMark=2; rangedExecutionSet=0]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=4,mark=1): QueueDeclare: ticket=0; queue=MyQueue; alternateExchange=; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; response(id=2,request=4,batch=0): QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=5,mark=2): QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; arguments={}]", *i++);
- ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=6,mark=2): ExecutionFlush: ]", *i++);
- ASSERT_FRAME("BROKER: Frame[channel=1; request(id=2,mark=0): ExecutionComplete: cumulativeExecutionMark=4; rangedExecutionSet=0]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionStart: versionMajor=0; versionMinor=10; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionTune: channelMax=32767; frameMax=65536; heartbeat=0]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionTuneOk: channelMax=32767; frameMax=65536; heartbeat=0]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionOpenOk: knownHosts=]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; ChannelOpen: outOfBand=]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; ChannelOpenOk: ]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; alternateExchange=; passive=0; durable=0; autoDelete=0; arguments={}]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=2; rangedExecutionSet={}]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; QueueDeclare: ticket=0; queue=MyQueue; alternateExchange=; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; arguments={}]", *i++);
+ ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++);
+ ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=4; rangedExecutionSet={}]", *i++);
}
};