diff options
author | Gordon Sim <gsim@apache.org> | 2006-10-30 19:27:54 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-10-30 19:27:54 +0000 |
commit | b0a120b4edfdb49a08bd7c8c2479e7b1cadc5233 (patch) | |
tree | d2b4ca0e774100285e116e5442bff9e55b4a3f92 /cpp/src/qpid/broker/SessionHandlerImpl.cpp | |
parent | f491af49008a2ed219ad4507cd507b4317afa4cb (diff) | |
download | qpid-python-b0a120b4edfdb49a08bd7c8c2479e7b1cadc5233.tar.gz |
Initial implementation for tx class.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@469242 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SessionHandlerImpl.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionHandlerImpl.cpp | 40 |
1 files changed, 31 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.cpp b/cpp/src/qpid/broker/SessionHandlerImpl.cpp index 35f5b20854..a472cd27b0 100644 --- a/cpp/src/qpid/broker/SessionHandlerImpl.cpp +++ b/cpp/src/qpid/broker/SessionHandlerImpl.cpp @@ -19,7 +19,6 @@ #include "qpid/broker/SessionHandlerImpl.h" #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/HeadersExchange.h" -#include "qpid/broker/Router.h" #include "qpid/broker/TopicExchange.h" #include "assert.h" @@ -40,11 +39,12 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, exchanges(_exchanges), cleaner(_cleaner), timeout(_timeout), - connectionHandler(new ConnectionHandlerImpl(this)), - channelHandler(new ChannelHandlerImpl(this)), basicHandler(new BasicHandlerImpl(this)), + channelHandler(new ChannelHandlerImpl(this)), + connectionHandler(new ConnectionHandlerImpl(this)), exchangeHandler(new ExchangeHandlerImpl(this)), queueHandler(new QueueHandlerImpl(this)), + txHandler(new TxHandlerImpl(this)), framemax(65536), heartbeat(0) {} @@ -146,11 +146,11 @@ void SessionHandlerImpl::closed(){ } void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){ - getChannel(channel)->handleHeader(body, Router(*exchanges)); + getChannel(channel)->handleHeader(body); } void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){ - getChannel(channel)->handleContent(body, Router(*exchanges)); + getChannel(channel)->handleContent(body); } void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ @@ -261,7 +261,8 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t if (passive && !name.empty()) { queue = parent->getQueue(name, channel); } else { - std::pair<Queue::shared_ptr, bool> queue_created = parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0); + std::pair<Queue::shared_ptr, bool> queue_created = + parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, 0, exclusive ? parent : 0); queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue @@ -367,11 +368,16 @@ void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& con } void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, - string& exchange, string& routingKey, + string& exchangeName, string& routingKey, bool mandatory, bool immediate){ - Message* msg = new Message(parent, exchange, routingKey, mandatory, immediate); - parent->getChannel(channel)->handlePublish(msg); + Exchange* exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName); + if(exchange){ + Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate); + parent->getChannel(channel)->handlePublish(msg, exchange); + }else{ + throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); + } } void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, string& queueName, bool noAck){ @@ -395,4 +401,20 @@ void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64 void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){ parent->getChannel(channel)->recover(requeue); } + +void SessionHandlerImpl::TxHandlerImpl::select(u_int16_t channel){ + parent->getChannel(channel)->begin(); + parent->client.getTx().selectOk(channel); +} + +void SessionHandlerImpl::TxHandlerImpl::commit(u_int16_t channel){ + parent->getChannel(channel)->commit(); + parent->client.getTx().commitOk(channel); +} + +void SessionHandlerImpl::TxHandlerImpl::rollback(u_int16_t channel){ + parent->getChannel(channel)->rollback(); + parent->client.getTx().rollbackOk(channel); + parent->getChannel(channel)->recover(false); +} |