summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java35
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java46
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java20
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java3
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQChannelException.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQConnectionException.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQException.java15
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQInvalidRoutingKeyException.java31
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java58
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java8
43 files changed, 325 insertions, 150 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 8b36576a30..0879b77f37 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -602,7 +602,7 @@ public class AMQChannel
for (RequiredDeliveryException bouncedMessage : _returnMessages)
{
AMQMessage message = bouncedMessage.getAMQMessage();
- message.writeReturn(session, _channelId, bouncedMessage.getReplyCode(), new AMQShortString(bouncedMessage.getMessage()));
+ message.writeReturn(session, _channelId, bouncedMessage.getReplyCode().getCode(), new AMQShortString(bouncedMessage.getMessage()));
}
_returnMessages.clear();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
index b85e3603b7..820f0122f5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.queue.AMQMessage;
/**
@@ -44,10 +45,10 @@ public abstract class RequiredDeliveryException extends AMQException
return _amqMessage;
}
- public int getErrorCode()
+ public AMQConstant getErrorCode()
{
return getReplyCode();
}
- public abstract int getReplyCode();
+ public abstract AMQConstant getReplyCode();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
index 6688318a0a..f93b2b25e6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
@@ -47,13 +47,19 @@ public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckB
public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicAckBody> evt) throws AMQException
{
AMQProtocolSession protocolSession = stateManager.getProtocolSession();
-
+
if (_log.isDebugEnabled())
{
_log.debug("Ack received on channel " + evt.getChannelId());
}
BasicAckBody body = evt.getMethod();
final AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(evt.getChannelId());
+ }
+
// this method throws an AMQException if the delivery tag is not known
channel.acknowledgeMessage(body.deliveryTag, body.multiple);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
index 1e56542b2b..7d18043f5c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
@@ -49,15 +49,21 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC
final AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
final BasicCancelBody body = evt.getMethod();
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(evt.getChannelId());
+ }
+
channel.unsubscribeConsumer(protocolSession, body.consumerTag);
- if(!body.nowait)
+ if (!body.nowait)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(),
- (byte)8, (byte)0, // AMQP version (major, minor)
- body.consumerTag); // consumerTag
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ body.consumerTag); // consumerTag
protocolSession.writeFrame(responseFrame);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index feb6f6b1fa..090988d304 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -61,11 +61,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
final int channelId = evt.getChannelId();
AMQChannel channel = session.getChannel(channelId);
+
VirtualHost vHost = session.getVirtualHost();
+
if (channel == null)
{
- _log.error("Channel " + channelId + " not found");
- // TODO: either alert or error that the
+ throw body.getChannelNotFoundException(evt.getChannelId());
}
else
{
@@ -78,12 +79,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
if (body.queue != null)
{
String msg = "No such queue, '" + body.queue + "'";
- throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), msg);
+ throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
}
else
{
String msg = "No queue name provided, no default queue defined.";
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(), msg);
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg);
}
}
else
@@ -108,24 +109,24 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
catch (AMQInvalidSelectorException ise)
{
_log.info("Closing connection due to invalid selector");
- throw body.getChannelException(AMQConstant.INVALID_SELECTOR.getCode(), ise.getMessage());
+ throw body.getChannelException(AMQConstant.INVALID_SELECTOR, ise.getMessage());
}
catch (ConsumerTagNotUniqueException e)
{
AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.consumerTag + "'");
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"Non-unique consumer tag, '" + body.consumerTag + "'");
}
catch (AMQQueue.ExistingExclusiveSubscription e)
{
- throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
+ throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
+ queue.getName()
+ " as it already has an existing exclusive consumer");
}
catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
{
- throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
+ throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
+ queue.getName()
+ " exclusively as it already has a consumer");
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
index 72d7e8b8b9..b88c2ebf3a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -39,8 +39,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
AMQChannel channel = session.getChannel(channelId);
if (channel == null)
{
- _log.error("Channel " + channelId + " not found");
- // TODO: either alert or error that the
+ throw body.getChannelNotFoundException(evt.getChannelId());
}
else
{
@@ -51,12 +50,12 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
_log.info("No queue for '" + body.queue + "'");
if(body.queue!=null)
{
- throw body.getConnectionException(AMQConstant.NOT_FOUND.getCode(),
+ throw body.getConnectionException(AMQConstant.NOT_FOUND,
"No such queue, '" + body.queue + "'");
}
else
{
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"No queue name provided, no default queue defined.");
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
index a30cc2ca3c..7e378dfd01 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
@@ -25,6 +25,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -70,8 +71,7 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
// if the exchange does not exist we raise a channel exception
if (e == null)
{
- throw body.getChannelException(500, "Unknown exchange name");
-
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name");
}
else
{
@@ -79,6 +79,12 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
// is stored in the channel. Once the final body frame has been received
// it is routed to the exchange.
AMQChannel channel = session.getChannel(evt.getChannelId());
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(evt.getChannelId());
+ }
+
channel.setPublishFrame(body, session);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
index 4bc1439e53..3cd6a87f64 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
@@ -27,6 +27,7 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.AMQChannel;
public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody>
{
@@ -40,12 +41,18 @@ public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody>
public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicQosBody> evt) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
- session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount);
- session.getChannel(evt.getChannelId()).setPrefetchSize(evt.getMethod().prefetchSize);
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+ if (channel == null)
+ {
+ throw evt.getMethod().getChannelNotFoundException(evt.getChannelId());
+ }
+
+ channel.setPrefetchCount(evt.getMethod().prefetchCount);
+ channel.setPrefetchSize(evt.getMethod().prefetchSize);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- session.writeFrame(BasicQosOkBody.createAMQFrame(evt.getChannelId(),(byte)8, (byte)0));
+ session.writeFrame(BasicQosOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
index 9f0d096a73..5f5b7ccad1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
@@ -46,12 +46,13 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic
_logger.debug("Recover received on protocol session " + session + " and channel " + evt.getChannelId());
AMQChannel channel = session.getChannel(evt.getChannelId());
+ BasicRecoverBody body = evt.getMethod();
+
if (channel == null)
{
- throw new AMQException("Unknown channel " + evt.getChannelId());
+ throw body.getChannelNotFoundException(evt.getChannelId());
}
- BasicRecoverBody body = evt.getMethod();
- channel.resend(session, body.requeue);
+ channel.resend(session, body.requeue);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
index bdb877b7ac..bfa170cfc5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
@@ -52,6 +52,12 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB
ChannelFlowBody body = evt.getMethod();
AMQChannel channel = session.getChannel(evt.getChannelId());
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(evt.getChannelId());
+ }
+
channel.setSuspended(!body.active);
_logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
index 809676cfbe..a85af61327 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
@@ -71,7 +71,7 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con
if(virtualHost == null)
{
- throw body.getConnectionException(AMQConstant.NOT_FOUND.getCode(), "Unknown virtual host: " + virtualHostName);
+ throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: " + virtualHostName);
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
index 575833a68f..be3ffcc698 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
@@ -76,7 +76,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
{
if(body.passive && ((body.type == null) || body.type.length() ==0))
{
- throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Unknown exchange: " + body.exchange);
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.exchange);
}
else
{
@@ -89,14 +89,14 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
}
catch(AMQUnknownExchangeType e)
{
- throw body.getConnectionException(AMQConstant.COMMAND_INVALID.getCode(), "Unknown exchange: " + body.exchange,e);
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + body.exchange,e);
}
}
}
else if (!exchange.getType().equals(body.type))
{
- throw new AMQConnectionException(AMQConstant.NOT_ALLOWED.getCode(), "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
index bccb9db967..3c903b471d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.QueueBindBody;
import org.apache.qpid.framing.QueueBindOkBody;
@@ -35,6 +36,7 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.AMQChannel;
public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
@@ -57,17 +59,25 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
VirtualHost virtualHost = session.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-
final QueueBindBody body = evt.getMethod();
final AMQQueue queue;
if (body.queue == null)
{
- queue = session.getChannel(evt.getChannelId()).getDefaultQueue();
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+
+// if (channel == null)
+// {
+// throw body.getChannelNotFoundException(evt.getChannelId());
+// }
+
+ queue = channel.getDefaultQueue();
+
if (queue == null)
{
- throw new AMQException("No default queue defined on channel and queue was null");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
}
+
if (body.routingKey == null)
{
body.routingKey = queue.getName();
@@ -80,14 +90,25 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
if (queue == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Queue " + body.queue + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
}
final Exchange exch = exchangeRegistry.getExchange(body.exchange);
if (exch == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Exchange " + body.exchange + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist.");
+ }
+ try
+ {
+ exch.registerQueue(body.routingKey, queue, body.arguments);
+ }
+ catch (AMQInvalidRoutingKeyException rke)
+ {
+ throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, body.routingKey.toString());
+ }
+ catch (AMQException e)
+ {
+ throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString());
}
- exch.registerQueue(body.routingKey, queue, body.arguments);
queue.bind(body.routingKey, exch);
if (_log.isInfoEnabled())
{
@@ -98,7 +119,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
+ final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0);
session.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 1e4b7c9e57..2218ff604f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -43,6 +43,7 @@ import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.AMQChannel;
public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
{
@@ -83,7 +84,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
body.queue = createName();
}
- AMQQueue queue = null;
+ AMQQueue queue;
//TODO: do we need to check that the queue already exists with exactly the same "configuration"?
synchronized (queueRegistry)
@@ -94,8 +95,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
if(body.passive)
{
String msg = "Queue: " + body.queue + " not found.";
- throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(),msg );
-
+ throw body.getChannelException(AMQConstant.NOT_FOUND,msg );
}
else
{
@@ -116,12 +116,18 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
else if(queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner()))
{
- // todo - constant
- throw body.getChannelException(405, "Cannot declare queue, as exclusive queue with same name declared on another connection");
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue, as exclusive queue with same name declared on another connection");
+ }
+
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(evt.getChannelId());
}
+
//set this as the default queue on the channel:
- session.getChannel(evt.getChannelId()).setDefaultQueue(queue);
+ channel.setDefaultQueue(queue);
}
if (!body.nowait)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index 4c875692f0..0c7de312a7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.framing.QueueDeleteOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
@@ -31,6 +32,7 @@ import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.AMQChannel;
public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody>
{
@@ -65,7 +67,15 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete
AMQQueue queue;
if(body.queue == null)
{
- queue = session.getChannel(evt.getChannelId()).getDefaultQueue();
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(evt.getChannelId());
+ }
+
+ //get the default queue on the channel:
+ queue = channel.getDefaultQueue();
}
else
{
@@ -76,19 +86,19 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete
{
if(_failIfNotFound)
{
- throw body.getChannelException(404, "Queue " + body.queue + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
}
}
else
{
if(body.ifEmpty && !queue.isEmpty())
{
- throw body.getChannelException(406, "Queue: " + body.queue + " is not empty." );
+ throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is not empty." );
}
else if(body.ifUnused && !queue.isUnused())
{
// TODO - Error code
- throw body.getChannelException(406, "Queue: " + body.queue + " is still used." );
+ throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is still used." );
}
else
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
index 3ccc61fff0..0c00436470 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
@@ -11,6 +11,7 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.AMQChannel;
public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
{
@@ -39,18 +40,27 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
VirtualHost virtualHost = session.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+
QueuePurgeBody body = evt.getMethod();
AMQQueue queue;
if(body.queue == null)
{
- queue = session.getChannel(evt.getChannelId()).getDefaultQueue();
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(evt.getChannelId());
+ }
+
+ //get the default queue on the channel:
+ queue = channel.getDefaultQueue();
+
if(queue == null)
{
if(_failIfNotFound)
{
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),"No queue specified.");
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,"No queue specified.");
}
-
}
}
else
@@ -62,12 +72,12 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
{
if(_failIfNotFound)
{
- throw body.getChannelException(404, "Queue " + body.queue + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
}
}
else
{
- long purged = queue.clearQueue(session.getChannel(evt.getChannelId()).getStoreContext());
+ long purged = queue.clearQueue(channel.getStoreContext());
if(!body.nowait)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
index caf0efad67..3d7ec286f9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
@@ -47,7 +47,7 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxCommitBody> evt) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
+ AMQProtocolSession session = stateManager.getProtocolSession();
try
{
@@ -56,14 +56,20 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
_log.debug("Commit received on channel " + evt.getChannelId());
}
AMQChannel channel = session.getChannel(evt.getChannelId());
+
+ if (channel == null)
+ {
+ throw evt.getMethod().getChannelNotFoundException(evt.getChannelId());
+ }
+
channel.commit();
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- session.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
+ session.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
channel.processReturns(session);
}
- catch(AMQException e)
+ catch (AMQException e)
{
throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
index 9088240351..8ce5a0ea73 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
@@ -45,18 +45,27 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod
public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxRollbackBody> evt) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
-
- try{
+
+ try
+ {
AMQChannel channel = session.getChannel(evt.getChannelId());
+
+ if (channel == null)
+ {
+ throw evt.getMethod().getChannelNotFoundException(evt.getChannelId());
+ }
+
channel.rollback();
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
+ session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
channel.resend(session, false);
- }catch(AMQException e){
+ }
+ catch (AMQException e)
+ {
throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage());
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
index 29795e50ca..a9e478e301 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
@@ -27,6 +27,7 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.AMQChannel;
public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody>
{
@@ -44,11 +45,19 @@ public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody>
public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxSelectBody> evt) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
-
- session.getChannel(evt.getChannelId()).setLocalTransactional();
+
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+
+ if (channel == null)
+ {
+ throw evt.getMethod().getChannelNotFoundException(evt.getChannelId());
+ }
+
+ channel.setLocalTransactional();
+
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- session.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
+ session.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index e53410420f..309fa4663a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -325,6 +325,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.debug("Content header frame received: " + frame);
}
+ //fixme what happens if getChannel returns null
getChannel(frame.getChannel()).publishContentHeader((ContentHeaderBody) frame.getBodyFrame());
}
@@ -334,6 +335,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.debug("Content body frame received: " + frame);
}
+ //fixme what happens if getChannel returns null
getChannel(frame.getChannel()).publishContentBody((ContentBody) frame.getBodyFrame(), this);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
index 2049189e0f..c63490f019 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
@@ -35,8 +35,8 @@ public class NoConsumersException extends RequiredDeliveryException
super("Immediate delivery is not possible.", message);
}
- public int getReplyCode()
+ public AMQConstant getReplyCode()
{
- return AMQConstant.NO_CONSUMERS.getCode();
+ return AMQConstant.NO_CONSUMERS;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 05841ccfc0..6bdfeccc0f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -24,6 +24,8 @@ import java.util.Queue;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
@@ -37,11 +39,8 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
/**
- * Encapsulation of a supscription to a queue.
- * <p/>
- * Ties together the protocol session of a subscriber, the consumer tag that
- * was given out by the broker and the channel id.
- * <p/>
+ * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
+ * that was given out by the broker and the channel id. <p/>
*/
public class SubscriptionImpl implements Subscription
{
@@ -59,9 +58,7 @@ public class SubscriptionImpl implements Subscription
private final boolean _noLocal;
- /**
- * True if messages need to be acknowledged
- */
+ /** True if messages need to be acknowledged */
private final boolean _acks;
private FilterManager _filters;
private final boolean _isBrowser;
@@ -96,8 +93,8 @@ public class SubscriptionImpl implements Subscription
{
AMQChannel channel = protocolSession.getChannel(channelId);
if (channel == null)
- {
- throw new NullPointerException("channel not found in protocol session");
+ {
+ throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session");
}
this.channel = channel;
@@ -172,9 +169,7 @@ public class SubscriptionImpl implements Subscription
return (o instanceof SubscriptionImpl) && equals((SubscriptionImpl) o);
}
- /**
- * Equality holds if the session matches and the channel and consumer tag are the same.
- */
+ /** Equality holds if the session matches and the channel and consumer tag are the same. */
private boolean equals(SubscriptionImpl psc)
{
return sessionKey.equals(psc.sessionKey)
@@ -193,11 +188,12 @@ public class SubscriptionImpl implements Subscription
}
/**
- * This method can be called by each of the publisher threads.
- * As a result all changes to the channel object must be thread safe.
+ * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
+ * thread safe.
*
* @param msg
* @param queue
+ *
* @throws AMQException
*/
public void send(AMQMessage msg, AMQQueue queue) throws AMQException
@@ -224,7 +220,7 @@ public class SubscriptionImpl implements Subscription
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
- synchronized(channel)
+ synchronized (channel)
{
long deliveryTag = channel.getNextDeliveryTag();
@@ -260,7 +256,7 @@ public class SubscriptionImpl implements Subscription
}
queue.dequeue(storeContext, msg);
}
- synchronized(channel)
+ synchronized (channel)
{
long deliveryTag = channel.getNextDeliveryTag();
@@ -309,11 +305,11 @@ public class SubscriptionImpl implements Subscription
Object localInstance;
Object msgInstance;
- if((protocolSession.getClientProperties() != null) &&
- (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
+ if ((protocolSession.getClientProperties() != null) &&
+ (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
- if((msg.getPublisher().getClientProperties() != null) &&
- (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
+ if ((msg.getPublisher().getClientProperties() != null) &&
+ (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
{
@@ -402,10 +398,10 @@ public class SubscriptionImpl implements Subscription
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(),
- protocolSession.getProtocolMajorVersion(),
- protocolSession.getProtocolMinorVersion(),
- consumerTag // consumerTag
- ));
+ protocolSession.getProtocolMajorVersion(),
+ protocolSession.getProtocolMinorVersion(),
+ consumerTag // consumerTag
+ ));
_closed = true;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
index 6d1e9ce99d..29efdd9513 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
@@ -55,7 +55,6 @@ import org.apache.qpid.framing.QueuePurgeBody;
import org.apache.qpid.framing.TxCommitBody;
import org.apache.qpid.framing.TxRollbackBody;
import org.apache.qpid.framing.TxSelectBody;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.handler.BasicAckMethodHandler;
@@ -231,7 +230,7 @@ public class AMQStateManager implements AMQMethodListener
&& (protocolSession.getChannel(evt.getChannelId()) == null)
&& !protocolSession.channelAwaitingClosure(evt.getChannelId()))
{
- throw evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR.getCode(), "No such channel: " + evt.getChannelId());
+ throw evt.getMethod().getChannelNotFoundException(evt.getChannelId());
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java b/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java
index 52dcfcfbfb..0bc474f6e6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java
@@ -21,10 +21,11 @@
package org.apache.qpid.client;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
public class AMQAuthenticationException extends AMQException
{
- public AMQAuthenticationException(int error, String msg)
+ public AMQAuthenticationException(AMQConstant error, String msg)
{
super(error,msg);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index cb2533c2bb..ebaa22ce44 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -966,7 +966,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
if (cause instanceof AMQException)
{
- je = new JMSException(Integer.toString(((AMQException) cause).getErrorCode()), "Exception thrown against " + toString() + ": " + cause);
+ je = new JMSException(Integer.toString(((AMQException) cause).getErrorCode().getCode()), "Exception thrown against " + toString() + ": " + cause);
}
else
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java b/java/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java
index 277e3f7eaf..bec2958cb9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java
@@ -28,7 +28,7 @@ public class AMQNoConsumersException extends AMQUndeliveredException
{
public AMQNoConsumersException(String msg, Object bounced)
{
- super(AMQConstant.NO_CONSUMERS.getCode(), msg, bounced);
+ super(AMQConstant.NO_CONSUMERS, msg, bounced);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java b/java/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java
index 0e84ad75f2..6ea8413446 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java
@@ -28,7 +28,7 @@ public class AMQNoRouteException extends AMQUndeliveredException
{
public AMQNoRouteException(String msg, Object bounced)
{
- super(AMQConstant.NO_ROUTE.getCode(), msg, bounced);
+ super(AMQConstant.NO_ROUTE, msg, bounced);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
index d011d02a91..6ef187286b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
import org.apache.qpid.client.AMQNoConsumersException;
import org.apache.qpid.client.AMQNoRouteException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
@@ -52,34 +53,39 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener
_logger.debug("ChannelClose method received");
ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
- int errorCode = method.replyCode;
+ AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
AMQShortString reason = method.replyText;
if (_logger.isDebugEnabled())
{
_logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
}
-
// TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), method.getMajor(), method.getMinor());
protocolSession.writeFrame(frame);
- if (errorCode != AMQConstant.REPLY_SUCCESS.getCode())
+ if (errorCode != AMQConstant.REPLY_SUCCESS)
{
_logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason);
- if (errorCode == AMQConstant.NO_CONSUMERS.getCode())
+ if (errorCode == AMQConstant.NO_CONSUMERS)
{
throw new AMQNoConsumersException("Error: " + reason, null);
}
- else if (errorCode == AMQConstant.NO_ROUTE.getCode())
+ else if (errorCode == AMQConstant.NO_ROUTE)
{
throw new AMQNoRouteException("Error: " + reason, null);
}
- else if (errorCode == AMQConstant.INVALID_SELECTOR.getCode())
+ else if (errorCode == AMQConstant.INVALID_SELECTOR)
{
- _logger.info("Broker responded with Invalid Selector.");
+ _logger.debug("Broker responded with Invalid Selector.");
throw new AMQInvalidSelectorException(String.valueOf(reason));
}
+ else if (errorCode == AMQConstant.INVALID_ROUTING_KEY)
+ {
+ _logger.debug("Broker responded with Invalid Routing Key.");
+
+ throw new AMQInvalidRoutingKeyException(String.valueOf(reason));
+ }
else
{
throw new AMQChannelClosedException(errorCode, "Error: " + reason);
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
index f928ab56eb..57d987712a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
@@ -57,16 +57,16 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener
// does it matter
//stateManager.changeState(AMQState.CONNECTION_CLOSING);
- int errorCode = method.replyCode;
+ AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
AMQShortString reason = method.replyText;
// TODO: check whether channel id of zero is appropriate
// Be aware of possible changes to parameter order as versions change.
protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, method.getMajor(), method.getMinor()));
- if (errorCode != 200)
+ if (errorCode != AMQConstant.REPLY_SUCCESS)
{
- if(errorCode == AMQConstant.NOT_ALLOWED.getCode())
+ if(errorCode == AMQConstant.NOT_ALLOWED)
{
_logger.info("Authentication Error:"+Thread.currentThread().getName());
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java b/java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java
index 51a9aa7226..21526ac6d2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java
@@ -22,6 +22,7 @@ package org.apache.qpid.client.message;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
public class UnexpectedBodyReceivedException extends AMQException
{
@@ -36,7 +37,7 @@ public class UnexpectedBodyReceivedException extends AMQException
super(logger, msg);
}
- public UnexpectedBodyReceivedException(Logger logger, int errorCode, String msg)
+ public UnexpectedBodyReceivedException(Logger logger, AMQConstant errorCode, String msg)
{
super(logger, errorCode, msg);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index d4d700966a..055109d3be 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -48,6 +48,7 @@ import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersionList;
import org.apache.qpid.framing.VersionSpecificRegistry;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.protocol.AMQConstant;
/**
* Wrapper for protocol session that provides type-safe access to session attributes.
@@ -389,7 +390,7 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP
* initiated the channel close, false if the channel close is just the server
* responding to the client's earlier request to close the channel.
*/
- public boolean channelClosed(int channelId, int code, String text) throws AMQException
+ public boolean channelClosed(int channelId, AMQConstant code, String text) throws AMQException
{
final Integer chId = channelId;
// if this is not a response to an earlier request to close the channel
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index 9b214e88f9..f957df2c34 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -63,7 +63,7 @@ public class DurableSubscriptionTest extends TestCase
Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
- con.start();
+ con.start();
producer.send(session1.createTextMessage("A"));
diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java
index cd8b40c6da..272933ca04 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java
@@ -20,12 +20,14 @@
*/
package org.apache.qpid;
+import org.apache.qpid.protocol.AMQConstant;
+
/**
* AMQ channel closed exception.
*/
public class AMQChannelClosedException extends AMQException
{
- public AMQChannelClosedException(int errorCode, String msg)
+ public AMQChannelClosedException(AMQConstant errorCode, String msg)
{
super(errorCode, msg);
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
index d1750ebbb5..d8c9b287bd 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
@@ -23,6 +23,7 @@ package org.apache.qpid;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.protocol.AMQConstant;
public class AMQChannelException extends AMQException
{
@@ -32,7 +33,7 @@ public class AMQChannelException extends AMQException
private final byte major;
private final byte minor;
- public AMQChannelException(int errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t)
+ public AMQChannelException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t)
{
super(errorCode, msg, t);
_classId = classId;
@@ -41,7 +42,7 @@ public class AMQChannelException extends AMQException
this.minor = minor;
}
- public AMQChannelException(int errorCode, String msg, int classId, int methodId, byte major, byte minor)
+ public AMQChannelException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor)
{
super(errorCode, msg);
_classId = classId;
@@ -52,6 +53,6 @@ public class AMQChannelException extends AMQException
public AMQFrame getCloseFrame(int channel)
{
- return ChannelCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode(), new AMQShortString(getMessage()));
+ return ChannelCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(), new AMQShortString(getMessage()));
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java
index 6ec18bad20..e0ed16a9f0 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java
@@ -20,12 +20,14 @@
*/
package org.apache.qpid;
+import org.apache.qpid.protocol.AMQConstant;
+
/**
* AMQ channel closed exception.
*/
public class AMQConnectionClosedException extends AMQException
{
- public AMQConnectionClosedException(int errorCode, String msg)
+ public AMQConnectionClosedException(AMQConstant errorCode, String msg)
{
super(errorCode, msg);
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
index c6a874bcf3..c4f80191a3 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
@@ -24,6 +24,7 @@ package org.apache.qpid;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.protocol.AMQConstant;
public class AMQConnectionException extends AMQException
{
@@ -34,7 +35,7 @@ public class AMQConnectionException extends AMQException
private final byte minor;
boolean _closeConnetion;
- public AMQConnectionException(int errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t)
+ public AMQConnectionException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t)
{
super(errorCode, msg, t);
_classId = classId;
@@ -43,7 +44,7 @@ public class AMQConnectionException extends AMQException
this.minor = minor;
}
- public AMQConnectionException(int errorCode, String msg, int classId, int methodId, byte major, byte minor)
+ public AMQConnectionException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor)
{
super(errorCode, msg);
_classId = classId;
@@ -56,7 +57,7 @@ public class AMQConnectionException extends AMQException
public AMQFrame getCloseFrame(int channel)
{
- return ConnectionCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode(), new AMQShortString(getMessage()));
+ return ConnectionCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(), new AMQShortString(getMessage()));
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQException.java b/java/common/src/main/java/org/apache/qpid/AMQException.java
index 93c31e4fa8..5c11ec18ca 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQException.java
@@ -21,31 +21,36 @@
package org.apache.qpid;
import org.apache.log4j.Logger;
+import org.apache.qpid.protocol.AMQConstant;
/**
* Generic AMQ exception.
*/
public class AMQException extends Exception
{
- private int _errorCode;
+ private AMQConstant _errorCode;
public AMQException(String message)
{
super(message);
+ //fixme This method needs removed and all AMQExceptions need a valid error code
+ _errorCode = AMQConstant.getConstant(-1);
}
public AMQException(String msg, Throwable t)
{
super(msg, t);
+ //fixme This method needs removed and all AMQExceptions need a valid error code
+ _errorCode = AMQConstant.getConstant(-1);
}
- public AMQException(int errorCode, String msg, Throwable t)
+ public AMQException(AMQConstant errorCode, String msg, Throwable t)
{
super(msg + " [error code " + errorCode + ']', t);
_errorCode = errorCode;
}
- public AMQException(int errorCode, String msg)
+ public AMQException(AMQConstant errorCode, String msg)
{
super(msg + " [error code " + errorCode + ']');
_errorCode = errorCode;
@@ -63,13 +68,13 @@ public class AMQException extends Exception
logger.error(getMessage(), this);
}
- public AMQException(Logger logger, int errorCode, String msg)
+ public AMQException(Logger logger, AMQConstant errorCode, String msg)
{
this(errorCode, msg);
logger.error(getMessage(), this);
}
- public int getErrorCode()
+ public AMQConstant getErrorCode()
{
return _errorCode;
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQInvalidRoutingKeyException.java b/java/common/src/main/java/org/apache/qpid/AMQInvalidRoutingKeyException.java
new file mode 100644
index 0000000000..3293e2523d
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/AMQInvalidRoutingKeyException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid;
+
+import org.apache.qpid.protocol.AMQConstant;
+
+public class AMQInvalidRoutingKeyException extends AMQException
+{
+ public AMQInvalidRoutingKeyException(String message)
+ {
+ super(AMQConstant.INVALID_ROUTING_KEY,message);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java b/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java
index dcd039b789..9d003514ad 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java
@@ -26,6 +26,6 @@ public class AMQInvalidSelectorException extends AMQException
{
public AMQInvalidSelectorException(String message)
{
- super(AMQConstant.INVALID_SELECTOR.getCode(),message);
+ super(AMQConstant.INVALID_SELECTOR,message);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java b/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java
index 4944ccc371..ad5aff7bb6 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid;
+import org.apache.qpid.protocol.AMQConstant;
+
/**
* Generic AMQ exception.
*/
@@ -27,7 +29,7 @@ public class AMQUndeliveredException extends AMQException
{
private Object _bounced;
- public AMQUndeliveredException(int errorCode, String msg, Object bounced)
+ public AMQUndeliveredException(AMQConstant errorCode, String msg, Object bounced)
{
super(errorCode, msg);
diff --git a/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java b/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java
index 67af0b0b74..958f59191f 100644
--- a/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java
+++ b/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java
@@ -22,6 +22,7 @@ package org.apache.qpid.configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
/**
* Indicates an error parsing a property expansion.
@@ -38,12 +39,12 @@ public class PropertyException extends AMQException
super(msg, t);
}
- public PropertyException(int errorCode, String msg, Throwable t)
+ public PropertyException(AMQConstant errorCode, String msg, Throwable t)
{
super(errorCode, msg, t);
}
- public PropertyException(int errorCode, String msg)
+ public PropertyException(AMQConstant errorCode, String msg)
{
super(errorCode, msg);
}
@@ -58,7 +59,7 @@ public class PropertyException extends AMQException
super(logger, msg);
}
- public PropertyException(Logger logger, int errorCode, String msg)
+ public PropertyException(Logger logger, AMQConstant errorCode, String msg)
{
super(logger, errorCode, msg);
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
index 3fa5b150ab..111d9a8f20 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
@@ -23,19 +23,26 @@ package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.protocol.AMQConstant;
public abstract class AMQMethodBody extends AMQBody
{
- public static final byte TYPE = 1;
-
- /**
- * AMQP version
- */
+ public static final byte TYPE = 1;
+
+ /** AMQP version */
protected byte major;
protected byte minor;
- public byte getMajor() { return major; }
- public byte getMinor() { return minor; }
-
+
+ public byte getMajor()
+ {
+ return major;
+ }
+
+ public byte getMinor()
+ {
+ return minor;
+ }
+
public AMQMethodBody(byte major, byte minor)
{
this.major = major;
@@ -45,14 +52,10 @@ public abstract class AMQMethodBody extends AMQBody
/** unsigned short */
protected abstract int getBodySize();
- /**
- * @return unsigned short
- */
+ /** @return unsigned short */
protected abstract int getClazz();
- /**
- * @return unsigned short
- */
+ /** @return unsigned short */
protected abstract int getMethod();
protected abstract void writeMethodPayload(ByteBuffer buffer);
@@ -90,27 +93,38 @@ public abstract class AMQMethodBody extends AMQBody
}
/**
- * Creates an AMQChannelException for the corresponding body type (a channel exception
- * should include the class and method ids of the body it resulted from).
+ * Creates an AMQChannelException for the corresponding body type (a channel exception should include the class and
+ * method ids of the body it resulted from).
+ */
+
+ /**
+ * Convenience Method to create a channel not found exception
+ *
+ * @param channelId The channel id that is not found
+ *
+ * @return new AMQChannelException
*/
- public AMQChannelException getChannelException(int code, String message)
+ public AMQChannelException getChannelNotFoundException(int channelId)
+ {
+ return getChannelException(AMQConstant.NOT_FOUND, "Channel not found for id:" + channelId);
+ }
+
+ public AMQChannelException getChannelException(AMQConstant code, String message)
{
return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor);
}
- public AMQChannelException getChannelException(int code, String message, Throwable cause)
+ public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause)
{
return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor, cause);
}
- public AMQConnectionException getConnectionException(int code, String message)
+ public AMQConnectionException getConnectionException(AMQConstant code, String message)
{
return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor);
}
-
-
- public AMQConnectionException getConnectionException(int code, String message, Throwable cause)
+ public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause)
{
return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor, cause);
}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
index a4d90e9ee3..05365de137 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
@@ -69,7 +69,7 @@ public final class AMQConstant
public static final AMQConstant MESSAGE_TOO_LARGE = new AMQConstant(311, "message too large", true);
public static final AMQConstant NO_ROUTE = new AMQConstant(312, "no route", true);
-
+
public static final AMQConstant NO_CONSUMERS = new AMQConstant(313, "no consumers", true);
public static final AMQConstant CONTEXT_IN_USE = new AMQConstant(320, "context in use", true);
@@ -78,12 +78,18 @@ public final class AMQConstant
public static final AMQConstant INVALID_SELECTOR = new AMQConstant(322, "selector invalid", true);
+ public static final AMQConstant INVALID_ROUTING_KEY = new AMQConstant(323, "routing key invalid", true);
+
public static final AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true);
public static final AMQConstant ACCESS_REFUSED = new AMQConstant(403, "access refused", true);
public static final AMQConstant NOT_FOUND = new AMQConstant(404, "not found", true);
+ public static final AMQConstant ALREADY_EXISTS = new AMQConstant(405, "Already exists", true);
+
+ public static final AMQConstant IN_USE = new AMQConstant(406, "In use", true);
+
public static final AMQConstant FRAME_ERROR = new AMQConstant(501, "frame error", true);
public static final AMQConstant SYNTAX_ERROR = new AMQConstant(502, "syntax error", true);