diff options
Diffstat (limited to 'cpp/lib/broker/BrokerAdapter.cpp')
-rw-r--r-- | cpp/lib/broker/BrokerAdapter.cpp | 167 |
1 files changed, 98 insertions, 69 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index 8b081874fc..ec80241c66 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -18,11 +18,10 @@ #include <boost/format.hpp> #include "BrokerAdapter.h" +#include "BrokerChannel.h" #include "Connection.h" -#include "Exception.h" #include "AMQMethodBody.h" #include "Exception.h" -#include "MessageHandlerImpl.h" namespace qpid { namespace broker { @@ -33,18 +32,37 @@ using namespace qpid::framing; typedef std::vector<Queue::shared_ptr> QueueVector; -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::startOk( - const MethodContext& context , const FieldTable& /*clientProperties*/, + +BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) : + CoreRefs(ch, c, b), + connection(c), + basicHandler(*this), + channelHandler(*this), + connectionHandler(*this), + exchangeHandler(*this), + messageHandler(*this), + queueHandler(*this), + txHandler(*this) +{} + + +ProtocolVersion BrokerAdapter::getVersion() const { + return connection.getVersion(); +} + +void BrokerAdapter::ConnectionHandlerImpl::startOk( + const MethodContext&, const FieldTable& /*clientProperties*/, const string& /*mechanism*/, - const string& /*response*/, const string& /*locale*/){ - connection.client->getConnection().tune( - context, 100, connection.getFrameMax(), connection.getHeartbeat()); + const string& /*response*/, const string& /*locale*/) +{ + client.tune( + 100, connection.getFrameMax(), connection.getHeartbeat()); } -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::secureOk( +void BrokerAdapter::ConnectionHandlerImpl::secureOk( const MethodContext&, const string& /*response*/){} -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::tuneOk( +void BrokerAdapter::ConnectionHandlerImpl::tuneOk( const MethodContext&, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat) { @@ -52,50 +70,55 @@ void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::tuneOk( connection.setHeartbeat(heartbeat); } -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ +void BrokerAdapter::ConnectionHandlerImpl::open( + const MethodContext& context, const string& /*virtualHost*/, + const string& /*capabilities*/, bool /*insist*/) +{ string knownhosts; - connection.client->getConnection().openOk(context, knownhosts); + client.openOk( + knownhosts, context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::close( +void BrokerAdapter::ConnectionHandlerImpl::close( const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/) { - connection.client->getConnection().closeOk(context); + client.closeOk(context.getRequestId()); connection.getOutput().close(); } -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){ +void BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){ connection.getOutput().close(); } -void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::open( +void BrokerAdapter::ChannelHandlerImpl::open( const MethodContext& context, const string& /*outOfBand*/){ channel.open(); // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9 - connection.client->getChannel().openOk(context, std::string()/* ID */); + client.openOk( + std::string()/* ID */, context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){} -void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} +void BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){} +void BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} -void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::close( +void BrokerAdapter::ChannelHandlerImpl::close( const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/) { - connection.client->getChannel().closeOk(context); + client.closeOk(context.getRequestId()); // FIXME aconway 2007-01-18: Following line will "delete this". Ugly. connection.closeChannel(channel.getId()); } -void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){} +void BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){} -void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type, - bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, - const FieldTable& /*arguments*/){ +void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type, + bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, + const FieldTable& /*arguments*/){ if(passive){ if(!broker.getExchanges().get(exchange)) { @@ -116,27 +139,30 @@ void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::declare(const MethodCont } } if(!nowait){ - connection.client->getExchange().declareOk(context); + client.declareOk(context.getRequestId()); } } -void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, - const string& exchange, bool /*ifUnused*/, bool nowait){ +void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, + const string& exchange, bool /*ifUnused*/, bool nowait){ //TODO: implement unused broker.getExchanges().destroy(exchange); - if(!nowait) connection.client->getExchange().deleteOk(context); + if(!nowait) client.deleteOk(context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name, - bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ +void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name, + bool passive, bool durable, bool exclusive, + bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ Queue::shared_ptr queue; if (passive && !name.empty()) { queue = connection.getQueue(name, channel.getId()); } else { std::pair<Queue::shared_ptr, bool> queue_created = - broker.getQueues().declare(name, durable, autoDelete ? connection.settings.timeout : 0, exclusive ? &connection : 0); + broker.getQueues().declare( + name, durable, + autoDelete ? connection.getTimeout() : 0, + exclusive ? &connection : 0); queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue @@ -161,20 +187,22 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext % queue->getName()); if (!nowait) { string queueName = queue->getName(); - connection.client->getQueue().declareOk(context, queueName, queue->getMessageCount(), queue->getConsumerCount()); + client.declareOk( + queueName, queue->getMessageCount(), queue->getConsumerCount(), + context.getRequestId()); } } -void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, - const string& exchangeName, const string& routingKey, bool nowait, - const FieldTable& arguments){ +void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, + const string& exchangeName, const string& routingKey, bool nowait, + const FieldTable& arguments){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); if(exchange){ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; exchange->bind(queue, exchangeRoutingKey, &arguments); - if(!nowait) connection.client->getQueue().bindOk(context); + if(!nowait) client.bindOk(context.getRequestId()); }else{ throw ChannelException( 404, "Bind failed. No such exchange: " + exchangeName); @@ -182,7 +210,7 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& c } void -BrokerAdapter::BrokerAdapter::QueueHandlerImpl::unbind( +BrokerAdapter::QueueHandlerImpl::unbind( const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, @@ -198,18 +226,18 @@ BrokerAdapter::BrokerAdapter::QueueHandlerImpl::unbind( exchange->unbind(queue, routingKey, &arguments); - connection.client->getQueue().unbindOk(context); + client.unbindOk(context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){ +void BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); int count = queue->purge(); - if(!nowait) connection.client->getQueue().purgeOk(context, count); + if(!nowait) client.purgeOk( count, context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue, - bool ifUnused, bool ifEmpty, bool nowait){ +void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue, + bool ifUnused, bool ifEmpty, bool nowait){ ChannelException error(0, ""); int count(0); Queue::shared_ptr q = connection.getQueue(queue, channel.getId()); @@ -228,20 +256,21 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext broker.getQueues().destroy(queue); } - if(!nowait) connection.client->getQueue().deleteOk(context, count); + if(!nowait) + client.deleteOk(count, context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ +void BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - connection.client->getBasic().qosOk(context); + client.qosOk(context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume( +void BrokerAdapter::BasicHandlerImpl::consume( const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, const string& consumerTag, bool noLocal, bool noAck, bool exclusive, @@ -257,19 +286,19 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume( channel.consume( newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields); - if(!nowait) connection.client->getBasic().consumeOk(context, newTag); + if(!nowait) client.consumeOk(newTag, context.getRequestId()); //allow messages to be dispatched if required as there is now a consumer: queue->dispatch(); } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){ +void BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){ channel.cancel(consumerTag); - if(!nowait) connection.client->getBasic().cancelOk(context, consumerTag); + if(!nowait) client.cancelOk(consumerTag, context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish( +void BrokerAdapter::BasicHandlerImpl::publish( const MethodContext& context, u_int16_t /*ticket*/, const string& exchangeName, const string& routingKey, bool mandatory, bool immediate) @@ -287,16 +316,16 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish( } } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){ +void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); if(!connection.getChannel(channel.getId()).get(queue, "", !noAck)){ string clusterId;//not used, part of an imatix hack - connection.client->getBasic().getEmpty(context, clusterId); + client.getEmpty(clusterId, context.getRequestId()); } } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){ +void BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){ try{ channel.ack(deliveryTag, multiple); }catch(InvalidAckException& e){ @@ -304,31 +333,31 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u } } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){} +void BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){} -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){ +void BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){ channel.recover(requeue); } -void BrokerAdapter::BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){ +void BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){ channel.begin(); - connection.client->getTx().selectOk(context); + client.selectOk(context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){ +void BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){ channel.commit(); - connection.client->getTx().commitOk(context); + client.commitOk(context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){ +void BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){ channel.rollback(); - connection.client->getTx().rollbackOk(context); + client.rollbackOk(context.getRequestId()); channel.recover(false); } void -BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& ) +BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& ) { //no specific action required, generic response handling should be sufficient } @@ -338,21 +367,21 @@ BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& ) // Message class method handlers // void -BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context) +BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context) { - connection.client->getChannel().ok(context); - connection.client->getChannel().pong(context); + client.ok(context.getRequestId()); + client.pong(); } void -BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context) +BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context) { - connection.client->getChannel().ok(context); + client.ok(context.getRequestId()); } void -BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::resume( +BrokerAdapter::ChannelHandlerImpl::resume( const MethodContext&, const string& /*channel*/ ) { |