summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/BrokerAdapter.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerAdapter.cpp48
1 files changed, 23 insertions, 25 deletions
diff --git a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
index c266b36dfb..0fb521d626 100644
--- a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -16,8 +16,6 @@
*
*/
#include "BrokerAdapter.h"
-#include "Session.h"
-#include "SessionHandler.h"
#include "Connection.h"
#include "DeliveryToken.h"
#include "MessageDelivery.h"
@@ -38,7 +36,7 @@ typedef std::vector<Queue::shared_ptr> QueueVector;
// by the handlers responsible for those classes.
//
-BrokerAdapter::BrokerAdapter(Session& s) :
+BrokerAdapter::BrokerAdapter(SemanticState& s) :
HandlerImpl(s),
basicHandler(s),
exchangeHandler(s),
@@ -153,7 +151,7 @@ BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/
QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name)
{
- Queue::shared_ptr queue = getSession().getQueue(name);
+ Queue::shared_ptr queue = state.getQueue(name);
Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
return QueueQueryResult(queue->getName(),
@@ -176,7 +174,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
}
Queue::shared_ptr queue;
if (passive && !name.empty()) {
- queue = getSession().getQueue(name);
+ queue = state.getQueue(name);
//TODO: check alternate-exchange is as expected
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
@@ -187,7 +185,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
- getSession().setDefaultQueue(queue);
+ state.setDefaultQueue(queue);
if (alternate) {
queue->setAlternateExchange(alternate);
alternate->incAlternateUsers();
@@ -216,7 +214,7 @@ void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& qu
const string& exchangeName, const string& routingKey,
const FieldTable& arguments){
- Queue::shared_ptr queue = getSession().getQueue(queueName);
+ Queue::shared_ptr queue = state.getQueue(queueName);
Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
if(exchange){
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
@@ -239,7 +237,7 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/,
const string& routingKey,
const qpid::framing::FieldTable& arguments )
{
- Queue::shared_ptr queue = getSession().getQueue(queueName);
+ Queue::shared_ptr queue = state.getQueue(queueName);
if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
@@ -252,12 +250,12 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/,
}
void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){
- getSession().getQueue(queue)->purge();
+ state.getQueue(queue)->purge();
}
void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty){
ChannelException error(0, "");
- Queue::shared_ptr q = getSession().getQueue(queue);
+ Queue::shared_ptr q = state.getQueue(queue);
if(ifEmpty && q->getMessageCount() > 0){
throw PreconditionFailedException("Queue not empty.");
}else if(ifUnused && q->getConsumerCount() > 0){
@@ -279,8 +277,8 @@ void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string&
void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){
//TODO: handle global
- getSession().setPrefetchSize(prefetchSize);
- getSession().setPrefetchCount(prefetchCount);
+ state.setPrefetchSize(prefetchSize);
+ state.setPrefetchCount(prefetchCount);
}
void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
@@ -289,8 +287,8 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
bool nowait, const FieldTable& fields)
{
- Queue::shared_ptr queue = getSession().getQueue(queueName);
- if(!consumerTag.empty() && getSession().exists(consumerTag)){
+ Queue::shared_ptr queue = state.getQueue(queueName);
+ if(!consumerTag.empty() && state.exists(consumerTag)){
throw ConnectionException(530, "Consumer tags must be unique");
}
string newTag = consumerTag;
@@ -298,7 +296,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
//also version specific behaviour now)
if (newTag.empty()) newTag = tagGenerator.generate();
DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag));
- getSession().consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields);
+ state.consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields);
if(!nowait)
getProxy().getBasic().consumeOk(newTag);
@@ -308,13 +306,13 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
}
void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){
- getSession().cancel(consumerTag);
+ state.cancel(consumerTag);
}
void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){
- Queue::shared_ptr queue = getSession().getQueue(queueName);
+ Queue::shared_ptr queue = state.getQueue(queueName);
DeliveryToken::shared_ptr token(MessageDelivery::getBasicGetToken(queue));
- if(!getSession().get(token, queue, !noAck)){
+ if(!state.get(token, queue, !noAck)){
string clusterId;//not used, part of an imatix hack
getProxy().getBasic().getEmpty(clusterId);
@@ -323,9 +321,9 @@ void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& que
void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){
if (multiple) {
- getSession().ackCumulative(deliveryTag);
+ state.ackCumulative(deliveryTag);
} else {
- getSession().ackRange(deliveryTag, deliveryTag);
+ state.ackRange(deliveryTag, deliveryTag);
}
}
@@ -333,23 +331,23 @@ void BrokerAdapter::BasicHandlerImpl::reject(uint64_t /*deliveryTag*/, bool /*re
void BrokerAdapter::BasicHandlerImpl::recover(bool requeue)
{
- getSession().recover(requeue);
+ state.recover(requeue);
}
void BrokerAdapter::TxHandlerImpl::select()
{
- getSession().startTx();
+ state.startTx();
}
void BrokerAdapter::TxHandlerImpl::commit()
{
- getSession().commit(&getBroker().getStore());
+ state.commit(&getBroker().getStore());
}
void BrokerAdapter::TxHandlerImpl::rollback()
{
- getSession().rollback();
- getSession().recover(false);
+ state.rollback();
+ state.recover(false);
}
}} // namespace qpid::broker