diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionHandlerImpl.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionHandlerImpl.cpp | 40 |
1 files changed, 31 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.cpp b/cpp/src/qpid/broker/SessionHandlerImpl.cpp index 35f5b20854..a472cd27b0 100644 --- a/cpp/src/qpid/broker/SessionHandlerImpl.cpp +++ b/cpp/src/qpid/broker/SessionHandlerImpl.cpp @@ -19,7 +19,6 @@ #include "qpid/broker/SessionHandlerImpl.h" #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/HeadersExchange.h" -#include "qpid/broker/Router.h" #include "qpid/broker/TopicExchange.h" #include "assert.h" @@ -40,11 +39,12 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, exchanges(_exchanges), cleaner(_cleaner), timeout(_timeout), - connectionHandler(new ConnectionHandlerImpl(this)), - channelHandler(new ChannelHandlerImpl(this)), basicHandler(new BasicHandlerImpl(this)), + channelHandler(new ChannelHandlerImpl(this)), + connectionHandler(new ConnectionHandlerImpl(this)), exchangeHandler(new ExchangeHandlerImpl(this)), queueHandler(new QueueHandlerImpl(this)), + txHandler(new TxHandlerImpl(this)), framemax(65536), heartbeat(0) {} @@ -146,11 +146,11 @@ void SessionHandlerImpl::closed(){ } void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){ - getChannel(channel)->handleHeader(body, Router(*exchanges)); + getChannel(channel)->handleHeader(body); } void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){ - getChannel(channel)->handleContent(body, Router(*exchanges)); + getChannel(channel)->handleContent(body); } void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ @@ -261,7 +261,8 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t if (passive && !name.empty()) { queue = parent->getQueue(name, channel); } else { - std::pair<Queue::shared_ptr, bool> queue_created = parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0); + std::pair<Queue::shared_ptr, bool> queue_created = + parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, 0, exclusive ? parent : 0); queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue @@ -367,11 +368,16 @@ void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& con } void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, - string& exchange, string& routingKey, + string& exchangeName, string& routingKey, bool mandatory, bool immediate){ - Message* msg = new Message(parent, exchange, routingKey, mandatory, immediate); - parent->getChannel(channel)->handlePublish(msg); + Exchange* exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName); + if(exchange){ + Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate); + parent->getChannel(channel)->handlePublish(msg, exchange); + }else{ + throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); + } } void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, string& queueName, bool noAck){ @@ -395,4 +401,20 @@ void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64 void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){ parent->getChannel(channel)->recover(requeue); } + +void SessionHandlerImpl::TxHandlerImpl::select(u_int16_t channel){ + parent->getChannel(channel)->begin(); + parent->client.getTx().selectOk(channel); +} + +void SessionHandlerImpl::TxHandlerImpl::commit(u_int16_t channel){ + parent->getChannel(channel)->commit(); + parent->client.getTx().commitOk(channel); +} + +void SessionHandlerImpl::TxHandlerImpl::rollback(u_int16_t channel){ + parent->getChannel(channel)->rollback(); + parent->client.getTx().rollbackOk(channel); + parent->getChannel(channel)->recover(false); +} |