summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-01-18 07:54:09 +0000
committerAlan Conway <aconway@apache.org>2007-01-18 07:54:09 +0000
commitcdf7469e2688f9f52487b7968664ced2db560980 (patch)
tree3cce5e41fd7792e1b8305ca0a813664cc89ee98b /cpp
parent9e8a7c77a94a92c6cf92cf60be508817f0778040 (diff)
downloadqpid-python-cdf7469e2688f9f52487b7968664ced2db560980.tar.gz
From: Andrew Stitcher <astitcher@redhat.com>
r723@fuschia: andrew | 2007-01-12 00:35:16 +0000 Branch for my work on Qpid.0-9 r724@fuschia: andrew | 2007-01-12 00:59:28 +0000 Added in empty implementation of handler class for protocol Message class r768@fuschia: andrew | 2007-01-17 01:25:16 +0000 * Added Test for new MessageHandlerImpl (but no actual tests yet) * Filled in lots of the blanks in the MessageHandlerImpl with code stolen from the BasicHandlerImpl r800@fuschia: andrew | 2007-01-17 17:34:13 +0000 Updated to latest upstream changes Alan Conway <aconway@redhat.com> * Took the changes from Andrew's patch and separated the MessageHandlerImpl into its own .cpp/.h file. Other handlers should be separated also. * BrokerAdapter inner classes ignore channel arg and use channel member instead. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497336 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp390
-rw-r--r--cpp/lib/broker/Makefile.am2
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.cpp219
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.h124
-rw-r--r--cpp/tests/Makefile.am3
-rw-r--r--cpp/tests/MessageHandlerTest.cpp57
-rw-r--r--cpp/tests/client_test.cpp2
-rwxr-xr-xcpp/tests/start_broker2
8 files changed, 490 insertions, 309 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index 2a5b136b3e..16519ec646 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -20,10 +20,14 @@
#include "Exception.h"
#include "AMQMethodBody.h"
#include "Exception.h"
+#include "MessageHandlerImpl.h"
namespace qpid {
namespace broker {
+// FIXME aconway 2007-01-18: Remove channel argument from signatures,
+// adapter is already associated with a cahnnel.
+
using namespace qpid;
using namespace qpid::framing;
@@ -74,16 +78,16 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations
public:
ConnectionHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void startOk(u_int16_t channel,
+ void startOk(u_int16_t /*channel*/,
const qpid::framing::FieldTable& clientProperties,
const std::string& mechanism, const std::string& response,
const std::string& locale);
- void secureOk(u_int16_t channel, const std::string& response);
- void tuneOk(u_int16_t channel, u_int16_t channelMax,
+ void secureOk(u_int16_t /*channel*/, const std::string& response);
+ void tuneOk(u_int16_t /*channel*/, u_int16_t channelMax,
u_int32_t frameMax, u_int16_t heartbeat);
- void open(u_int16_t channel, const std::string& virtualHost,
+ void open(u_int16_t /*channel*/, const std::string& virtualHost,
const std::string& capabilities, bool insist);
- void close(u_int16_t channel, u_int16_t replyCode,
+ void close(u_int16_t /*channel*/, u_int16_t replyCode,
const std::string& replyText,
u_int16_t classId, u_int16_t methodId);
void closeOk(u_int16_t channel);
@@ -92,14 +96,14 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations
class ChannelHandlerImpl : private CoreRefs, public ChannelHandler{
public:
ChannelHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void open(u_int16_t channel, const std::string& outOfBand);
- void flow(u_int16_t channel, bool active);
- void flowOk(u_int16_t channel, bool active);
+ void open(u_int16_t /*channel*/, const std::string& outOfBand);
+ void flow(u_int16_t /*channel*/, bool active);
+ void flowOk(u_int16_t /*channel*/, bool active);
void ok( u_int16_t channel );
void ping( u_int16_t channel );
void pong( u_int16_t channel );
- void resume( u_int16_t channel, const std::string& channelId );
- void close(u_int16_t channel, u_int16_t replyCode, const
+ void resume( u_int16_t /*channel*/, const std::string& channelId );
+ void close(u_int16_t /*channel*/, u_int16_t replyCode, const
std::string& replyText, u_int16_t classId, u_int16_t methodId);
void closeOk(u_int16_t channel);
};
@@ -107,14 +111,14 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations
class ExchangeHandlerImpl : private CoreRefs, public ExchangeHandler{
public:
ExchangeHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void declare(u_int16_t channel, u_int16_t ticket,
+ void declare(u_int16_t /*channel*/, u_int16_t ticket,
const std::string& exchange, const std::string& type,
bool passive, bool durable, bool autoDelete,
bool internal, bool nowait,
const qpid::framing::FieldTable& arguments);
- void delete_(u_int16_t channel, u_int16_t ticket,
+ void delete_(u_int16_t /*channel*/, u_int16_t ticket,
const std::string& exchange, bool ifUnused, bool nowait);
- void unbind(u_int16_t channel,
+ void unbind(u_int16_t /*channel*/,
u_int16_t ticket, const std::string& queue,
const std::string& exchange, const std::string& routingKey,
const qpid::framing::FieldTable& arguments );
@@ -123,22 +127,22 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations
class QueueHandlerImpl : private CoreRefs, public QueueHandler{
public:
QueueHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void declare(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+ void declare(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue,
bool passive, bool durable, bool exclusive,
bool autoDelete, bool nowait,
const qpid::framing::FieldTable& arguments);
- void bind(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+ void bind(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue,
const std::string& exchange, const std::string& routingKey,
bool nowait, const qpid::framing::FieldTable& arguments);
- void unbind(u_int16_t channel,
+ void unbind(u_int16_t /*channel*/,
u_int16_t ticket,
const std::string& queue,
const std::string& exchange,
const std::string& routingKey,
const qpid::framing::FieldTable& arguments );
- void purge(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+ void purge(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue,
bool nowait);
- void delete_(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+ void delete_(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue,
bool ifUnused, bool ifEmpty,
bool nowait);
};
@@ -146,23 +150,23 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations
class BasicHandlerImpl : private CoreRefs, public BasicHandler{
public:
BasicHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void qos(u_int16_t channel, u_int32_t prefetchSize,
+ void qos(u_int16_t /*channel*/, u_int32_t prefetchSize,
u_int16_t prefetchCount, bool global);
void consume(
- u_int16_t channel, u_int16_t ticket, const std::string& queue,
+ u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue,
const std::string& consumerTag, bool noLocal, bool noAck,
bool exclusive, bool nowait,
const qpid::framing::FieldTable& fields);
- void cancel(u_int16_t channel, const std::string& consumerTag,
+ void cancel(u_int16_t /*channel*/, const std::string& consumerTag,
bool nowait);
- void publish(u_int16_t channel, u_int16_t ticket,
+ void publish(u_int16_t /*channel*/, u_int16_t ticket,
const std::string& exchange, const std::string& routingKey,
bool mandatory, bool immediate);
- void get(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+ void get(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue,
bool noAck);
- void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple);
- void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue);
- void recover(u_int16_t channel, bool requeue);
+ void ack(u_int16_t /*channel*/, u_int64_t deliveryTag, bool multiple);
+ void reject(u_int16_t /*channel*/, u_int64_t deliveryTag, bool requeue);
+ void recover(u_int16_t /*channel*/, bool requeue);
};
class TxHandlerImpl : private CoreRefs, public TxHandler{
@@ -173,90 +177,6 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations
void rollback(u_int16_t channel);
};
- class MessageHandlerImpl : private CoreRefs, public MessageHandler {
- public:
- MessageHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
-
- void append( u_int16_t channel,
- const std::string& reference,
- const std::string& bytes );
-
- void cancel( u_int16_t channel,
- const std::string& destination );
-
- void checkpoint( u_int16_t channel,
- const std::string& reference,
- const std::string& identifier );
-
- void close( u_int16_t channel,
- const std::string& reference );
-
- void consume( u_int16_t channel,
- u_int16_t ticket,
- const std::string& queue,
- const std::string& destination,
- bool noLocal,
- bool noAck,
- bool exclusive,
- const qpid::framing::FieldTable& filter );
-
- void empty( u_int16_t channel );
-
- void get( u_int16_t channel,
- u_int16_t ticket,
- const std::string& queue,
- const std::string& destination,
- bool noAck );
-
- void offset( u_int16_t channel,
- u_int64_t value );
-
- void ok( u_int16_t channel );
-
- void open( u_int16_t channel,
- const std::string& reference );
-
- void qos( u_int16_t channel,
- u_int32_t prefetchSize,
- u_int16_t prefetchCount,
- bool global );
-
- void recover( u_int16_t channel,
- bool requeue );
-
- void reject( u_int16_t channel,
- u_int16_t code,
- const std::string& text );
-
- void resume( u_int16_t channel,
- const std::string& reference,
- const std::string& identifier );
-
- void transfer( u_int16_t channel,
- u_int16_t ticket,
- const std::string& destination,
- bool redelivered,
- bool immediate,
- u_int64_t ttl,
- u_int8_t priority,
- u_int64_t timestamp,
- u_int8_t deliveryMode,
- u_int64_t expiration,
- const std::string& exchange,
- const std::string& routingKey,
- const std::string& messageId,
- const std::string& correlationId,
- const std::string& replyTo,
- const std::string& contentType,
- const std::string& contentEncoding,
- const std::string& userId,
- const std::string& appId,
- const std::string& transactionId,
- const std::string& securityToken,
- const qpid::framing::FieldTable& applicationHeaders,
- qpid::framing::Content body );
- };
-
BasicHandlerImpl basicHandler;
ChannelHandlerImpl channelHandler;
ConnectionHandlerImpl connectionHandler;
@@ -298,31 +218,31 @@ void BrokerAdapter::ServerOps::ConnectionHandlerImpl::closeOk(u_int16_t /*channe
}
void BrokerAdapter::ServerOps::ChannelHandlerImpl::open(
- u_int16_t channelId, const string& /*outOfBand*/){
+ u_int16_t /*channel*/, const string& /*outOfBand*/){
// FIXME aconway 2007-01-17: Assertions on all channel methods,
- // Drop channelId param.
assertChannelNonZero(channel.getId());
if (channel.isOpen())
throw ConnectionException(504, "Channel already open");
channel.open();
// FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9
- connection.client->getChannel().openOk(channelId, std::string()/* ID */);
+ connection.client->getChannel().openOk(channel.getId(), std::string()/* ID */);
}
void BrokerAdapter::ServerOps::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){}
void BrokerAdapter::ServerOps::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/,
+void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/){
- connection.closeChannel(channel);
- connection.client->getChannel().closeOk(channel);
+ connection.client->getChannel().closeOk(channel.getId());
+ // FIXME aconway 2007-01-18: Following line destroys this. Ugly.
+ connection.closeChannel(channel.getId());
}
void BrokerAdapter::ServerOps::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){}
-void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type,
+void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& exchange, const string& type,
bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
const FieldTable& /*arguments*/){
@@ -345,7 +265,7 @@ void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(u_int16_t channel, u
}
}
if(!nowait){
- connection.client->getExchange().declareOk(channel);
+ connection.client->getExchange().declareOk(channel.getId());
}
}
@@ -363,27 +283,27 @@ void BrokerAdapter::ServerOps::ExchangeHandlerImpl::unbind(
-void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/,
+void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(u_int16_t /*channel*/, u_int16_t /*ticket*/,
const string& exchange, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
broker.getExchanges().destroy(exchange);
- if(!nowait) connection.client->getExchange().deleteOk(channel);
+ if(!nowait) connection.client->getExchange().deleteOk(channel.getId());
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name,
+void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& name,
bool passive, bool durable, bool exclusive,
bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
Queue::shared_ptr queue;
if (passive && !name.empty()) {
- queue = connection.getQueue(name, channel);
+ queue = connection.getQueue(name, channel.getId());
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
broker.getQueues().declare(name, durable, autoDelete ? connection.settings.timeout : 0, exclusive ? &connection : 0);
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
- connection.getChannel(channel).setDefaultQueue(queue);
+ channel.setDefaultQueue(queue);
//apply settings & create persistent record if required
queue_created.first->create(arguments);
@@ -402,15 +322,15 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(u_int16_t channel, u_in
}
if (!nowait) {
string queueName = queue->getName();
- connection.client->getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount());
+ connection.client->getQueue().declareOk(channel.getId(), queueName, queue->getMessageCount(), queue->getConsumerCount());
}
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName,
+void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& queueName,
const string& exchangeName, const string& routingKey, bool nowait,
const FieldTable& arguments){
- Queue::shared_ptr queue = connection.getQueue(queueName, channel);
+ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
if(exchange){
// kpvdr - cannot use this any longer as routingKey is now const
@@ -418,25 +338,25 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(u_int16_t channel, u_int16
// exchange->bind(queue, routingKey, &arguments);
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
exchange->bind(queue, exchangeRoutingKey, &arguments);
- if(!nowait) connection.client->getQueue().bindOk(channel);
+ if(!nowait) connection.client->getQueue().bindOk(channel.getId());
}else{
throw ChannelException(
404, "Bind failed. No such exchange: " + exchangeName);
}
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){
+void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& queueName, bool nowait){
- Queue::shared_ptr queue = connection.getQueue(queueName, channel);
+ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
int count = queue->purge();
- if(!nowait) connection.client->getQueue().purgeOk(channel, count);
+ if(!nowait) connection.client->getQueue().purgeOk(channel.getId(), count);
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue,
+void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& queue,
bool ifUnused, bool ifEmpty, bool nowait){
ChannelException error(0, "");
int count(0);
- Queue::shared_ptr q = connection.getQueue(queue, channel);
+ Queue::shared_ptr q = connection.getQueue(queue, channel.getId());
if(ifEmpty && q->getMessageCount() > 0){
throw ChannelException(406, "Queue not empty.");
}else if(ifUnused && q->getConsumerCount() > 0){
@@ -452,28 +372,27 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(u_int16_t channel, u_in
broker.getQueues().destroy(queue);
}
- if(!nowait) connection.client->getQueue().deleteOk(channel, count);
+ if(!nowait) connection.client->getQueue().deleteOk(channel.getId(), count);
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(u_int16_t /*channel*/, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
//TODO: handle global
- connection.getChannel(channel).setPrefetchSize(prefetchSize);
- connection.getChannel(channel).setPrefetchCount(prefetchCount);
- connection.client->getBasic().qosOk(channel);
+ channel.setPrefetchSize(prefetchSize);
+ channel.setPrefetchCount(prefetchCount);
+ connection.client->getBasic().qosOk(channel.getId());
}
void BrokerAdapter::ServerOps::BasicHandlerImpl::consume(
- u_int16_t channelId, u_int16_t /*ticket*/,
+ u_int16_t /*channel*/, u_int16_t /*ticket*/,
const string& queueName, const string& consumerTag,
bool noLocal, bool noAck, bool exclusive,
bool nowait, const FieldTable& fields)
{
- Queue::shared_ptr queue = connection.getQueue(queueName, channelId);
- Channel& channel = connection.getChannel(channelId);
+ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
if(!consumerTag.empty() && channel.exists(consumerTag)){
throw ConnectionException(530, "Consumer tags must be unique");
}
@@ -483,7 +402,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::consume(
channel.consume(
newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
- if(!nowait) connection.client->getBasic().consumeOk(channelId, newTag);
+ if(!nowait) connection.client->getBasic().consumeOk(channel.getId(), newTag);
//allow messages to be dispatched if required as there is now a consumer:
queue->dispatch();
@@ -494,38 +413,38 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::consume(
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){
- connection.getChannel(channel).cancel(consumerTag);
+void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(u_int16_t /*channel*/, const string& consumerTag, bool nowait){
+ channel.cancel(consumerTag);
- if(!nowait) connection.client->getBasic().cancelOk(channel, consumerTag);
+ if(!nowait) connection.client->getBasic().cancelOk(channel.getId(), consumerTag);
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/,
+void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(u_int16_t /*channel*/, u_int16_t /*ticket*/,
const string& exchangeName, const string& routingKey,
bool mandatory, bool immediate){
Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
if(exchange){
Message* msg = new Message(&connection, exchangeName, routingKey, mandatory, immediate);
- connection.getChannel(channel).handlePublish(msg, exchange);
+ channel.handlePublish(msg, exchange);
}else{
throw ChannelException(
404, "Exchange not found '" + exchangeName + "'");
}
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){
- Queue::shared_ptr queue = connection.getQueue(queueName, channelId);
- if(!connection.getChannel(channelId).get(queue, !noAck)){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::get(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& queueName, bool noAck){
+ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ if(!connection.getChannel(channel.getId()).get(queue, !noAck)){
string clusterId;//not used, part of an imatix hack
- connection.client->getBasic().getEmpty(channelId, clusterId);
+ connection.client->getBasic().getEmpty(channel.getId(), clusterId);
}
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(u_int16_t /*channel*/, u_int64_t deliveryTag, bool multiple){
try{
- connection.getChannel(channel).ack(deliveryTag, multiple);
+ channel.ack(deliveryTag, multiple);
}catch(InvalidAckException& e){
throw ConnectionException(530, "Received ack for unrecognised delivery tag");
}
@@ -533,25 +452,25 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(u_int16_t channel, u_int64_
void BrokerAdapter::ServerOps::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){
- connection.getChannel(channel).recover(requeue);
+void BrokerAdapter::ServerOps::BasicHandlerImpl::recover(u_int16_t /*channel*/, bool requeue){
+ channel.recover(requeue);
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::select(u_int16_t channel){
- connection.getChannel(channel).begin();
- connection.client->getTx().selectOk(channel);
+void BrokerAdapter::ServerOps::TxHandlerImpl::select(u_int16_t /*channel*/){
+ channel.begin();
+ connection.client->getTx().selectOk(channel.getId());
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::commit(u_int16_t channel){
- connection.getChannel(channel).commit();
- connection.client->getTx().commitOk(channel);
+void BrokerAdapter::ServerOps::TxHandlerImpl::commit(u_int16_t /*channel*/){
+ channel.commit();
+ connection.client->getTx().commitOk(channel.getId());
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(u_int16_t channel){
+void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(u_int16_t /*channel*/){
- connection.getChannel(channel).rollback();
- connection.client->getTx().rollbackOk(channel);
- connection.getChannel(channel).recover(false);
+ channel.rollback();
+ connection.client->getTx().rollbackOk(channel.getId());
+ channel.recover(false);
}
void
@@ -587,152 +506,11 @@ BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( u_int16_t /*channel*/ )
void
BrokerAdapter::ServerOps::ChannelHandlerImpl::resume(
u_int16_t /*channel*/,
- const string& /*channelId*/ )
+ const string& /*channel*/ )
{
assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
-// Message class method handlers
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::append( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*bytes*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::cancel( u_int16_t /*channel*/,
- const string& /*destination*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*identifier*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::close( u_int16_t /*channel*/,
- const string& /*reference*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::consume( u_int16_t /*channel*/,
- u_int16_t /*ticket*/,
- const string& /*queue*/,
- const string& /*destination*/,
- bool /*noLocal*/,
- bool /*noAck*/,
- bool /*exclusive*/,
- const qpid::framing::FieldTable& /*filter*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::empty( u_int16_t /*channel*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::get( u_int16_t /*channel*/,
- u_int16_t /*ticket*/,
- const string& /*queue*/,
- const string& /*destination*/,
- bool /*noAck*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::offset( u_int16_t /*channel*/,
- u_int64_t /*value*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::ok( u_int16_t /*channel*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::open( u_int16_t /*channel*/,
- const string& /*reference*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::qos( u_int16_t /*channel*/,
- u_int32_t /*prefetchSize*/,
- u_int16_t /*prefetchCount*/,
- bool /*global*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::recover( u_int16_t /*channel*/,
- bool /*requeue*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::reject( u_int16_t /*channel*/,
- u_int16_t /*code*/,
- const string& /*text*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::resume( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*identifier*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::transfer( u_int16_t /*channel*/,
- u_int16_t /*ticket*/,
- const string& /*destination*/,
- bool /*redelivered*/,
- bool /*immediate*/,
- u_int64_t /*ttl*/,
- u_int8_t /*priority*/,
- u_int64_t /*timestamp*/,
- u_int8_t /*deliveryMode*/,
- u_int64_t /*expiration*/,
- const string& /*exchange*/,
- const string& /*routingKey*/,
- const string& /*messageId*/,
- const string& /*correlationId*/,
- const string& /*replyTo*/,
- const string& /*contentType*/,
- const string& /*contentEncoding*/,
- const string& /*userId*/,
- const string& /*appId*/,
- const string& /*transactionId*/,
- const string& /*securityToken*/,
- const qpid::framing::FieldTable& /*applicationHeaders*/,
- qpid::framing::Content /*body*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
BrokerAdapter::BrokerAdapter(
Channel* ch, Connection& c, Broker& b
) :
diff --git a/cpp/lib/broker/Makefile.am b/cpp/lib/broker/Makefile.am
index 06d81d13e5..105abc8ad2 100644
--- a/cpp/lib/broker/Makefile.am
+++ b/cpp/lib/broker/Makefile.am
@@ -71,6 +71,8 @@ libqpidbroker_la_SOURCES = \
Connection.h \
BrokerAdapter.cpp \
BrokerAdapter.h \
+ MessageHandlerImpl.cpp \
+ MessageHandlerImpl.h \
TopicExchange.cpp \
TopicExchange.h \
TransactionalStore.h \
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp
new file mode 100644
index 0000000000..0c0f9e96eb
--- /dev/null
+++ b/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -0,0 +1,219 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "MessageHandlerImpl.h"
+#include "BrokerChannel.h"
+#include "Connection.h"
+#include "Broker.h"
+namespace qpid {
+namespace broker {
+
+//
+// Message class method handlers
+//
+void
+MessageHandlerImpl::append( u_int16_t /*channel*/,
+ const string& /*reference*/,
+ const string& /*bytes*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+
+void
+MessageHandlerImpl::cancel( u_int16_t channel,
+ const string& destination )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+
+ connection.getChannel(channel).cancel(destination);
+
+ connection.client->getMessageHandler()->ok(channel);
+}
+
+void
+MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
+ const string& /*reference*/,
+ const string& /*identifier*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+MessageHandlerImpl::close( u_int16_t /*channel*/,
+ const string& /*reference*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+MessageHandlerImpl::consume( u_int16_t /*channel*/,
+ u_int16_t /*ticket*/,
+ const string& queueName,
+ const string& destination,
+ bool noLocal,
+ bool noAck,
+ bool exclusive,
+ const qpid::framing::FieldTable& filter )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+
+ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ Channel& channel = connection.getChannel(channel.getId());
+ if(!destination.empty() && channel.exists(destination)){
+ throw ConnectionException(530, "Consumer tags must be unique");
+ }
+
+ try{
+ string newTag = destination;
+ channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
+
+ connection.client->getMessageHandler()->ok(channel.getId());
+
+ //allow messages to be dispatched if required as there is now a consumer:
+ queue->dispatch();
+ }catch(ExclusiveAccessException& e){
+ if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
+ else throw ChannelException(403, "Access would violate previously granted exclusivity");
+ }
+}
+
+void
+MessageHandlerImpl::empty( u_int16_t /*channel*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+MessageHandlerImpl::get( u_int16_t /*channelId*/,
+ u_int16_t /*ticket*/,
+ const string& queueName,
+ const string& /*destination*/,
+ bool noAck )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+
+ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+
+ // FIXME: get is probably Basic specific
+ if(!connection.getChannel(channel.getId()).get(queue, !noAck)){
+
+ connection.client->getMessageHandler()->empty(channel.getId());
+ }
+
+}
+
+void
+MessageHandlerImpl::offset( u_int16_t /*channel*/,
+ u_int64_t /*value*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+MessageHandlerImpl::ok( u_int16_t /*channel*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+MessageHandlerImpl::open( u_int16_t /*channel*/,
+ const string& /*reference*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+MessageHandlerImpl::qos( u_int16_t /*channel*/,
+ u_int32_t prefetchSize,
+ u_int16_t prefetchCount,
+ bool /*global*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+
+ //TODO: handle global
+ channel.setPrefetchSize(prefetchSize);
+ channel.setPrefetchCount(prefetchCount);
+
+ connection.client->getMessageHandler()->ok(channel.getId());
+}
+
+void
+MessageHandlerImpl::recover( u_int16_t /*channel*/,
+ bool requeue )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+
+ channel.recover(requeue);
+
+}
+
+void
+MessageHandlerImpl::reject( u_int16_t /*channel*/,
+ u_int16_t /*code*/,
+ const string& /*text*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+MessageHandlerImpl::resume( u_int16_t /*channel*/,
+ const string& /*reference*/,
+ const string& /*identifier*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+}
+
+void
+MessageHandlerImpl::transfer( u_int16_t /*channel*/,
+ u_int16_t /*ticket*/,
+ const string& /*destination*/,
+ bool /*redelivered*/,
+ bool immediate,
+ u_int64_t /*ttl*/,
+ u_int8_t /*priority*/,
+ u_int64_t /*timestamp*/,
+ u_int8_t /*deliveryMode*/,
+ u_int64_t /*expiration*/,
+ const string& exchangeName,
+ const string& routingKey,
+ const string& /*messageId*/,
+ const string& /*correlationId*/,
+ const string& /*replyTo*/,
+ const string& /*contentType*/,
+ const string& /*contentEncoding*/,
+ const string& /*userId*/,
+ const string& /*appId*/,
+ const string& /*transactionId*/,
+ const string& /*securityToken*/,
+ const qpid::framing::FieldTable& /*applicationHeaders*/,
+ qpid::framing::Content /*body*/ )
+{
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+
+ Exchange::shared_ptr exchange = exchangeName.empty() ?
+ broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
+ if(exchange){
+ Message* msg = new Message(&connection, exchangeName, routingKey, false /*mandatory?*/, immediate);
+ channel.handlePublish(msg, exchange);
+ }else{
+ throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
+ }
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/lib/broker/MessageHandlerImpl.h b/cpp/lib/broker/MessageHandlerImpl.h
new file mode 100644
index 0000000000..77e30abe05
--- /dev/null
+++ b/cpp/lib/broker/MessageHandlerImpl.h
@@ -0,0 +1,124 @@
+#ifndef _broker_MessageHandlerImpl_h
+#define _broker_MessageHandlerImpl_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "AMQP_ServerOperations.h"
+
+namespace qpid {
+namespace broker {
+
+class Channel;
+class Connection;
+class Broker;
+
+class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageHandler {
+ Channel& channel;
+ Connection& connection;
+ Broker& broker;
+
+ public:
+ MessageHandlerImpl(Channel& ch, Connection& c, Broker& b)
+ : channel(ch), connection(c), broker(b) {}
+
+ void append( u_int16_t channel,
+ const std::string& reference,
+ const std::string& bytes );
+
+ void cancel( u_int16_t channel,
+ const std::string& destination );
+
+ void checkpoint( u_int16_t channel,
+ const std::string& reference,
+ const std::string& identifier );
+
+ void close( u_int16_t channel,
+ const std::string& reference );
+
+ void consume( u_int16_t channel,
+ u_int16_t ticket,
+ const std::string& queue,
+ const std::string& destination,
+ bool noLocal,
+ bool noAck,
+ bool exclusive,
+ const qpid::framing::FieldTable& filter );
+
+ void empty( u_int16_t channel );
+
+ void get( u_int16_t channel,
+ u_int16_t ticket,
+ const std::string& queue,
+ const std::string& destination,
+ bool noAck );
+
+ void offset( u_int16_t channel,
+ u_int64_t value );
+
+ void ok( u_int16_t channel );
+
+ void open( u_int16_t channel,
+ const std::string& reference );
+
+ void qos( u_int16_t channel,
+ u_int32_t prefetchSize,
+ u_int16_t prefetchCount,
+ bool global );
+
+ void recover( u_int16_t channel,
+ bool requeue );
+
+ void reject( u_int16_t channel,
+ u_int16_t code,
+ const std::string& text );
+
+ void resume( u_int16_t channel,
+ const std::string& reference,
+ const std::string& identifier );
+
+ void transfer( u_int16_t channel,
+ u_int16_t ticket,
+ const std::string& destination,
+ bool redelivered,
+ bool immediate,
+ u_int64_t ttl,
+ u_int8_t priority,
+ u_int64_t timestamp,
+ u_int8_t deliveryMode,
+ u_int64_t expiration,
+ const std::string& exchange,
+ const std::string& routingKey,
+ const std::string& messageId,
+ const std::string& correlationId,
+ const std::string& replyTo,
+ const std::string& contentType,
+ const std::string& contentEncoding,
+ const std::string& userId,
+ const std::string& appId,
+ const std::string& transactionId,
+ const std::string& securityToken,
+ const qpid::framing::FieldTable& applicationHeaders,
+ qpid::framing::Content body );
+};
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!_broker_MessageHandlerImpl_h*/
diff --git a/cpp/tests/Makefile.am b/cpp/tests/Makefile.am
index 774ecafa6e..8fb0350d2b 100644
--- a/cpp/tests/Makefile.am
+++ b/cpp/tests/Makefile.am
@@ -38,7 +38,8 @@ broker_tests = \
TxAckTest \
TxBufferTest \
TxPublishTest \
- ValueTest
+ ValueTest \
+ MessageHandlerTest
framing_tests = \
FieldTableTest \
diff --git a/cpp/tests/MessageHandlerTest.cpp b/cpp/tests/MessageHandlerTest.cpp
new file mode 100644
index 0000000000..55971355f6
--- /dev/null
+++ b/cpp/tests/MessageHandlerTest.cpp
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+//#include <iostream>
+//#include <AMQP_HighestVersion.h>
+#include <amqp_framing.h>
+#include <qpid_test_plugin.h>
+
+#include <BrokerAdapter.h>
+
+using namespace qpid::framing;
+using namespace qpid::broker;
+
+class MessageHandlerTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(MessageHandlerTest);
+ CPPUNIT_TEST(testOpenMethod);
+ CPPUNIT_TEST_SUITE_END();
+private:
+
+public:
+
+ MessageHandlerTest()
+ {
+ }
+
+ void testOpenMethod()
+ {
+ //AMQFrame frame(highestProtocolVersion, 0, method);
+ //TestBodyHandler handler(method);
+ //handler.handleBody(frame.getBody());
+ }
+
+};
+
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(MessageHandlerTest);
+
diff --git a/cpp/tests/client_test.cpp b/cpp/tests/client_test.cpp
index 869cbd33e7..aaa7f4e9ca 100644
--- a/cpp/tests/client_test.cpp
+++ b/cpp/tests/client_test.cpp
@@ -70,7 +70,7 @@ int main(int argc, char**)
Queue queue("MyQueue", true);
- Connection con(argc > 1);
+ Connection con(verbose);
string host("localhost");
con.open(host);
if (verbose) std::cout << "Opened connection." << std::endl;
diff --git a/cpp/tests/start_broker b/cpp/tests/start_broker
index cc95083d85..05510b17ac 100755
--- a/cpp/tests/start_broker
+++ b/cpp/tests/start_broker
@@ -11,4 +11,4 @@ rm -rf $LOG $PID
# FIXME aconway 2007-01-18: qpidd should not return till it is accepting
# connections, remove arbitrary sleep.
-sleep 1
+sleep 2