summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-10 08:52:41 +0000
committerRobert Greig <rgreig@apache.org>2007-01-10 08:52:41 +0000
commita873c620e65e3060a5d29acce49f77b8213b4a06 (patch)
tree7dcb16cfe40dfbaa1587fad6e83a30fd1653851d
parentf0675254ce88655bd97c7e8e4c214754b5dcd674 (diff)
downloadqpid-python-a873c620e65e3060a5d29acce49f77b8213b4a06.tar.gz
QPID-275 : (Patch supplied by Rob Godfrey) Fixes to allow broker to pass more of the Python tests
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@494769 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java99
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java33
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java41
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java35
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java21
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java6
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java2
-rw-r--r--python/java_failing.txt14
-rw-r--r--python/tests/exchange.py4
12 files changed, 185 insertions, 87 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index 0cb1d8bee8..cee6ef7c98 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -71,49 +71,78 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
if (queue == null)
{
_log.info("No queue for '" + body.queue + "'");
- }
- try
- {
- AMQShortString consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
- body.arguments, body.noLocal);
- if (!body.nowait)
+ if(body.queue!=null)
{
+ AMQShortString msg = new AMQShortString("No such queue, '" + body.queue + "'");
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId,
+ session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
(byte)8, (byte)0, // AMQP version (major, minor)
- consumerTag)); // consumerTag
+ BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
+ BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
+ AMQConstant.NOT_FOUND.getCode(), // replyCode
+ msg)); // replyText
+ }
+ else
+ {
+ AMQShortString msg = new AMQShortString("No queue name provided, no default queue defined.");
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
+ BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
+ AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ msg)); // replyText
}
-
- //now allow queue to start async processing of any backlog of messages
- queue.deliverAsync();
- }
- catch (AMQInvalidSelectorException ise)
- {
- _log.info("Closing connection due to invalid selector");
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
- BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
- AMQConstant.INVALID_SELECTOR.getCode(), // replyCode
- new AMQShortString(ise.getMessage()))); // replyText
}
- catch (ConsumerTagNotUniqueException e)
+ else
{
- AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.consumerTag + "'");
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
- BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
- AMQConstant.NOT_ALLOWED.getCode(), // replyCode
- msg)); // replyText
+ try
+ {
+ AMQShortString consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
+ body.arguments, body.noLocal);
+ if (!body.nowait)
+ {
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ consumerTag)); // consumerTag
+ }
+
+ //now allow queue to start async processing of any backlog of messages
+ queue.deliverAsync();
+ }
+ catch (AMQInvalidSelectorException ise)
+ {
+ _log.info("Closing connection due to invalid selector");
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
+ BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
+ AMQConstant.INVALID_SELECTOR.getCode(), // replyCode
+ new AMQShortString(ise.getMessage()))); // replyText
+ }
+ catch (ConsumerTagNotUniqueException e)
+ {
+ AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.consumerTag + "'");
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
+ BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
+ AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ msg)); // replyText
+ }
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
index 7937a9bb2d..4053364778 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
@@ -22,6 +22,10 @@ package org.apache.qpid.server.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQUnknownExchangeType;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.ExchangeDeclareOkBody;
@@ -66,12 +70,35 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
{
Exchange exchange = exchangeRegistry.getExchange(body.exchange);
+
+
if (exchange == null)
{
- exchange = exchangeFactory.createExchange(body.exchange, body.type, body.durable,
- body.passive, body.ticket);
- exchangeRegistry.registerExchange(exchange);
+ if(body.passive && ((body.type == null) || body.type.length() ==0))
+ {
+ throw new AMQChannelException(AMQConstant.NOT_FOUND.getCode(), "Unknown exchange: " + body.exchange,body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());
+ }
+ else
+ {
+ try
+ {
+
+ exchange = exchangeFactory.createExchange(body.exchange, body.type, body.durable,
+ body.passive, body.ticket);
+ exchangeRegistry.registerExchange(exchange);
+ }
+ catch(AMQUnknownExchangeType e)
+ {
+ throw new AMQConnectionException(AMQConstant.COMMAND_INVALID.getCode(), "Unknown exchange: " + body.exchange,body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),e);
+ }
+ }
}
+ else if (!exchange.getType().equals(body.type))
+ {
+
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED.getCode(), "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());
+ }
+
}
if(!body.nowait)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 6ff7700a13..7a73fbb18a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -22,12 +22,11 @@ package org.apache.qpid.server.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.configuration.Configured;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.*;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.AMQMethodEvent;
@@ -83,20 +82,34 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
synchronized (queueRegistry)
{
AMQQueue queue;
- if ((queue = queueRegistry.getQueue(body.queue)) == null)
+ if (((queue = queueRegistry.getQueue(body.queue)) == null) )
{
- queue = createQueue(body, queueRegistry, protocolSession);
- if (queue.isDurable() && !queue.isAutoDelete())
+ if(body.passive)
{
- _store.createQueue(queue);
+ String msg = "Queue: " + body.queue + " not found.";
+ throw new AMQChannelException(AMQConstant.NOT_FOUND.getCode(),
+ msg,
+ body.getClazz(),
+ body.getMethod(),
+ (byte)8,
+ (byte)0 );
+
}
- queueRegistry.registerQueue(queue);
- if (autoRegister)
+ else
{
- Exchange defaultExchange = exchangeRegistry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
- defaultExchange.registerQueue(body.queue, queue, null);
- queue.bind(body.queue, defaultExchange);
- _log.info("Queue " + body.queue + " bound to default exchange");
+ queue = createQueue(body, queueRegistry, protocolSession);
+ if (queue.isDurable() && !queue.isAutoDelete())
+ {
+ _store.createQueue(queue);
+ }
+ queueRegistry.registerQueue(queue);
+ if (autoRegister)
+ {
+ Exchange defaultExchange = exchangeRegistry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+ defaultExchange.registerQueue(body.queue, queue, null);
+ queue.bind(body.queue, defaultExchange);
+ _log.info("Queue " + body.queue + " bound to default exchange");
+ }
}
}
//set this as the default queue on the channel:
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index 5437561095..bbe1464bdc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -31,7 +31,10 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody>
{
@@ -79,14 +82,30 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete
}
else
{
- int purged = queue.delete(body.ifUnused, body.ifEmpty);
- _store.removeQueue(queue.getName().toString());
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(),
- (byte)8, (byte)0, // AMQP version (major, minor)
- purged)); // messageCount
+ if(body.ifEmpty && !queue.isEmpty())
+ {
+ AMQShortString msg = new AMQShortString("Queue: " + body.queue + " is not empty.");
+ // TODO - Error code
+ session.writeFrame(ChannelCloseBody.createAMQFrame(evt.getChannelId(),(byte)8, (byte)0, body.getClazz(), body.getMethod(), 406, msg ));
+ }
+ else if(body.ifUnused && !queue.isUnused())
+ {
+ AMQShortString msg = new AMQShortString("Queue: " + body.queue + " is still used.");
+ // TODO - Error code
+ session.writeFrame(ChannelCloseBody.createAMQFrame(evt.getChannelId(),(byte)8, (byte)0, body.getClazz(), body.getMethod(), 406, msg ));
+
+ }
+ else
+ {
+ int purged = queue.delete(body.ifUnused, body.ifEmpty);
+ _store.removeQueue(queue.getName().toString());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ purged)); // messageCount
+ }
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index f86d8afe02..596631b3a8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -26,6 +26,7 @@ import org.apache.mina.common.IoSession;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.framing.*;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
@@ -107,7 +108,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_stateManager = stateManager;
_minaProtocolSession = session;
session.setAttachment(this);
- _frameListeners.add(_stateManager);
+
_queueRegistry = queueRegistry;
_exchangeRegistry = exchangeRegistry;
_codecFactory = codecFactory;
@@ -206,11 +207,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
(AMQMethodBody) frame.bodyFrame);
try
{
- boolean wasAnyoneInterested = false;
- for (AMQMethodListener listener : _frameListeners)
+ boolean wasAnyoneInterested = _stateManager.methodReceived(evt, this, _queueRegistry, _exchangeRegistry);
+
+ if(!_frameListeners.isEmpty())
{
- wasAnyoneInterested = listener.methodReceived(evt, this, _queueRegistry, _exchangeRegistry) ||
- wasAnyoneInterested;
+ for (AMQMethodListener listener : _frameListeners)
+ {
+ wasAnyoneInterested = listener.methodReceived(evt, this, _queueRegistry, _exchangeRegistry) ||
+ wasAnyoneInterested;
+ }
}
if (!wasAnyoneInterested)
{
@@ -222,8 +227,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_logger.error("Closing channel due to: " + e.getMessage());
writeFrame(e.getCloseFrame(frame.channel));
}
+ catch (AMQConnectionException e)
+ {
+ _logger.error("Closing connection due to: " + e.getMessage());
+ writeFrame(e.getCloseFrame(frame.channel));
+ }
catch (AMQException e)
{
+ _stateManager.error(e);
for (AMQMethodListener listener : _frameListeners)
{
listener.error(e);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index ea09654988..368cb979e8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -410,6 +410,17 @@ public class AMQQueue implements Managable, Comparable
}
}
+ public boolean isUnused()
+ {
+ return _subscribers.isEmpty();
+ }
+
+ public boolean isEmpty()
+ {
+ return !_deliveryMgr.hasQueuedMessages();
+ }
+
+
public int delete(boolean checkUnused, boolean checkEmpty) throws AMQException
{
if (checkUnused && !_subscribers.isEmpty())
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
index 2f742952c9..656549e025 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
@@ -63,7 +63,7 @@ class ExchangeBindings
public int hashCode()
{
- return exchange.hashCode() + routingKey.hashCode();
+ return (exchange == null ? 0 : exchange.hashCode()) + (routingKey == null ? 0 : routingKey.hashCode());
}
public boolean equals(Object o)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 0afe17c6ca..e2356faaf5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -301,8 +301,10 @@ public class SubscriptionImpl implements Subscription
if (_noLocal)
{
// We don't want local messages so check to see if message is one we sent
- if (protocolSession.getClientProperties().getObject(ClientProperties.instance.toString()).equals(
- msg.getPublisher().getClientProperties().getObject(ClientProperties.instance.toString())))
+ Object localInstance = protocolSession.getClientProperties().getObject(ClientProperties.instance.toString());
+ Object msgInstance = msg.getPublisher().getClientProperties().getObject(ClientProperties.instance.toString());
+
+ if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
{
if (_logger.isTraceEnabled())
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index a1763ddc73..234b4c8a67 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -55,7 +55,7 @@ public class RecoverTest extends TestCase
Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
//force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+ ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -112,7 +112,7 @@ public class RecoverTest extends TestCase
Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
//force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+ ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
index 0b9bf56875..523a24f278 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
@@ -94,7 +94,7 @@ public final class AMQConstant
public static final AMQConstant RESOURCE_ERROR = new AMQConstant(506, "resource error", true);
- public static final AMQConstant NOT_ALLOWED = new AMQConstant(507, "not allowed", true);
+ public static final AMQConstant NOT_ALLOWED = new AMQConstant(530, "not allowed", true);
public static final AMQConstant NOT_IMPLEMENTED = new AMQConstant(540, "not implemented", true);
diff --git a/python/java_failing.txt b/python/java_failing.txt
index a752bc53c0..a564fe0148 100644
--- a/python/java_failing.txt
+++ b/python/java_failing.txt
@@ -1,29 +1,15 @@
-tests.basic.BasicTests.test_cancel
tests.basic.BasicTests.test_consume_exclusive
tests.basic.BasicTests.test_consume_no_local
-tests.basic.BasicTests.test_consume_queue_errors
-tests.basic.BasicTests.test_consume_unique_consumers
tests.basic.BasicTests.test_get
tests.basic.BasicTests.test_qos_prefetch_size
tests.basic.BasicTests.test_recover_requeue
-tests.exchange.ExchangeTests
tests.exchange.DefaultExchangeRuleTests.testDefaultExchange
tests.exchange.HeadersExchangeTests.testMatchAll
tests.exchange.HeadersExchangeTests.testMatchAny
-tests.exchange.RecommendedTypesRuleTests.testDirect
-tests.exchange.RecommendedTypesRuleTests.testFanout
-tests.exchange.RecommendedTypesRuleTests.testHeaders
tests.exchange.RecommendedTypesRuleTests.testTopic
-tests.exchange.RequiredInstancesRuleTests.testAmqDirect
-tests.exchange.RequiredInstancesRuleTests.testAmqFanOut
tests.exchange.RequiredInstancesRuleTests.testAmqMatch
tests.exchange.RequiredInstancesRuleTests.testAmqTopic
tests.queue.QueueTests.test_declare_exclusive
-tests.queue.QueueTests.test_declare_passive
-tests.queue.QueueTests.test_delete_ifempty
-tests.queue.QueueTests.test_delete_ifunused
-tests.queue.QueueTests.test_delete_simple
tests.queue.QueueTests.test_purge
-tests.queue.QueueTests.test_bind
tests.testlib.TestBaseTest.testMessageProperties
tests.broker.BrokerTests.test_invalid_channel
diff --git a/python/tests/exchange.py b/python/tests/exchange.py
index 121b47097d..56d6fa82e4 100644
--- a/python/tests/exchange.py
+++ b/python/tests/exchange.py
@@ -316,9 +316,9 @@ class MiscellaneousErrorsTests(TestBase):
self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="direct")
try:
self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="topic")
- self.fail("Expected 507 for redeclaration of exchange with different type.")
+ self.fail("Expected 530 for redeclaration of exchange with different type.")
except Closed, e:
- self.assertConnectionException(507, e.args[0])
+ self.assertConnectionException(530, e.args[0])
#cleanup
other = self.connect()
c2 = other.channel(1)