diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/BrokerAdapter.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/BrokerAdapter.cpp | 48 |
1 files changed, 23 insertions, 25 deletions
diff --git a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp index c266b36dfb..0fb521d626 100644 --- a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -16,8 +16,6 @@ * */ #include "BrokerAdapter.h" -#include "Session.h" -#include "SessionHandler.h" #include "Connection.h" #include "DeliveryToken.h" #include "MessageDelivery.h" @@ -38,7 +36,7 @@ typedef std::vector<Queue::shared_ptr> QueueVector; // by the handlers responsible for those classes. // -BrokerAdapter::BrokerAdapter(Session& s) : +BrokerAdapter::BrokerAdapter(SemanticState& s) : HandlerImpl(s), basicHandler(s), exchangeHandler(s), @@ -153,7 +151,7 @@ BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/ QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name) { - Queue::shared_ptr queue = getSession().getQueue(name); + Queue::shared_ptr queue = state.getQueue(name); Exchange::shared_ptr alternateExchange = queue->getAlternateExchange(); return QueueQueryResult(queue->getName(), @@ -176,7 +174,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& } Queue::shared_ptr queue; if (passive && !name.empty()) { - queue = getSession().getQueue(name); + queue = state.getQueue(name); //TODO: check alternate-exchange is as expected } else { std::pair<Queue::shared_ptr, bool> queue_created = @@ -187,7 +185,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue - getSession().setDefaultQueue(queue); + state.setDefaultQueue(queue); if (alternate) { queue->setAlternateExchange(alternate); alternate->incAlternateUsers(); @@ -216,7 +214,7 @@ void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& qu const string& exchangeName, const string& routingKey, const FieldTable& arguments){ - Queue::shared_ptr queue = getSession().getQueue(queueName); + Queue::shared_ptr queue = state.getQueue(queueName); Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); if(exchange){ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; @@ -239,7 +237,7 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/, const string& routingKey, const qpid::framing::FieldTable& arguments ) { - Queue::shared_ptr queue = getSession().getQueue(queueName); + Queue::shared_ptr queue = state.getQueue(queueName); if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); @@ -252,12 +250,12 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/, } void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){ - getSession().getQueue(queue)->purge(); + state.getQueue(queue)->purge(); } void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty){ ChannelException error(0, ""); - Queue::shared_ptr q = getSession().getQueue(queue); + Queue::shared_ptr q = state.getQueue(queue); if(ifEmpty && q->getMessageCount() > 0){ throw PreconditionFailedException("Queue not empty."); }else if(ifUnused && q->getConsumerCount() > 0){ @@ -279,8 +277,8 @@ void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){ //TODO: handle global - getSession().setPrefetchSize(prefetchSize); - getSession().setPrefetchCount(prefetchCount); + state.setPrefetchSize(prefetchSize); + state.setPrefetchCount(prefetchCount); } void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, @@ -289,8 +287,8 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, bool nowait, const FieldTable& fields) { - Queue::shared_ptr queue = getSession().getQueue(queueName); - if(!consumerTag.empty() && getSession().exists(consumerTag)){ + Queue::shared_ptr queue = state.getQueue(queueName); + if(!consumerTag.empty() && state.exists(consumerTag)){ throw ConnectionException(530, "Consumer tags must be unique"); } string newTag = consumerTag; @@ -298,7 +296,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, //also version specific behaviour now) if (newTag.empty()) newTag = tagGenerator.generate(); DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag)); - getSession().consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields); + state.consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields); if(!nowait) getProxy().getBasic().consumeOk(newTag); @@ -308,13 +306,13 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, } void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){ - getSession().cancel(consumerTag); + state.cancel(consumerTag); } void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){ - Queue::shared_ptr queue = getSession().getQueue(queueName); + Queue::shared_ptr queue = state.getQueue(queueName); DeliveryToken::shared_ptr token(MessageDelivery::getBasicGetToken(queue)); - if(!getSession().get(token, queue, !noAck)){ + if(!state.get(token, queue, !noAck)){ string clusterId;//not used, part of an imatix hack getProxy().getBasic().getEmpty(clusterId); @@ -323,9 +321,9 @@ void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& que void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){ if (multiple) { - getSession().ackCumulative(deliveryTag); + state.ackCumulative(deliveryTag); } else { - getSession().ackRange(deliveryTag, deliveryTag); + state.ackRange(deliveryTag, deliveryTag); } } @@ -333,23 +331,23 @@ void BrokerAdapter::BasicHandlerImpl::reject(uint64_t /*deliveryTag*/, bool /*re void BrokerAdapter::BasicHandlerImpl::recover(bool requeue) { - getSession().recover(requeue); + state.recover(requeue); } void BrokerAdapter::TxHandlerImpl::select() { - getSession().startTx(); + state.startTx(); } void BrokerAdapter::TxHandlerImpl::commit() { - getSession().commit(&getBroker().getStore()); + state.commit(&getBroker().getStore()); } void BrokerAdapter::TxHandlerImpl::rollback() { - getSession().rollback(); - getSession().recover(false); + state.rollback(); + state.recover(false); } }} // namespace qpid::broker |