diff options
Diffstat (limited to 'cpp/broker/src/SessionHandlerImpl.cpp')
-rw-r--r-- | cpp/broker/src/SessionHandlerImpl.cpp | 39 |
1 files changed, 18 insertions, 21 deletions
diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp index 19e243a01b..a75b8fcf0f 100644 --- a/cpp/broker/src/SessionHandlerImpl.cpp +++ b/cpp/broker/src/SessionHandlerImpl.cpp @@ -40,6 +40,7 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, basicHandler(new BasicHandlerImpl(this)), exchangeHandler(new ExchangeHandlerImpl(this)), queueHandler(new QueueHandlerImpl(this)), + client(context), framemax(65536), heartbeat(0){ @@ -90,9 +91,9 @@ void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){ }catch(ChannelException& e){ channels[channel]->close(); channels.erase(channel); - context->send(new AMQFrame(channel, new ChannelCloseBody(e.code, e.text, method->amqpClassId(), method->amqpMethodId()))); + client.getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); }catch(ConnectionException& e){ - context->send(new AMQFrame(0, new ConnectionCloseBody(e.code, e.text, method->amqpClassId(), method->amqpMethodId()))); + client.getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); } break; @@ -116,7 +117,7 @@ void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){ FieldTable properties; string mechanisms("PLAIN"); string locales("en_US"); - context->send(new AMQFrame(0, new ConnectionStartBody(8, 0, properties, mechanisms, locales))); + client.getConnection().start(0, 8, 0, properties, mechanisms, locales); } void SessionHandlerImpl::idleOut(){ @@ -156,7 +157,7 @@ void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr body){ void SessionHandlerImpl::ConnectionHandlerImpl::startOk(u_int16_t channel, FieldTable& clientProperties, string& mechanism, string& response, string& locale){ - parent->context->send(new AMQFrame(0, new ConnectionTuneBody(100, parent->framemax, parent->heartbeat))); + parent->client.getConnection().tune(0, 100, parent->framemax, parent->heartbeat); } void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t channel, string& response){} @@ -168,13 +169,13 @@ void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t channel, u_int1 void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t channel, string& virtualHost, string& capabilities, bool insist){ string knownhosts; - parent->context->send(new AMQFrame(0, new ConnectionOpenOkBody(knownhosts))); + parent->client.getConnection().openOk(0, knownhosts); } void SessionHandlerImpl::ConnectionHandlerImpl::close(u_int16_t channel, u_int16_t replyCode, string& replyText, u_int16_t classId, u_int16_t methodId){ - parent->context->send(new AMQFrame(0, new ConnectionCloseOkBody())); + parent->client.getConnection().closeOk(0); parent->context->close(); } @@ -186,7 +187,7 @@ void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t channel){ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, string& outOfBand){ parent->channels[channel] = new Channel(parent->context, channel, parent->framemax); - parent->context->send(new AMQFrame(channel, new ChannelOpenOkBody())); + parent->client.getChannel().openOk(channel); } void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t channel, bool active){} @@ -198,7 +199,7 @@ void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t parent->channels.erase(channel); c->close(); delete c; - parent->context->send(new AMQFrame(channel, new ChannelCloseOkBody())); + parent->client.getChannel().closeOk(channel); } void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t channel){} @@ -230,7 +231,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16 } parent->exchanges->getLock()->release(); if(!nowait){ - parent->context->send(new AMQFrame(channel, new ExchangeDeclareOkBody())); + parent->client.getExchange().declareOk(channel); } } @@ -239,11 +240,8 @@ void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16 parent->exchanges->getLock()->acquire(); parent->exchanges->destroy(exchange); parent->exchanges->getLock()->release(); - if(!nowait) parent->context->send(new AMQFrame(channel, new ExchangeDeleteOkBody())); + if(!nowait) parent->client.getExchange().deleteOk(channel); } - - - void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t ticket, string& name, bool passive, bool durable, bool exclusive, @@ -271,8 +269,7 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t } if(!nowait){ name = queue->getName(); - QueueDeclareOkBody* response = new QueueDeclareOkBody(name, queue->getMessageCount(), queue->getConsumerCount()); - parent->context->send(new AMQFrame(channel, response)); + parent->client.getQueue().declareOk(channel, name, queue->getMessageCount(), queue->getConsumerCount()); } } @@ -285,7 +282,7 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t tic if(exchange){ if(routingKey.size() == 0 && queueName.size() == 0) routingKey = queue->getName(); exchange->bind(queue, routingKey, &arguments); - if(!nowait) parent->context->send(new AMQFrame(channel, new QueueBindOkBody())); + if(!nowait) parent->client.getQueue().bindOk(channel); }else{ throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName); } @@ -295,7 +292,7 @@ void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t ti Queue::shared_ptr queue = parent->getQueue(queueName, channel); int count = queue->purge(); - if(!nowait) parent->context->send(new AMQFrame(channel, new QueuePurgeOkBody(count))); + if(!nowait) parent->client.getQueue().purgeOk(channel, count); } void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t ticket, string& queue, @@ -316,7 +313,7 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t count = q->getMessageCount(); parent->queues->destroy(queue); } - if(!nowait) parent->context->send(new AMQFrame(channel, new QueueDeleteOkBody(count))); + if(!nowait) parent->client.getQueue().deleteOk(channel, count); } @@ -327,7 +324,7 @@ void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t pref //TODO: channel doesn't do anything with these qos parameters yet parent->channels[channel]->setPrefetchSize(prefetchSize); parent->channels[channel]->setPrefetchCount(prefetchCount); - parent->context->send(new AMQFrame(channel, new BasicQosOkBody())); + parent->client.getBasic().qosOk(channel); } void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t ticket, @@ -344,7 +341,7 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_ try{ channel->consume(consumerTag, queue, !noAck, exclusive, noLocal ? parent : 0); - if(!nowait) parent->context->send(new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag))); + if(!nowait) parent->client.getBasic().consumeOk(channelId, consumerTag); //allow messages to be dispatched if required as there is now a consumer: queue->dispatch(); @@ -357,7 +354,7 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_ void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& consumerTag, bool nowait){ parent->channels[channel]->cancel(consumerTag); - if(!nowait) parent->context->send(new AMQFrame(channel, new BasicCancelOkBody(consumerTag))); + if(!nowait) parent->client.getBasic().cancelOk(channel, consumerTag); } void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t ticket, |