summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/BrokerAdapter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/broker/BrokerAdapter.cpp')
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp390
1 files changed, 84 insertions, 306 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index 2a5b136b3e..16519ec646 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -20,10 +20,14 @@
#include "Exception.h"
#include "AMQMethodBody.h"
#include "Exception.h"
+#include "MessageHandlerImpl.h"
namespace qpid {
namespace broker {
+// FIXME aconway 2007-01-18: Remove channel argument from signatures,
+// adapter is already associated with a cahnnel.
+
using namespace qpid;
using namespace qpid::framing;
@@ -74,16 +78,16 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations
public:
ConnectionHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void startOk(u_int16_t channel,
+ void startOk(u_int16_t /*channel*/,
const qpid::framing::FieldTable& clientProperties,
const std::string& mechanism, const std::string& response,
const std::string& locale);
- void secureOk(u_int16_t channel, const std::string& response);
- void tuneOk(u_int16_t channel, u_int16_t channelMax,
+ void secureOk(u_int16_t /*channel*/, const std::string& response);
+ void tuneOk(u_int16_t /*channel*/, u_int16_t channelMax,
u_int32_t frameMax, u_int16_t heartbeat);
- void open(u_int16_t channel, const std::string& virtualHost,
+ void open(u_int16_t /*channel*/, const std::string& virtualHost,
const std::string& capabilities, bool insist);
- void close(u_int16_t channel, u_int16_t replyCode,
+ void close(u_int16_t /*channel*/, u_int16_t replyCode,
const std::string& replyText,
u_int16_t classId, u_int16_t methodId);
void closeOk(u_int16_t channel);
@@ -92,14 +96,14 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations
class ChannelHandlerImpl : private CoreRefs, public ChannelHandler{
public:
ChannelHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void open(u_int16_t channel, const std::string& outOfBand);
- void flow(u_int16_t channel, bool active);
- void flowOk(u_int16_t channel, bool active);
+ void open(u_int16_t /*channel*/, const std::string& outOfBand);
+ void flow(u_int16_t /*channel*/, bool active);
+ void flowOk(u_int16_t /*channel*/, bool active);
void ok( u_int16_t channel );
void ping( u_int16_t channel );
void pong( u_int16_t channel );
- void resume( u_int16_t channel, const std::string& channelId );
- void close(u_int16_t channel, u_int16_t replyCode, const
+ void resume( u_int16_t /*channel*/, const std::string& channelId );
+ void close(u_int16_t /*channel*/, u_int16_t replyCode, const
std::string& replyText, u_int16_t classId, u_int16_t methodId);
void closeOk(u_int16_t channel);
};
@@ -107,14 +111,14 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations
class ExchangeHandlerImpl : private CoreRefs, public ExchangeHandler{
public:
ExchangeHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void declare(u_int16_t channel, u_int16_t ticket,
+ void declare(u_int16_t /*channel*/, u_int16_t ticket,
const std::string& exchange, const std::string& type,
bool passive, bool durable, bool autoDelete,
bool internal, bool nowait,
const qpid::framing::FieldTable& arguments);
- void delete_(u_int16_t channel, u_int16_t ticket,
+ void delete_(u_int16_t /*channel*/, u_int16_t ticket,
const std::string& exchange, bool ifUnused, bool nowait);
- void unbind(u_int16_t channel,
+ void unbind(u_int16_t /*channel*/,
u_int16_t ticket, const std::string& queue,
const std::string& exchange, const std::string& routingKey,
const qpid::framing::FieldTable& arguments );
@@ -123,22 +127,22 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations
class QueueHandlerImpl : private CoreRefs, public QueueHandler{
public:
QueueHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void declare(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+ void declare(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue,
bool passive, bool durable, bool exclusive,
bool autoDelete, bool nowait,
const qpid::framing::FieldTable& arguments);
- void bind(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+ void bind(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue,
const std::string& exchange, const std::string& routingKey,
bool nowait, const qpid::framing::FieldTable& arguments);
- void unbind(u_int16_t channel,
+ void unbind(u_int16_t /*channel*/,
u_int16_t ticket,
const std::string& queue,
const std::string& exchange,
const std::string& routingKey,
const qpid::framing::FieldTable& arguments );
- void purge(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+ void purge(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue,
bool nowait);
- void delete_(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+ void delete_(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue,
bool ifUnused, bool ifEmpty,
bool nowait);
};
@@ -146,23 +150,23 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations
class BasicHandlerImpl : private CoreRefs, public BasicHandler{
public:
BasicHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void qos(u_int16_t channel, u_int32_t prefetchSize,
+ void qos(u_int16_t /*channel*/, u_int32_t prefetchSize,
u_int16_t prefetchCount, bool global);
void consume(
- u_int16_t channel, u_int16_t ticket, const std::string& queue,
+ u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue,
const std::string& consumerTag, bool noLocal, bool noAck,
bool exclusive, bool nowait,
const qpid::framing::FieldTable& fields);
- void cancel(u_int16_t channel, const std::string& consumerTag,
+ void cancel(u_int16_t /*channel*/, const std::string& consumerTag,
bool nowait);
- void publish(u_int16_t channel, u_int16_t ticket,
+ void publish(u_int16_t /*channel*/, u_int16_t ticket,
const std::string& exchange, const std::string& routingKey,
bool mandatory, bool immediate);
- void get(u_int16_t channel, u_int16_t ticket, const std::string& queue,
+ void get(u_int16_t /*channel*/, u_int16_t ticket, const std::string& queue,
bool noAck);
- void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple);
- void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue);
- void recover(u_int16_t channel, bool requeue);
+ void ack(u_int16_t /*channel*/, u_int64_t deliveryTag, bool multiple);
+ void reject(u_int16_t /*channel*/, u_int64_t deliveryTag, bool requeue);
+ void recover(u_int16_t /*channel*/, bool requeue);
};
class TxHandlerImpl : private CoreRefs, public TxHandler{
@@ -173,90 +177,6 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations
void rollback(u_int16_t channel);
};
- class MessageHandlerImpl : private CoreRefs, public MessageHandler {
- public:
- MessageHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
-
- void append( u_int16_t channel,
- const std::string& reference,
- const std::string& bytes );
-
- void cancel( u_int16_t channel,
- const std::string& destination );
-
- void checkpoint( u_int16_t channel,
- const std::string& reference,
- const std::string& identifier );
-
- void close( u_int16_t channel,
- const std::string& reference );
-
- void consume( u_int16_t channel,
- u_int16_t ticket,
- const std::string& queue,
- const std::string& destination,
- bool noLocal,
- bool noAck,
- bool exclusive,
- const qpid::framing::FieldTable& filter );
-
- void empty( u_int16_t channel );
-
- void get( u_int16_t channel,
- u_int16_t ticket,
- const std::string& queue,
- const std::string& destination,
- bool noAck );
-
- void offset( u_int16_t channel,
- u_int64_t value );
-
- void ok( u_int16_t channel );
-
- void open( u_int16_t channel,
- const std::string& reference );
-
- void qos( u_int16_t channel,
- u_int32_t prefetchSize,
- u_int16_t prefetchCount,
- bool global );
-
- void recover( u_int16_t channel,
- bool requeue );
-
- void reject( u_int16_t channel,
- u_int16_t code,
- const std::string& text );
-
- void resume( u_int16_t channel,
- const std::string& reference,
- const std::string& identifier );
-
- void transfer( u_int16_t channel,
- u_int16_t ticket,
- const std::string& destination,
- bool redelivered,
- bool immediate,
- u_int64_t ttl,
- u_int8_t priority,
- u_int64_t timestamp,
- u_int8_t deliveryMode,
- u_int64_t expiration,
- const std::string& exchange,
- const std::string& routingKey,
- const std::string& messageId,
- const std::string& correlationId,
- const std::string& replyTo,
- const std::string& contentType,
- const std::string& contentEncoding,
- const std::string& userId,
- const std::string& appId,
- const std::string& transactionId,
- const std::string& securityToken,
- const qpid::framing::FieldTable& applicationHeaders,
- qpid::framing::Content body );
- };
-
BasicHandlerImpl basicHandler;
ChannelHandlerImpl channelHandler;
ConnectionHandlerImpl connectionHandler;
@@ -298,31 +218,31 @@ void BrokerAdapter::ServerOps::ConnectionHandlerImpl::closeOk(u_int16_t /*channe
}
void BrokerAdapter::ServerOps::ChannelHandlerImpl::open(
- u_int16_t channelId, const string& /*outOfBand*/){
+ u_int16_t /*channel*/, const string& /*outOfBand*/){
// FIXME aconway 2007-01-17: Assertions on all channel methods,
- // Drop channelId param.
assertChannelNonZero(channel.getId());
if (channel.isOpen())
throw ConnectionException(504, "Channel already open");
channel.open();
// FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9
- connection.client->getChannel().openOk(channelId, std::string()/* ID */);
+ connection.client->getChannel().openOk(channel.getId(), std::string()/* ID */);
}
void BrokerAdapter::ServerOps::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){}
void BrokerAdapter::ServerOps::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/,
+void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(u_int16_t /*channel*/, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/){
- connection.closeChannel(channel);
- connection.client->getChannel().closeOk(channel);
+ connection.client->getChannel().closeOk(channel.getId());
+ // FIXME aconway 2007-01-18: Following line destroys this. Ugly.
+ connection.closeChannel(channel.getId());
}
void BrokerAdapter::ServerOps::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){}
-void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& exchange, const string& type,
+void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& exchange, const string& type,
bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
const FieldTable& /*arguments*/){
@@ -345,7 +265,7 @@ void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(u_int16_t channel, u
}
}
if(!nowait){
- connection.client->getExchange().declareOk(channel);
+ connection.client->getExchange().declareOk(channel.getId());
}
}
@@ -363,27 +283,27 @@ void BrokerAdapter::ServerOps::ExchangeHandlerImpl::unbind(
-void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/,
+void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(u_int16_t /*channel*/, u_int16_t /*ticket*/,
const string& exchange, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
broker.getExchanges().destroy(exchange);
- if(!nowait) connection.client->getExchange().deleteOk(channel);
+ if(!nowait) connection.client->getExchange().deleteOk(channel.getId());
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name,
+void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& name,
bool passive, bool durable, bool exclusive,
bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
Queue::shared_ptr queue;
if (passive && !name.empty()) {
- queue = connection.getQueue(name, channel);
+ queue = connection.getQueue(name, channel.getId());
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
broker.getQueues().declare(name, durable, autoDelete ? connection.settings.timeout : 0, exclusive ? &connection : 0);
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
- connection.getChannel(channel).setDefaultQueue(queue);
+ channel.setDefaultQueue(queue);
//apply settings & create persistent record if required
queue_created.first->create(arguments);
@@ -402,15 +322,15 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(u_int16_t channel, u_in
}
if (!nowait) {
string queueName = queue->getName();
- connection.client->getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount());
+ connection.client->getQueue().declareOk(channel.getId(), queueName, queue->getMessageCount(), queue->getConsumerCount());
}
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName,
+void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& queueName,
const string& exchangeName, const string& routingKey, bool nowait,
const FieldTable& arguments){
- Queue::shared_ptr queue = connection.getQueue(queueName, channel);
+ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
if(exchange){
// kpvdr - cannot use this any longer as routingKey is now const
@@ -418,25 +338,25 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(u_int16_t channel, u_int16
// exchange->bind(queue, routingKey, &arguments);
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
exchange->bind(queue, exchangeRoutingKey, &arguments);
- if(!nowait) connection.client->getQueue().bindOk(channel);
+ if(!nowait) connection.client->getQueue().bindOk(channel.getId());
}else{
throw ChannelException(
404, "Bind failed. No such exchange: " + exchangeName);
}
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){
+void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& queueName, bool nowait){
- Queue::shared_ptr queue = connection.getQueue(queueName, channel);
+ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
int count = queue->purge();
- if(!nowait) connection.client->getQueue().purgeOk(channel, count);
+ if(!nowait) connection.client->getQueue().purgeOk(channel.getId(), count);
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, const string& queue,
+void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& queue,
bool ifUnused, bool ifEmpty, bool nowait){
ChannelException error(0, "");
int count(0);
- Queue::shared_ptr q = connection.getQueue(queue, channel);
+ Queue::shared_ptr q = connection.getQueue(queue, channel.getId());
if(ifEmpty && q->getMessageCount() > 0){
throw ChannelException(406, "Queue not empty.");
}else if(ifUnused && q->getConsumerCount() > 0){
@@ -452,28 +372,27 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(u_int16_t channel, u_in
broker.getQueues().destroy(queue);
}
- if(!nowait) connection.client->getQueue().deleteOk(channel, count);
+ if(!nowait) connection.client->getQueue().deleteOk(channel.getId(), count);
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(u_int16_t /*channel*/, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
//TODO: handle global
- connection.getChannel(channel).setPrefetchSize(prefetchSize);
- connection.getChannel(channel).setPrefetchCount(prefetchCount);
- connection.client->getBasic().qosOk(channel);
+ channel.setPrefetchSize(prefetchSize);
+ channel.setPrefetchCount(prefetchCount);
+ connection.client->getBasic().qosOk(channel.getId());
}
void BrokerAdapter::ServerOps::BasicHandlerImpl::consume(
- u_int16_t channelId, u_int16_t /*ticket*/,
+ u_int16_t /*channel*/, u_int16_t /*ticket*/,
const string& queueName, const string& consumerTag,
bool noLocal, bool noAck, bool exclusive,
bool nowait, const FieldTable& fields)
{
- Queue::shared_ptr queue = connection.getQueue(queueName, channelId);
- Channel& channel = connection.getChannel(channelId);
+ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
if(!consumerTag.empty() && channel.exists(consumerTag)){
throw ConnectionException(530, "Consumer tags must be unique");
}
@@ -483,7 +402,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::consume(
channel.consume(
newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
- if(!nowait) connection.client->getBasic().consumeOk(channelId, newTag);
+ if(!nowait) connection.client->getBasic().consumeOk(channel.getId(), newTag);
//allow messages to be dispatched if required as there is now a consumer:
queue->dispatch();
@@ -494,38 +413,38 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::consume(
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(u_int16_t channel, const string& consumerTag, bool nowait){
- connection.getChannel(channel).cancel(consumerTag);
+void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(u_int16_t /*channel*/, const string& consumerTag, bool nowait){
+ channel.cancel(consumerTag);
- if(!nowait) connection.client->getBasic().cancelOk(channel, consumerTag);
+ if(!nowait) connection.client->getBasic().cancelOk(channel.getId(), consumerTag);
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/,
+void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(u_int16_t /*channel*/, u_int16_t /*ticket*/,
const string& exchangeName, const string& routingKey,
bool mandatory, bool immediate){
Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
if(exchange){
Message* msg = new Message(&connection, exchangeName, routingKey, mandatory, immediate);
- connection.getChannel(channel).handlePublish(msg, exchange);
+ channel.handlePublish(msg, exchange);
}else{
throw ChannelException(
404, "Exchange not found '" + exchangeName + "'");
}
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, const string& queueName, bool noAck){
- Queue::shared_ptr queue = connection.getQueue(queueName, channelId);
- if(!connection.getChannel(channelId).get(queue, !noAck)){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::get(u_int16_t /*channel*/, u_int16_t /*ticket*/, const string& queueName, bool noAck){
+ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ if(!connection.getChannel(channel.getId()).get(queue, !noAck)){
string clusterId;//not used, part of an imatix hack
- connection.client->getBasic().getEmpty(channelId, clusterId);
+ connection.client->getBasic().getEmpty(channel.getId(), clusterId);
}
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(u_int16_t /*channel*/, u_int64_t deliveryTag, bool multiple){
try{
- connection.getChannel(channel).ack(deliveryTag, multiple);
+ channel.ack(deliveryTag, multiple);
}catch(InvalidAckException& e){
throw ConnectionException(530, "Received ack for unrecognised delivery tag");
}
@@ -533,25 +452,25 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(u_int16_t channel, u_int64_
void BrokerAdapter::ServerOps::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){
- connection.getChannel(channel).recover(requeue);
+void BrokerAdapter::ServerOps::BasicHandlerImpl::recover(u_int16_t /*channel*/, bool requeue){
+ channel.recover(requeue);
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::select(u_int16_t channel){
- connection.getChannel(channel).begin();
- connection.client->getTx().selectOk(channel);
+void BrokerAdapter::ServerOps::TxHandlerImpl::select(u_int16_t /*channel*/){
+ channel.begin();
+ connection.client->getTx().selectOk(channel.getId());
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::commit(u_int16_t channel){
- connection.getChannel(channel).commit();
- connection.client->getTx().commitOk(channel);
+void BrokerAdapter::ServerOps::TxHandlerImpl::commit(u_int16_t /*channel*/){
+ channel.commit();
+ connection.client->getTx().commitOk(channel.getId());
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(u_int16_t channel){
+void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(u_int16_t /*channel*/){
- connection.getChannel(channel).rollback();
- connection.client->getTx().rollbackOk(channel);
- connection.getChannel(channel).recover(false);
+ channel.rollback();
+ connection.client->getTx().rollbackOk(channel.getId());
+ channel.recover(false);
}
void
@@ -587,152 +506,11 @@ BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( u_int16_t /*channel*/ )
void
BrokerAdapter::ServerOps::ChannelHandlerImpl::resume(
u_int16_t /*channel*/,
- const string& /*channelId*/ )
+ const string& /*channel*/ )
{
assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
-// Message class method handlers
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::append( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*bytes*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::cancel( u_int16_t /*channel*/,
- const string& /*destination*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*identifier*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::close( u_int16_t /*channel*/,
- const string& /*reference*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::consume( u_int16_t /*channel*/,
- u_int16_t /*ticket*/,
- const string& /*queue*/,
- const string& /*destination*/,
- bool /*noLocal*/,
- bool /*noAck*/,
- bool /*exclusive*/,
- const qpid::framing::FieldTable& /*filter*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::empty( u_int16_t /*channel*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::get( u_int16_t /*channel*/,
- u_int16_t /*ticket*/,
- const string& /*queue*/,
- const string& /*destination*/,
- bool /*noAck*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::offset( u_int16_t /*channel*/,
- u_int64_t /*value*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::ok( u_int16_t /*channel*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::open( u_int16_t /*channel*/,
- const string& /*reference*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::qos( u_int16_t /*channel*/,
- u_int32_t /*prefetchSize*/,
- u_int16_t /*prefetchCount*/,
- bool /*global*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::recover( u_int16_t /*channel*/,
- bool /*requeue*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::reject( u_int16_t /*channel*/,
- u_int16_t /*code*/,
- const string& /*text*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::resume( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*identifier*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::ServerOps::MessageHandlerImpl::transfer( u_int16_t /*channel*/,
- u_int16_t /*ticket*/,
- const string& /*destination*/,
- bool /*redelivered*/,
- bool /*immediate*/,
- u_int64_t /*ttl*/,
- u_int8_t /*priority*/,
- u_int64_t /*timestamp*/,
- u_int8_t /*deliveryMode*/,
- u_int64_t /*expiration*/,
- const string& /*exchange*/,
- const string& /*routingKey*/,
- const string& /*messageId*/,
- const string& /*correlationId*/,
- const string& /*replyTo*/,
- const string& /*contentType*/,
- const string& /*contentEncoding*/,
- const string& /*userId*/,
- const string& /*appId*/,
- const string& /*transactionId*/,
- const string& /*securityToken*/,
- const qpid::framing::FieldTable& /*applicationHeaders*/,
- qpid::framing::Content /*body*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
BrokerAdapter::BrokerAdapter(
Channel* ch, Connection& c, Broker& b
) :