summaryrefslogtreecommitdiff
path: root/cpp/broker/src/SessionHandlerImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/broker/src/SessionHandlerImpl.cpp')
-rw-r--r--cpp/broker/src/SessionHandlerImpl.cpp39
1 files changed, 18 insertions, 21 deletions
diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp
index 19e243a01b..a75b8fcf0f 100644
--- a/cpp/broker/src/SessionHandlerImpl.cpp
+++ b/cpp/broker/src/SessionHandlerImpl.cpp
@@ -40,6 +40,7 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context,
basicHandler(new BasicHandlerImpl(this)),
exchangeHandler(new ExchangeHandlerImpl(this)),
queueHandler(new QueueHandlerImpl(this)),
+ client(context),
framemax(65536),
heartbeat(0){
@@ -90,9 +91,9 @@ void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
}catch(ChannelException& e){
channels[channel]->close();
channels.erase(channel);
- context->send(new AMQFrame(channel, new ChannelCloseBody(e.code, e.text, method->amqpClassId(), method->amqpMethodId())));
+ client.getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
}catch(ConnectionException& e){
- context->send(new AMQFrame(0, new ConnectionCloseBody(e.code, e.text, method->amqpClassId(), method->amqpMethodId())));
+ client.getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
}
break;
@@ -116,7 +117,7 @@ void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){
FieldTable properties;
string mechanisms("PLAIN");
string locales("en_US");
- context->send(new AMQFrame(0, new ConnectionStartBody(8, 0, properties, mechanisms, locales)));
+ client.getConnection().start(0, 8, 0, properties, mechanisms, locales);
}
void SessionHandlerImpl::idleOut(){
@@ -156,7 +157,7 @@ void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr body){
void SessionHandlerImpl::ConnectionHandlerImpl::startOk(u_int16_t channel, FieldTable& clientProperties, string& mechanism,
string& response, string& locale){
- parent->context->send(new AMQFrame(0, new ConnectionTuneBody(100, parent->framemax, parent->heartbeat)));
+ parent->client.getConnection().tune(0, 100, parent->framemax, parent->heartbeat);
}
void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t channel, string& response){}
@@ -168,13 +169,13 @@ void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t channel, u_int1
void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t channel, string& virtualHost, string& capabilities, bool insist){
string knownhosts;
- parent->context->send(new AMQFrame(0, new ConnectionOpenOkBody(knownhosts)));
+ parent->client.getConnection().openOk(0, knownhosts);
}
void SessionHandlerImpl::ConnectionHandlerImpl::close(u_int16_t channel, u_int16_t replyCode, string& replyText,
u_int16_t classId, u_int16_t methodId){
- parent->context->send(new AMQFrame(0, new ConnectionCloseOkBody()));
+ parent->client.getConnection().closeOk(0);
parent->context->close();
}
@@ -186,7 +187,7 @@ void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t channel){
void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, string& outOfBand){
parent->channels[channel] = new Channel(parent->context, channel, parent->framemax);
- parent->context->send(new AMQFrame(channel, new ChannelOpenOkBody()));
+ parent->client.getChannel().openOk(channel);
}
void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t channel, bool active){}
@@ -198,7 +199,7 @@ void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t
parent->channels.erase(channel);
c->close();
delete c;
- parent->context->send(new AMQFrame(channel, new ChannelCloseOkBody()));
+ parent->client.getChannel().closeOk(channel);
}
void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t channel){}
@@ -230,7 +231,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16
}
parent->exchanges->getLock()->release();
if(!nowait){
- parent->context->send(new AMQFrame(channel, new ExchangeDeclareOkBody()));
+ parent->client.getExchange().declareOk(channel);
}
}
@@ -239,11 +240,8 @@ void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16
parent->exchanges->getLock()->acquire();
parent->exchanges->destroy(exchange);
parent->exchanges->getLock()->release();
- if(!nowait) parent->context->send(new AMQFrame(channel, new ExchangeDeleteOkBody()));
+ if(!nowait) parent->client.getExchange().deleteOk(channel);
}
-
-
-
void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t ticket, string& name,
bool passive, bool durable, bool exclusive,
@@ -271,8 +269,7 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t
}
if(!nowait){
name = queue->getName();
- QueueDeclareOkBody* response = new QueueDeclareOkBody(name, queue->getMessageCount(), queue->getConsumerCount());
- parent->context->send(new AMQFrame(channel, response));
+ parent->client.getQueue().declareOk(channel, name, queue->getMessageCount(), queue->getConsumerCount());
}
}
@@ -285,7 +282,7 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t tic
if(exchange){
if(routingKey.size() == 0 && queueName.size() == 0) routingKey = queue->getName();
exchange->bind(queue, routingKey, &arguments);
- if(!nowait) parent->context->send(new AMQFrame(channel, new QueueBindOkBody()));
+ if(!nowait) parent->client.getQueue().bindOk(channel);
}else{
throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName);
}
@@ -295,7 +292,7 @@ void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t ti
Queue::shared_ptr queue = parent->getQueue(queueName, channel);
int count = queue->purge();
- if(!nowait) parent->context->send(new AMQFrame(channel, new QueuePurgeOkBody(count)));
+ if(!nowait) parent->client.getQueue().purgeOk(channel, count);
}
void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t ticket, string& queue,
@@ -316,7 +313,7 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t
count = q->getMessageCount();
parent->queues->destroy(queue);
}
- if(!nowait) parent->context->send(new AMQFrame(channel, new QueueDeleteOkBody(count)));
+ if(!nowait) parent->client.getQueue().deleteOk(channel, count);
}
@@ -327,7 +324,7 @@ void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t pref
//TODO: channel doesn't do anything with these qos parameters yet
parent->channels[channel]->setPrefetchSize(prefetchSize);
parent->channels[channel]->setPrefetchCount(prefetchCount);
- parent->context->send(new AMQFrame(channel, new BasicQosOkBody()));
+ parent->client.getBasic().qosOk(channel);
}
void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t ticket,
@@ -344,7 +341,7 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_
try{
channel->consume(consumerTag, queue, !noAck, exclusive, noLocal ? parent : 0);
- if(!nowait) parent->context->send(new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag)));
+ if(!nowait) parent->client.getBasic().consumeOk(channelId, consumerTag);
//allow messages to be dispatched if required as there is now a consumer:
queue->dispatch();
@@ -357,7 +354,7 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_
void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& consumerTag, bool nowait){
parent->channels[channel]->cancel(consumerTag);
- if(!nowait) parent->context->send(new AMQFrame(channel, new BasicCancelOkBody(consumerTag)));
+ if(!nowait) parent->client.getBasic().cancelOk(channel, consumerTag);
}
void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t ticket,