diff options
-rw-r--r-- | cpp/broker/inc/SessionHandlerImpl.h | 3 | ||||
-rw-r--r-- | cpp/broker/src/Channel.cpp | 19 | ||||
-rw-r--r-- | cpp/broker/src/SessionHandlerImpl.cpp | 36 | ||||
-rw-r--r-- | python/tests/basic.py | 24 | ||||
-rw-r--r-- | python/tests/broker.py | 18 |
5 files changed, 79 insertions, 21 deletions
diff --git a/cpp/broker/inc/SessionHandlerImpl.h b/cpp/broker/inc/SessionHandlerImpl.h index 167bf0cc23..549f51f5a1 100644 --- a/cpp/broker/inc/SessionHandlerImpl.h +++ b/cpp/broker/inc/SessionHandlerImpl.h @@ -87,6 +87,7 @@ class SessionHandlerImpl : public virtual qpid::io::SessionHandler, void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body); void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); + Channel* getChannel(u_int16_t channel); /** * Get named queue, never returns 0. * @return: named queue or default queue for channel if name="" @@ -96,7 +97,7 @@ class SessionHandlerImpl : public virtual qpid::io::SessionHandler, Queue::shared_ptr getQueue(const string& name, u_int16_t channel); Exchange* findExchange(const string& name); - + public: SessionHandlerImpl(qpid::io::SessionContext* context, QueueRegistry* queues, ExchangeRegistry* exchanges, AutoDelete* cleaner, const u_int32_t timeout); diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp index 6980fe5a1b..f1f7d63a39 100644 --- a/cpp/broker/src/Channel.cpp +++ b/cpp/broker/src/Channel.cpp @@ -57,11 +57,14 @@ void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool excl } void Channel::cancel(string& tag){ - ConsumerImpl* c = consumers[tag]; - if(c){ - c->cancel(); - consumers.erase(tag); - delete c; + consumer_iterator i = consumers.find(tag); + if(i != consumers.end()){ + ConsumerImpl* c = i->second; + consumers.erase(i); + if(c){ + c->cancel(); + delete c; + } } } @@ -69,9 +72,11 @@ void Channel::close(){ //cancel all consumers for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){ ConsumerImpl* c = i->second; - c->cancel(); consumers.erase(i); - delete c; + if(c){ + c->cancel(); + delete c; + } } } diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp index 872e6f124a..c72ad45d7c 100644 --- a/cpp/broker/src/SessionHandlerImpl.cpp +++ b/cpp/broker/src/SessionHandlerImpl.cpp @@ -55,10 +55,18 @@ SessionHandlerImpl::~SessionHandlerImpl(){ delete queueHandler; } +Channel* SessionHandlerImpl::getChannel(u_int16_t channel){ + channel_iterator i = channels.find(channel); + if(i == channels.end()){ + throw ConnectionException(504, "Unknown channel: " + channel); + } + return i->second; +} + Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t channel){ Queue::shared_ptr queue; if (name.empty()) { - queue = channels[channel]->getDefaultQueue(); + queue = getChannel(channel)->getDefaultQueue(); if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" ); } else { queue = queues->find(name); @@ -143,11 +151,11 @@ void SessionHandlerImpl::closed(){ } void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){ - channels[channel]->handleHeader(body, exchanges); + getChannel(channel)->handleHeader(body, exchanges); } void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){ - channels[channel]->handleContent(body, exchanges); + getChannel(channel)->handleContent(body, exchanges); } void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr body){ @@ -195,11 +203,13 @@ void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t channel, bool acti void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t replyCode, string& replyText, u_int16_t classId, u_int16_t methodId){ - Channel* c = parent->channels[channel]; - parent->channels.erase(channel); - c->close(); - delete c; - parent->client.getChannel().closeOk(channel); + Channel* c = parent->getChannel(channel); + if(c){ + parent->channels.erase(channel); + c->close(); + delete c; + parent->client.getChannel().closeOk(channel); + } } void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t channel){} @@ -254,7 +264,7 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue - parent->channels[channel]->setDefaultQueue(queue); + parent->getChannel(channel)->setDefaultQueue(queue); //add default binding: parent->exchanges->getDefault()->bind(queue, name, 0); if(exclusive){ @@ -322,8 +332,8 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global){ //TODO: handle global //TODO: channel doesn't do anything with these qos parameters yet - parent->channels[channel]->setPrefetchSize(prefetchSize); - parent->channels[channel]->setPrefetchCount(prefetchCount); + parent->getChannel(channel)->setPrefetchSize(prefetchSize); + parent->getChannel(channel)->setPrefetchCount(prefetchCount); parent->client.getBasic().qosOk(channel); } @@ -353,7 +363,7 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_ } void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& consumerTag, bool nowait){ - parent->channels[channel]->cancel(consumerTag); + parent->getChannel(channel)->cancel(consumerTag); if(!nowait) parent->client.getBasic().cancelOk(channel, consumerTag); } @@ -362,7 +372,7 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t bool mandatory, bool immediate){ Message* msg = new Message(parent, exchange, routingKey, mandatory, immediate); - parent->channels[channel]->handlePublish(msg); + parent->getChannel(channel)->handlePublish(msg); } void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck){} diff --git a/python/tests/basic.py b/python/tests/basic.py index c3a2414eac..75d49ec805 100644 --- a/python/tests/basic.py +++ b/python/tests/basic.py @@ -113,3 +113,27 @@ class BasicTests(TestBase): except Closed, e: self.assertConnectionException(530, e.args[0]) + def test_basic_cancel(self): + """ + Test compliance of the basic.cancel method + """ + channel = self.channel + #setup, declare a queue: + channel.queue_declare(queue="test-queue-4", exclusive=True) + channel.basic_consume(consumer_tag="my-consumer", queue="test-queue-4") + channel.basic_publish(routing_key="test-queue-4", content=Content("One")) + + #cancel should stop messages being delivered + channel.basic_cancel(consumer_tag="my-consumer") + channel.basic_publish(routing_key="test-queue-4", content=Content("Two")) + myqueue = self.client.queue("my-consumer") + msg = myqueue.get(timeout=1) + self.assertEqual("One", msg.content.body) + try: + msg = myqueue.get(timeout=1) + self.fail("Got message after cancellation: " + msg) + except Empty: None + + #cancellation of non-existant consumers should be handled without error + channel.basic_cancel(consumer_tag="my-consumer") + channel.basic_cancel(consumer_tag="this-never-existed") diff --git a/python/tests/broker.py b/python/tests/broker.py index 1345076604..307d447a6c 100644 --- a/python/tests/broker.py +++ b/python/tests/broker.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from qpid.client import Closed from qpid.queue import Empty from qpid.content import Content from qpid.testlib import testrunner, TestBase @@ -82,3 +83,20 @@ class BrokerTests(TestBase): msg = queue.get(timeout=5) self.assert_(msg.content.body == body) + def test_invalid_channel(self): + other = self.connect() + channel = other.channel(200) + try: + channel.queue_declare(exclusive=True) + self.fail("Expected error on queue_declare for invalid channel") + except Closed, e: + self.assertConnectionException(504, e.args[0]) + + channel = self.client.channel(200) + channel.channel_open() + channel.channel_close() + try: + channel.queue_declare(exclusive=True) + self.fail("Expected error on queue_declare for closed channel") + except Closed, e: + self.assertConnectionException(504, e.args[0]) |