diff options
Diffstat (limited to 'cpp/broker/src')
-rw-r--r-- | cpp/broker/src/Channel.cpp | 19 | ||||
-rw-r--r-- | cpp/broker/src/SessionHandlerImpl.cpp | 36 |
2 files changed, 35 insertions, 20 deletions
diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp index 6980fe5a1b..f1f7d63a39 100644 --- a/cpp/broker/src/Channel.cpp +++ b/cpp/broker/src/Channel.cpp @@ -57,11 +57,14 @@ void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool excl } void Channel::cancel(string& tag){ - ConsumerImpl* c = consumers[tag]; - if(c){ - c->cancel(); - consumers.erase(tag); - delete c; + consumer_iterator i = consumers.find(tag); + if(i != consumers.end()){ + ConsumerImpl* c = i->second; + consumers.erase(i); + if(c){ + c->cancel(); + delete c; + } } } @@ -69,9 +72,11 @@ void Channel::close(){ //cancel all consumers for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){ ConsumerImpl* c = i->second; - c->cancel(); consumers.erase(i); - delete c; + if(c){ + c->cancel(); + delete c; + } } } diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp index 872e6f124a..c72ad45d7c 100644 --- a/cpp/broker/src/SessionHandlerImpl.cpp +++ b/cpp/broker/src/SessionHandlerImpl.cpp @@ -55,10 +55,18 @@ SessionHandlerImpl::~SessionHandlerImpl(){ delete queueHandler; } +Channel* SessionHandlerImpl::getChannel(u_int16_t channel){ + channel_iterator i = channels.find(channel); + if(i == channels.end()){ + throw ConnectionException(504, "Unknown channel: " + channel); + } + return i->second; +} + Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t channel){ Queue::shared_ptr queue; if (name.empty()) { - queue = channels[channel]->getDefaultQueue(); + queue = getChannel(channel)->getDefaultQueue(); if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" ); } else { queue = queues->find(name); @@ -143,11 +151,11 @@ void SessionHandlerImpl::closed(){ } void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){ - channels[channel]->handleHeader(body, exchanges); + getChannel(channel)->handleHeader(body, exchanges); } void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){ - channels[channel]->handleContent(body, exchanges); + getChannel(channel)->handleContent(body, exchanges); } void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr body){ @@ -195,11 +203,13 @@ void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t channel, bool acti void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t replyCode, string& replyText, u_int16_t classId, u_int16_t methodId){ - Channel* c = parent->channels[channel]; - parent->channels.erase(channel); - c->close(); - delete c; - parent->client.getChannel().closeOk(channel); + Channel* c = parent->getChannel(channel); + if(c){ + parent->channels.erase(channel); + c->close(); + delete c; + parent->client.getChannel().closeOk(channel); + } } void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t channel){} @@ -254,7 +264,7 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue - parent->channels[channel]->setDefaultQueue(queue); + parent->getChannel(channel)->setDefaultQueue(queue); //add default binding: parent->exchanges->getDefault()->bind(queue, name, 0); if(exclusive){ @@ -322,8 +332,8 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global){ //TODO: handle global //TODO: channel doesn't do anything with these qos parameters yet - parent->channels[channel]->setPrefetchSize(prefetchSize); - parent->channels[channel]->setPrefetchCount(prefetchCount); + parent->getChannel(channel)->setPrefetchSize(prefetchSize); + parent->getChannel(channel)->setPrefetchCount(prefetchCount); parent->client.getBasic().qosOk(channel); } @@ -353,7 +363,7 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_ } void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& consumerTag, bool nowait){ - parent->channels[channel]->cancel(consumerTag); + parent->getChannel(channel)->cancel(consumerTag); if(!nowait) parent->client.getBasic().cancelOk(channel, consumerTag); } @@ -362,7 +372,7 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t bool mandatory, bool immediate){ Message* msg = new Message(parent, exchange, routingKey, mandatory, immediate); - parent->channels[channel]->handlePublish(msg); + parent->getChannel(channel)->handlePublish(msg); } void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck){} |