diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-10 08:52:41 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-10 08:52:41 +0000 |
commit | a873c620e65e3060a5d29acce49f77b8213b4a06 (patch) | |
tree | 7dcb16cfe40dfbaa1587fad6e83a30fd1653851d | |
parent | f0675254ce88655bd97c7e8e4c214754b5dcd674 (diff) | |
download | qpid-python-a873c620e65e3060a5d29acce49f77b8213b4a06.tar.gz |
QPID-275 : (Patch supplied by Rob Godfrey) Fixes to allow broker to pass more of the Python tests
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@494769 13f79535-47bb-0310-9956-ffa450edef68
12 files changed, 185 insertions, 87 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index 0cb1d8bee8..cee6ef7c98 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -71,49 +71,78 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic if (queue == null) { _log.info("No queue for '" + body.queue + "'"); - } - try - { - AMQShortString consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, - body.arguments, body.noLocal); - if (!body.nowait) + if(body.queue!=null) { + AMQShortString msg = new AMQShortString("No such queue, '" + body.queue + "'"); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, + session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, (byte)8, (byte)0, // AMQP version (major, minor) - consumerTag)); // consumerTag + BasicConsumeBody.getClazz((byte)8, (byte)0), // classId + BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId + AMQConstant.NOT_FOUND.getCode(), // replyCode + msg)); // replyText + } + else + { + AMQShortString msg = new AMQShortString("No queue name provided, no default queue defined."); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + BasicConsumeBody.getClazz((byte)8, (byte)0), // classId + BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId + AMQConstant.NOT_ALLOWED.getCode(), // replyCode + msg)); // replyText } - - //now allow queue to start async processing of any backlog of messages - queue.deliverAsync(); - } - catch (AMQInvalidSelectorException ise) - { - _log.info("Closing connection due to invalid selector"); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - BasicConsumeBody.getClazz((byte)8, (byte)0), // classId - BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId - AMQConstant.INVALID_SELECTOR.getCode(), // replyCode - new AMQShortString(ise.getMessage()))); // replyText } - catch (ConsumerTagNotUniqueException e) + else { - AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.consumerTag + "'"); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - BasicConsumeBody.getClazz((byte)8, (byte)0), // classId - BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId - AMQConstant.NOT_ALLOWED.getCode(), // replyCode - msg)); // replyText + try + { + AMQShortString consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, + body.arguments, body.noLocal); + if (!body.nowait) + { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + consumerTag)); // consumerTag + } + + //now allow queue to start async processing of any backlog of messages + queue.deliverAsync(); + } + catch (AMQInvalidSelectorException ise) + { + _log.info("Closing connection due to invalid selector"); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + BasicConsumeBody.getClazz((byte)8, (byte)0), // classId + BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId + AMQConstant.INVALID_SELECTOR.getCode(), // replyCode + new AMQShortString(ise.getMessage()))); // replyText + } + catch (ConsumerTagNotUniqueException e) + { + AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.consumerTag + "'"); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + BasicConsumeBody.getClazz((byte)8, (byte)0), // classId + BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId + AMQConstant.NOT_ALLOWED.getCode(), // replyCode + msg)); // replyText + } } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index 7937a9bb2d..4053364778 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -22,6 +22,10 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQChannelException; +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.AMQUnknownExchangeType; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.ExchangeDeclareOkBody; @@ -66,12 +70,35 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange { Exchange exchange = exchangeRegistry.getExchange(body.exchange); + + if (exchange == null) { - exchange = exchangeFactory.createExchange(body.exchange, body.type, body.durable, - body.passive, body.ticket); - exchangeRegistry.registerExchange(exchange); + if(body.passive && ((body.type == null) || body.type.length() ==0)) + { + throw new AMQChannelException(AMQConstant.NOT_FOUND.getCode(), "Unknown exchange: " + body.exchange,body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor()); + } + else + { + try + { + + exchange = exchangeFactory.createExchange(body.exchange, body.type, body.durable, + body.passive, body.ticket); + exchangeRegistry.registerExchange(exchange); + } + catch(AMQUnknownExchangeType e) + { + throw new AMQConnectionException(AMQConstant.COMMAND_INVALID.getCode(), "Unknown exchange: " + body.exchange,body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),e); + } + } } + else if (!exchange.getType().equals(body.type)) + { + + throw new AMQConnectionException(AMQConstant.NOT_ALLOWED.getCode(), "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor()); + } + } if(!body.nowait) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 6ff7700a13..7a73fbb18a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -22,12 +22,11 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQChannelException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.configuration.Configured; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.QueueDeclareBody; -import org.apache.qpid.framing.QueueDeclareOkBody; -import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.*; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.AMQMethodEvent; @@ -83,20 +82,34 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar synchronized (queueRegistry) { AMQQueue queue; - if ((queue = queueRegistry.getQueue(body.queue)) == null) + if (((queue = queueRegistry.getQueue(body.queue)) == null) ) { - queue = createQueue(body, queueRegistry, protocolSession); - if (queue.isDurable() && !queue.isAutoDelete()) + if(body.passive) { - _store.createQueue(queue); + String msg = "Queue: " + body.queue + " not found."; + throw new AMQChannelException(AMQConstant.NOT_FOUND.getCode(), + msg, + body.getClazz(), + body.getMethod(), + (byte)8, + (byte)0 ); + } - queueRegistry.registerQueue(queue); - if (autoRegister) + else { - Exchange defaultExchange = exchangeRegistry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME); - defaultExchange.registerQueue(body.queue, queue, null); - queue.bind(body.queue, defaultExchange); - _log.info("Queue " + body.queue + " bound to default exchange"); + queue = createQueue(body, queueRegistry, protocolSession); + if (queue.isDurable() && !queue.isAutoDelete()) + { + _store.createQueue(queue); + } + queueRegistry.registerQueue(queue); + if (autoRegister) + { + Exchange defaultExchange = exchangeRegistry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME); + defaultExchange.registerQueue(body.queue, queue, null); + queue.bind(body.queue, defaultExchange); + _log.info("Queue " + body.queue + " bound to default exchange"); + } } } //set this as the default queue on the channel: diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index 5437561095..bbe1464bdc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -31,7 +31,10 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.framing.QueueDeleteBody; import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody> { @@ -79,14 +82,30 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete } else { - int purged = queue.delete(body.ifUnused, body.ifEmpty); - _store.removeQueue(queue.getName().toString()); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(), - (byte)8, (byte)0, // AMQP version (major, minor) - purged)); // messageCount + if(body.ifEmpty && !queue.isEmpty()) + { + AMQShortString msg = new AMQShortString("Queue: " + body.queue + " is not empty."); + // TODO - Error code + session.writeFrame(ChannelCloseBody.createAMQFrame(evt.getChannelId(),(byte)8, (byte)0, body.getClazz(), body.getMethod(), 406, msg )); + } + else if(body.ifUnused && !queue.isUnused()) + { + AMQShortString msg = new AMQShortString("Queue: " + body.queue + " is still used."); + // TODO - Error code + session.writeFrame(ChannelCloseBody.createAMQFrame(evt.getChannelId(),(byte)8, (byte)0, body.getClazz(), body.getMethod(), 406, msg )); + + } + else + { + int purged = queue.delete(body.ifUnused, body.ifEmpty); + _store.removeQueue(queue.getName().toString()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + purged)); // messageCount + } } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index f86d8afe02..596631b3a8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -26,6 +26,7 @@ import org.apache.mina.common.IoSession; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.framing.*; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.codec.AMQDecoder; @@ -107,7 +108,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _stateManager = stateManager; _minaProtocolSession = session; session.setAttachment(this); - _frameListeners.add(_stateManager); + _queueRegistry = queueRegistry; _exchangeRegistry = exchangeRegistry; _codecFactory = codecFactory; @@ -206,11 +207,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, (AMQMethodBody) frame.bodyFrame); try { - boolean wasAnyoneInterested = false; - for (AMQMethodListener listener : _frameListeners) + boolean wasAnyoneInterested = _stateManager.methodReceived(evt, this, _queueRegistry, _exchangeRegistry); + + if(!_frameListeners.isEmpty()) { - wasAnyoneInterested = listener.methodReceived(evt, this, _queueRegistry, _exchangeRegistry) || - wasAnyoneInterested; + for (AMQMethodListener listener : _frameListeners) + { + wasAnyoneInterested = listener.methodReceived(evt, this, _queueRegistry, _exchangeRegistry) || + wasAnyoneInterested; + } } if (!wasAnyoneInterested) { @@ -222,8 +227,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _logger.error("Closing channel due to: " + e.getMessage()); writeFrame(e.getCloseFrame(frame.channel)); } + catch (AMQConnectionException e) + { + _logger.error("Closing connection due to: " + e.getMessage()); + writeFrame(e.getCloseFrame(frame.channel)); + } catch (AMQException e) { + _stateManager.error(e); for (AMQMethodListener listener : _frameListeners) { listener.error(e); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index ea09654988..368cb979e8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -410,6 +410,17 @@ public class AMQQueue implements Managable, Comparable } } + public boolean isUnused() + { + return _subscribers.isEmpty(); + } + + public boolean isEmpty() + { + return !_deliveryMgr.hasQueuedMessages(); + } + + public int delete(boolean checkUnused, boolean checkEmpty) throws AMQException { if (checkUnused && !_subscribers.isEmpty()) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java index 2f742952c9..656549e025 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java @@ -63,7 +63,7 @@ class ExchangeBindings public int hashCode() { - return exchange.hashCode() + routingKey.hashCode(); + return (exchange == null ? 0 : exchange.hashCode()) + (routingKey == null ? 0 : routingKey.hashCode()); } public boolean equals(Object o) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 0afe17c6ca..e2356faaf5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -301,8 +301,10 @@ public class SubscriptionImpl implements Subscription if (_noLocal) { // We don't want local messages so check to see if message is one we sent - if (protocolSession.getClientProperties().getObject(ClientProperties.instance.toString()).equals( - msg.getPublisher().getClientProperties().getObject(ClientProperties.instance.toString()))) + Object localInstance = protocolSession.getClientProperties().getObject(ClientProperties.instance.toString()); + Object msgInstance = msg.getPublisher().getClientProperties().getObject(ClientProperties.instance.toString()); + + if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) { if (_logger.isTraceEnabled()) { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index a1763ddc73..234b4c8a67 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -55,7 +55,7 @@ public class RecoverTest extends TestCase Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_NAME); + ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); @@ -112,7 +112,7 @@ public class RecoverTest extends TestCase Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_NAME); + ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java index 0b9bf56875..523a24f278 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java @@ -94,7 +94,7 @@ public final class AMQConstant public static final AMQConstant RESOURCE_ERROR = new AMQConstant(506, "resource error", true); - public static final AMQConstant NOT_ALLOWED = new AMQConstant(507, "not allowed", true); + public static final AMQConstant NOT_ALLOWED = new AMQConstant(530, "not allowed", true); public static final AMQConstant NOT_IMPLEMENTED = new AMQConstant(540, "not implemented", true); diff --git a/python/java_failing.txt b/python/java_failing.txt index a752bc53c0..a564fe0148 100644 --- a/python/java_failing.txt +++ b/python/java_failing.txt @@ -1,29 +1,15 @@ -tests.basic.BasicTests.test_cancel tests.basic.BasicTests.test_consume_exclusive tests.basic.BasicTests.test_consume_no_local -tests.basic.BasicTests.test_consume_queue_errors -tests.basic.BasicTests.test_consume_unique_consumers tests.basic.BasicTests.test_get tests.basic.BasicTests.test_qos_prefetch_size tests.basic.BasicTests.test_recover_requeue -tests.exchange.ExchangeTests tests.exchange.DefaultExchangeRuleTests.testDefaultExchange tests.exchange.HeadersExchangeTests.testMatchAll tests.exchange.HeadersExchangeTests.testMatchAny -tests.exchange.RecommendedTypesRuleTests.testDirect -tests.exchange.RecommendedTypesRuleTests.testFanout -tests.exchange.RecommendedTypesRuleTests.testHeaders tests.exchange.RecommendedTypesRuleTests.testTopic -tests.exchange.RequiredInstancesRuleTests.testAmqDirect -tests.exchange.RequiredInstancesRuleTests.testAmqFanOut tests.exchange.RequiredInstancesRuleTests.testAmqMatch tests.exchange.RequiredInstancesRuleTests.testAmqTopic tests.queue.QueueTests.test_declare_exclusive -tests.queue.QueueTests.test_declare_passive -tests.queue.QueueTests.test_delete_ifempty -tests.queue.QueueTests.test_delete_ifunused -tests.queue.QueueTests.test_delete_simple tests.queue.QueueTests.test_purge -tests.queue.QueueTests.test_bind tests.testlib.TestBaseTest.testMessageProperties tests.broker.BrokerTests.test_invalid_channel diff --git a/python/tests/exchange.py b/python/tests/exchange.py index 121b47097d..56d6fa82e4 100644 --- a/python/tests/exchange.py +++ b/python/tests/exchange.py @@ -316,9 +316,9 @@ class MiscellaneousErrorsTests(TestBase): self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="direct") try: self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="topic") - self.fail("Expected 507 for redeclaration of exchange with different type.") + self.fail("Expected 530 for redeclaration of exchange with different type.") except Closed, e: - self.assertConnectionException(507, e.args[0]) + self.assertConnectionException(530, e.args[0]) #cleanup other = self.connect() c2 = other.channel(1) |