summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionHandlerImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionHandlerImpl.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionHandlerImpl.cpp40
1 files changed, 31 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.cpp b/cpp/src/qpid/broker/SessionHandlerImpl.cpp
index 35f5b20854..a472cd27b0 100644
--- a/cpp/src/qpid/broker/SessionHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/SessionHandlerImpl.cpp
@@ -19,7 +19,6 @@
#include "qpid/broker/SessionHandlerImpl.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
-#include "qpid/broker/Router.h"
#include "qpid/broker/TopicExchange.h"
#include "assert.h"
@@ -40,11 +39,12 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context,
exchanges(_exchanges),
cleaner(_cleaner),
timeout(_timeout),
- connectionHandler(new ConnectionHandlerImpl(this)),
- channelHandler(new ChannelHandlerImpl(this)),
basicHandler(new BasicHandlerImpl(this)),
+ channelHandler(new ChannelHandlerImpl(this)),
+ connectionHandler(new ConnectionHandlerImpl(this)),
exchangeHandler(new ExchangeHandlerImpl(this)),
queueHandler(new QueueHandlerImpl(this)),
+ txHandler(new TxHandlerImpl(this)),
framemax(65536),
heartbeat(0) {}
@@ -146,11 +146,11 @@ void SessionHandlerImpl::closed(){
}
void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){
- getChannel(channel)->handleHeader(body, Router(*exchanges));
+ getChannel(channel)->handleHeader(body);
}
void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){
- getChannel(channel)->handleContent(body, Router(*exchanges));
+ getChannel(channel)->handleContent(body);
}
void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
@@ -261,7 +261,8 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t
if (passive && !name.empty()) {
queue = parent->getQueue(name, channel);
} else {
- std::pair<Queue::shared_ptr, bool> queue_created = parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0);
+ std::pair<Queue::shared_ptr, bool> queue_created =
+ parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, 0, exclusive ? parent : 0);
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
@@ -367,11 +368,16 @@ void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& con
}
void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/,
- string& exchange, string& routingKey,
+ string& exchangeName, string& routingKey,
bool mandatory, bool immediate){
- Message* msg = new Message(parent, exchange, routingKey, mandatory, immediate);
- parent->getChannel(channel)->handlePublish(msg);
+ Exchange* exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName);
+ if(exchange){
+ Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate);
+ parent->getChannel(channel)->handlePublish(msg, exchange);
+ }else{
+ throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
+ }
}
void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, string& queueName, bool noAck){
@@ -395,4 +401,20 @@ void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64
void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){
parent->getChannel(channel)->recover(requeue);
}
+
+void SessionHandlerImpl::TxHandlerImpl::select(u_int16_t channel){
+ parent->getChannel(channel)->begin();
+ parent->client.getTx().selectOk(channel);
+}
+
+void SessionHandlerImpl::TxHandlerImpl::commit(u_int16_t channel){
+ parent->getChannel(channel)->commit();
+ parent->client.getTx().commitOk(channel);
+}
+
+void SessionHandlerImpl::TxHandlerImpl::rollback(u_int16_t channel){
+ parent->getChannel(channel)->rollback();
+ parent->client.getTx().rollbackOk(channel);
+ parent->getChannel(channel)->recover(false);
+}