summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/BrokerAdapter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/broker/BrokerAdapter.cpp')
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp167
1 files changed, 98 insertions, 69 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index 8b081874fc..ec80241c66 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -18,11 +18,10 @@
#include <boost/format.hpp>
#include "BrokerAdapter.h"
+#include "BrokerChannel.h"
#include "Connection.h"
-#include "Exception.h"
#include "AMQMethodBody.h"
#include "Exception.h"
-#include "MessageHandlerImpl.h"
namespace qpid {
namespace broker {
@@ -33,18 +32,37 @@ using namespace qpid::framing;
typedef std::vector<Queue::shared_ptr> QueueVector;
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::startOk(
- const MethodContext& context , const FieldTable& /*clientProperties*/,
+
+BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) :
+ CoreRefs(ch, c, b),
+ connection(c),
+ basicHandler(*this),
+ channelHandler(*this),
+ connectionHandler(*this),
+ exchangeHandler(*this),
+ messageHandler(*this),
+ queueHandler(*this),
+ txHandler(*this)
+{}
+
+
+ProtocolVersion BrokerAdapter::getVersion() const {
+ return connection.getVersion();
+}
+
+void BrokerAdapter::ConnectionHandlerImpl::startOk(
+ const MethodContext&, const FieldTable& /*clientProperties*/,
const string& /*mechanism*/,
- const string& /*response*/, const string& /*locale*/){
- connection.client->getConnection().tune(
- context, 100, connection.getFrameMax(), connection.getHeartbeat());
+ const string& /*response*/, const string& /*locale*/)
+{
+ client.tune(
+ 100, connection.getFrameMax(), connection.getHeartbeat());
}
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::secureOk(
+void BrokerAdapter::ConnectionHandlerImpl::secureOk(
const MethodContext&, const string& /*response*/){}
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::tuneOk(
+void BrokerAdapter::ConnectionHandlerImpl::tuneOk(
const MethodContext&, u_int16_t /*channelmax*/,
u_int32_t framemax, u_int16_t heartbeat)
{
@@ -52,50 +70,55 @@ void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::tuneOk(
connection.setHeartbeat(heartbeat);
}
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
+void BrokerAdapter::ConnectionHandlerImpl::open(
+ const MethodContext& context, const string& /*virtualHost*/,
+ const string& /*capabilities*/, bool /*insist*/)
+{
string knownhosts;
- connection.client->getConnection().openOk(context, knownhosts);
+ client.openOk(
+ knownhosts, context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::close(
+void BrokerAdapter::ConnectionHandlerImpl::close(
const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/)
{
- connection.client->getConnection().closeOk(context);
+ client.closeOk(context.getRequestId());
connection.getOutput().close();
}
-void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){
+void BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){
connection.getOutput().close();
}
-void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::open(
+void BrokerAdapter::ChannelHandlerImpl::open(
const MethodContext& context, const string& /*outOfBand*/){
channel.open();
// FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9
- connection.client->getChannel().openOk(context, std::string()/* ID */);
+ client.openOk(
+ std::string()/* ID */, context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}
-void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
+void BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}
+void BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
-void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::close(
+void BrokerAdapter::ChannelHandlerImpl::close(
const MethodContext& context, u_int16_t /*replyCode*/,
const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/)
{
- connection.client->getChannel().closeOk(context);
+ client.closeOk(context.getRequestId());
// FIXME aconway 2007-01-18: Following line will "delete this". Ugly.
connection.closeChannel(channel.getId());
}
-void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){}
+void BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){}
-void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type,
- bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
- const FieldTable& /*arguments*/){
+void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type,
+ bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
+ const FieldTable& /*arguments*/){
if(passive){
if(!broker.getExchanges().get(exchange)) {
@@ -116,27 +139,30 @@ void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::declare(const MethodCont
}
}
if(!nowait){
- connection.client->getExchange().declareOk(context);
+ client.declareOk(context.getRequestId());
}
}
-void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/,
- const string& exchange, bool /*ifUnused*/, bool nowait){
+void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/,
+ const string& exchange, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
broker.getExchanges().destroy(exchange);
- if(!nowait) connection.client->getExchange().deleteOk(context);
+ if(!nowait) client.deleteOk(context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
+void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
Queue::shared_ptr queue;
if (passive && !name.empty()) {
queue = connection.getQueue(name, channel.getId());
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
- broker.getQueues().declare(name, durable, autoDelete ? connection.settings.timeout : 0, exclusive ? &connection : 0);
+ broker.getQueues().declare(
+ name, durable,
+ autoDelete ? connection.getTimeout() : 0,
+ exclusive ? &connection : 0);
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
@@ -161,20 +187,22 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext
% queue->getName());
if (!nowait) {
string queueName = queue->getName();
- connection.client->getQueue().declareOk(context, queueName, queue->getMessageCount(), queue->getConsumerCount());
+ client.declareOk(
+ queueName, queue->getMessageCount(), queue->getConsumerCount(),
+ context.getRequestId());
}
}
-void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName,
- const string& exchangeName, const string& routingKey, bool nowait,
- const FieldTable& arguments){
+void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName,
+ const string& exchangeName, const string& routingKey, bool nowait,
+ const FieldTable& arguments){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
if(exchange){
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
exchange->bind(queue, exchangeRoutingKey, &arguments);
- if(!nowait) connection.client->getQueue().bindOk(context);
+ if(!nowait) client.bindOk(context.getRequestId());
}else{
throw ChannelException(
404, "Bind failed. No such exchange: " + exchangeName);
@@ -182,7 +210,7 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& c
}
void
-BrokerAdapter::BrokerAdapter::QueueHandlerImpl::unbind(
+BrokerAdapter::QueueHandlerImpl::unbind(
const MethodContext& context,
u_int16_t /*ticket*/,
const string& queueName,
@@ -198,18 +226,18 @@ BrokerAdapter::BrokerAdapter::QueueHandlerImpl::unbind(
exchange->unbind(queue, routingKey, &arguments);
- connection.client->getQueue().unbindOk(context);
+ client.unbindOk(context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){
+void BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
int count = queue->purge();
- if(!nowait) connection.client->getQueue().purgeOk(context, count);
+ if(!nowait) client.purgeOk( count, context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue,
- bool ifUnused, bool ifEmpty, bool nowait){
+void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue,
+ bool ifUnused, bool ifEmpty, bool nowait){
ChannelException error(0, "");
int count(0);
Queue::shared_ptr q = connection.getQueue(queue, channel.getId());
@@ -228,20 +256,21 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext
broker.getQueues().destroy(queue);
}
- if(!nowait) connection.client->getQueue().deleteOk(context, count);
+ if(!nowait)
+ client.deleteOk(count, context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
+void BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- connection.client->getBasic().qosOk(context);
+ client.qosOk(context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume(
+void BrokerAdapter::BasicHandlerImpl::consume(
const MethodContext& context, u_int16_t /*ticket*/,
const string& queueName, const string& consumerTag,
bool noLocal, bool noAck, bool exclusive,
@@ -257,19 +286,19 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume(
channel.consume(
newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
- if(!nowait) connection.client->getBasic().consumeOk(context, newTag);
+ if(!nowait) client.consumeOk(newTag, context.getRequestId());
//allow messages to be dispatched if required as there is now a consumer:
queue->dispatch();
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
+void BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
channel.cancel(consumerTag);
- if(!nowait) connection.client->getBasic().cancelOk(context, consumerTag);
+ if(!nowait) client.cancelOk(consumerTag, context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish(
+void BrokerAdapter::BasicHandlerImpl::publish(
const MethodContext& context, u_int16_t /*ticket*/,
const string& exchangeName, const string& routingKey,
bool mandatory, bool immediate)
@@ -287,16 +316,16 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish(
}
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){
+void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
if(!connection.getChannel(channel.getId()).get(queue, "", !noAck)){
string clusterId;//not used, part of an imatix hack
- connection.client->getBasic().getEmpty(context, clusterId);
+ client.getEmpty(clusterId, context.getRequestId());
}
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){
+void BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){
try{
channel.ack(deliveryTag, multiple);
}catch(InvalidAckException& e){
@@ -304,31 +333,31 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u
}
}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
+void BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
-void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){
+void BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){
channel.recover(requeue);
}
-void BrokerAdapter::BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){
+void BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){
channel.begin();
- connection.client->getTx().selectOk(context);
+ client.selectOk(context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){
+void BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){
channel.commit();
- connection.client->getTx().commitOk(context);
+ client.commitOk(context.getRequestId());
}
-void BrokerAdapter::BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){
+void BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){
channel.rollback();
- connection.client->getTx().rollbackOk(context);
+ client.rollbackOk(context.getRequestId());
channel.recover(false);
}
void
-BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
+BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
{
//no specific action required, generic response handling should be sufficient
}
@@ -338,21 +367,21 @@ BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
// Message class method handlers
//
void
-BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context)
+BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context)
{
- connection.client->getChannel().ok(context);
- connection.client->getChannel().pong(context);
+ client.ok(context.getRequestId());
+ client.pong();
}
void
-BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context)
+BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context)
{
- connection.client->getChannel().ok(context);
+ client.ok(context.getRequestId());
}
void
-BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::resume(
+BrokerAdapter::ChannelHandlerImpl::resume(
const MethodContext&,
const string& /*channel*/ )
{