diff options
author | Gordon Sim <gsim@apache.org> | 2006-09-22 09:53:47 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-09-22 09:53:47 +0000 |
commit | 488beb5a2a072afe2be0c7a1f679a5049f3b2e19 (patch) | |
tree | ec5d2f70029800a12d3fdd6a91cc0169010a85ac /cpp/broker/src/SessionHandlerImpl.cpp | |
parent | 6225325010374b21f589e4daba8ddd48564786fa (diff) | |
download | qpid-python-488beb5a2a072afe2be0c7a1f679a5049f3b2e19.tar.gz |
Added tests for basic_cancel and for handling of invalid channel ids.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@448881 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/broker/src/SessionHandlerImpl.cpp')
-rw-r--r-- | cpp/broker/src/SessionHandlerImpl.cpp | 36 |
1 files changed, 23 insertions, 13 deletions
diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp index 872e6f124a..c72ad45d7c 100644 --- a/cpp/broker/src/SessionHandlerImpl.cpp +++ b/cpp/broker/src/SessionHandlerImpl.cpp @@ -55,10 +55,18 @@ SessionHandlerImpl::~SessionHandlerImpl(){ delete queueHandler; } +Channel* SessionHandlerImpl::getChannel(u_int16_t channel){ + channel_iterator i = channels.find(channel); + if(i == channels.end()){ + throw ConnectionException(504, "Unknown channel: " + channel); + } + return i->second; +} + Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t channel){ Queue::shared_ptr queue; if (name.empty()) { - queue = channels[channel]->getDefaultQueue(); + queue = getChannel(channel)->getDefaultQueue(); if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" ); } else { queue = queues->find(name); @@ -143,11 +151,11 @@ void SessionHandlerImpl::closed(){ } void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){ - channels[channel]->handleHeader(body, exchanges); + getChannel(channel)->handleHeader(body, exchanges); } void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){ - channels[channel]->handleContent(body, exchanges); + getChannel(channel)->handleContent(body, exchanges); } void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr body){ @@ -195,11 +203,13 @@ void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t channel, bool acti void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t replyCode, string& replyText, u_int16_t classId, u_int16_t methodId){ - Channel* c = parent->channels[channel]; - parent->channels.erase(channel); - c->close(); - delete c; - parent->client.getChannel().closeOk(channel); + Channel* c = parent->getChannel(channel); + if(c){ + parent->channels.erase(channel); + c->close(); + delete c; + parent->client.getChannel().closeOk(channel); + } } void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t channel){} @@ -254,7 +264,7 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue - parent->channels[channel]->setDefaultQueue(queue); + parent->getChannel(channel)->setDefaultQueue(queue); //add default binding: parent->exchanges->getDefault()->bind(queue, name, 0); if(exclusive){ @@ -322,8 +332,8 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global){ //TODO: handle global //TODO: channel doesn't do anything with these qos parameters yet - parent->channels[channel]->setPrefetchSize(prefetchSize); - parent->channels[channel]->setPrefetchCount(prefetchCount); + parent->getChannel(channel)->setPrefetchSize(prefetchSize); + parent->getChannel(channel)->setPrefetchCount(prefetchCount); parent->client.getBasic().qosOk(channel); } @@ -353,7 +363,7 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_ } void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& consumerTag, bool nowait){ - parent->channels[channel]->cancel(consumerTag); + parent->getChannel(channel)->cancel(consumerTag); if(!nowait) parent->client.getBasic().cancelOk(channel, consumerTag); } @@ -362,7 +372,7 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t bool mandatory, bool immediate){ Message* msg = new Message(parent, exchange, routingKey, mandatory, immediate); - parent->channels[channel]->handlePublish(msg); + parent->getChannel(channel)->handlePublish(msg); } void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck){} |