diff options
Diffstat (limited to 'java')
43 files changed, 325 insertions, 150 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 8b36576a30..0879b77f37 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -602,7 +602,7 @@ public class AMQChannel for (RequiredDeliveryException bouncedMessage : _returnMessages) { AMQMessage message = bouncedMessage.getAMQMessage(); - message.writeReturn(session, _channelId, bouncedMessage.getReplyCode(), new AMQShortString(bouncedMessage.getMessage())); + message.writeReturn(session, _channelId, bouncedMessage.getReplyCode().getCode(), new AMQShortString(bouncedMessage.getMessage())); } _returnMessages.clear(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java index b85e3603b7..820f0122f5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java @@ -21,6 +21,7 @@ package org.apache.qpid.server; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.queue.AMQMessage; /** @@ -44,10 +45,10 @@ public abstract class RequiredDeliveryException extends AMQException return _amqMessage; } - public int getErrorCode() + public AMQConstant getErrorCode() { return getReplyCode(); } - public abstract int getReplyCode(); + public abstract AMQConstant getReplyCode(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java index 6688318a0a..f93b2b25e6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java @@ -47,13 +47,19 @@ public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckB public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicAckBody> evt) throws AMQException { AMQProtocolSession protocolSession = stateManager.getProtocolSession(); - + if (_log.isDebugEnabled()) { _log.debug("Ack received on channel " + evt.getChannelId()); } BasicAckBody body = evt.getMethod(); final AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw body.getChannelNotFoundException(evt.getChannelId()); + } + // this method throws an AMQException if the delivery tag is not known channel.acknowledgeMessage(body.deliveryTag, body.multiple); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java index 1e56542b2b..7d18043f5c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java @@ -49,15 +49,21 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC final AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); final BasicCancelBody body = evt.getMethod(); + + if (channel == null) + { + throw body.getChannelNotFoundException(evt.getChannelId()); + } + channel.unsubscribeConsumer(protocolSession, body.consumerTag); - if(!body.nowait) + if (!body.nowait) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(), - (byte)8, (byte)0, // AMQP version (major, minor) - body.consumerTag); // consumerTag + (byte) 8, (byte) 0, // AMQP version (major, minor) + body.consumerTag); // consumerTag protocolSession.writeFrame(responseFrame); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index feb6f6b1fa..090988d304 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -61,11 +61,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic final int channelId = evt.getChannelId(); AMQChannel channel = session.getChannel(channelId); + VirtualHost vHost = session.getVirtualHost(); + if (channel == null) { - _log.error("Channel " + channelId + " not found"); - // TODO: either alert or error that the + throw body.getChannelNotFoundException(evt.getChannelId()); } else { @@ -78,12 +79,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic if (body.queue != null) { String msg = "No such queue, '" + body.queue + "'"; - throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), msg); + throw body.getChannelException(AMQConstant.NOT_FOUND, msg); } else { String msg = "No queue name provided, no default queue defined."; - throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(), msg); + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg); } } else @@ -108,24 +109,24 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic catch (AMQInvalidSelectorException ise) { _log.info("Closing connection due to invalid selector"); - throw body.getChannelException(AMQConstant.INVALID_SELECTOR.getCode(), ise.getMessage()); + throw body.getChannelException(AMQConstant.INVALID_SELECTOR, ise.getMessage()); } catch (ConsumerTagNotUniqueException e) { AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.consumerTag + "'"); - throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(), + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Non-unique consumer tag, '" + body.consumerTag + "'"); } catch (AMQQueue.ExistingExclusiveSubscription e) { - throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(), + throw body.getChannelException(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " + queue.getName() + " as it already has an existing exclusive consumer"); } catch (AMQQueue.ExistingSubscriptionPreventsExclusive e) { - throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(), + throw body.getChannelException(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " + queue.getName() + " exclusively as it already has a consumer"); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index 72d7e8b8b9..b88c2ebf3a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -39,8 +39,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB AMQChannel channel = session.getChannel(channelId);
if (channel == null)
{
- _log.error("Channel " + channelId + " not found");
- // TODO: either alert or error that the
+ throw body.getChannelNotFoundException(evt.getChannelId());
}
else
{
@@ -51,12 +50,12 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB _log.info("No queue for '" + body.queue + "'");
if(body.queue!=null)
{
- throw body.getConnectionException(AMQConstant.NOT_FOUND.getCode(),
+ throw body.getConnectionException(AMQConstant.NOT_FOUND,
"No such queue, '" + body.queue + "'");
}
else
{
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"No queue name provided, no default queue defined.");
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java index a30cc2ca3c..7e378dfd01 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java @@ -25,6 +25,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -70,8 +71,7 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi // if the exchange does not exist we raise a channel exception if (e == null) { - throw body.getChannelException(500, "Unknown exchange name"); - + throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name"); } else { @@ -79,6 +79,12 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi // is stored in the channel. Once the final body frame has been received // it is routed to the exchange. AMQChannel channel = session.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw body.getChannelNotFoundException(evt.getChannelId()); + } + channel.setPublishFrame(body, session); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java index 4bc1439e53..3cd6a87f64 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java @@ -27,6 +27,7 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.AMQChannel; public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody> { @@ -40,12 +41,18 @@ public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody> public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicQosBody> evt) throws AMQException { AMQProtocolSession session = stateManager.getProtocolSession(); - session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount); - session.getChannel(evt.getChannelId()).setPrefetchSize(evt.getMethod().prefetchSize); + AMQChannel channel = session.getChannel(evt.getChannelId()); + if (channel == null) + { + throw evt.getMethod().getChannelNotFoundException(evt.getChannelId()); + } + + channel.setPrefetchCount(evt.getMethod().prefetchCount); + channel.setPrefetchSize(evt.getMethod().prefetchSize); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - session.writeFrame(BasicQosOkBody.createAMQFrame(evt.getChannelId(),(byte)8, (byte)0)); + session.writeFrame(BasicQosOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java index 9f0d096a73..5f5b7ccad1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java @@ -46,12 +46,13 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic _logger.debug("Recover received on protocol session " + session + " and channel " + evt.getChannelId()); AMQChannel channel = session.getChannel(evt.getChannelId()); + BasicRecoverBody body = evt.getMethod(); + if (channel == null) { - throw new AMQException("Unknown channel " + evt.getChannelId()); + throw body.getChannelNotFoundException(evt.getChannelId()); } - BasicRecoverBody body = evt.getMethod(); - channel.resend(session, body.requeue); + channel.resend(session, body.requeue); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java index bdb877b7ac..bfa170cfc5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java @@ -52,6 +52,12 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB ChannelFlowBody body = evt.getMethod(); AMQChannel channel = session.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw body.getChannelNotFoundException(evt.getChannelId()); + } + channel.setSuspended(!body.active); _logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java index 809676cfbe..a85af61327 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java @@ -71,7 +71,7 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con if(virtualHost == null) { - throw body.getConnectionException(AMQConstant.NOT_FOUND.getCode(), "Unknown virtual host: " + virtualHostName); + throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: " + virtualHostName); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index 575833a68f..be3ffcc698 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -76,7 +76,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange { if(body.passive && ((body.type == null) || body.type.length() ==0)) { - throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Unknown exchange: " + body.exchange); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.exchange); } else { @@ -89,14 +89,14 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange } catch(AMQUnknownExchangeType e) { - throw body.getConnectionException(AMQConstant.COMMAND_INVALID.getCode(), "Unknown exchange: " + body.exchange,e); + throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + body.exchange,e); } } } else if (!exchange.getType().equals(body.type)) { - throw new AMQConnectionException(AMQConstant.NOT_ALLOWED.getCode(), "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor()); + throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index bccb9db967..3c903b471d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInvalidRoutingKeyException; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.QueueBindBody; import org.apache.qpid.framing.QueueBindOkBody; @@ -35,6 +36,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.AMQChannel; public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> { @@ -57,17 +59,25 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> VirtualHost virtualHost = session.getVirtualHost(); ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - final QueueBindBody body = evt.getMethod(); final AMQQueue queue; if (body.queue == null) { - queue = session.getChannel(evt.getChannelId()).getDefaultQueue(); + AMQChannel channel = session.getChannel(evt.getChannelId()); + +// if (channel == null) +// { +// throw body.getChannelNotFoundException(evt.getChannelId()); +// } + + queue = channel.getDefaultQueue(); + if (queue == null) { - throw new AMQException("No default queue defined on channel and queue was null"); + throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null"); } + if (body.routingKey == null) { body.routingKey = queue.getName(); @@ -80,14 +90,25 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> if (queue == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Queue " + body.queue + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist."); } final Exchange exch = exchangeRegistry.getExchange(body.exchange); if (exch == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Exchange " + body.exchange + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist."); + } + try + { + exch.registerQueue(body.routingKey, queue, body.arguments); + } + catch (AMQInvalidRoutingKeyException rke) + { + throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, body.routingKey.toString()); + } + catch (AMQException e) + { + throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString()); } - exch.registerQueue(body.routingKey, queue, body.arguments); queue.bind(body.routingKey, exch); if (_log.isInfoEnabled()) { @@ -98,7 +119,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); + final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0); session.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 1e4b7c9e57..2218ff604f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -43,6 +43,7 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.AMQChannel; public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody> { @@ -83,7 +84,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar body.queue = createName(); } - AMQQueue queue = null; + AMQQueue queue; //TODO: do we need to check that the queue already exists with exactly the same "configuration"? synchronized (queueRegistry) @@ -94,8 +95,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar if(body.passive) { String msg = "Queue: " + body.queue + " not found."; - throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(),msg ); - + throw body.getChannelException(AMQConstant.NOT_FOUND,msg ); } else { @@ -116,12 +116,18 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } else if(queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner())) { - // todo - constant - throw body.getChannelException(405, "Cannot declare queue, as exclusive queue with same name declared on another connection"); + throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue, as exclusive queue with same name declared on another connection"); + } + + AMQChannel channel = session.getChannel(evt.getChannelId()); + if (channel == null) + { + throw body.getChannelNotFoundException(evt.getChannelId()); } + //set this as the default queue on the channel: - session.getChannel(evt.getChannelId()).setDefaultQueue(queue); + channel.setDefaultQueue(queue); } if (!body.nowait) diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index 4c875692f0..0c7de312a7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.QueueDeleteBody; import org.apache.qpid.framing.QueueDeleteOkBody; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; @@ -31,6 +32,7 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.AMQChannel; public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody> { @@ -65,7 +67,15 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete AMQQueue queue; if(body.queue == null) { - queue = session.getChannel(evt.getChannelId()).getDefaultQueue(); + AMQChannel channel = session.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw body.getChannelNotFoundException(evt.getChannelId()); + } + + //get the default queue on the channel: + queue = channel.getDefaultQueue(); } else { @@ -76,19 +86,19 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete { if(_failIfNotFound) { - throw body.getChannelException(404, "Queue " + body.queue + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist."); } } else { if(body.ifEmpty && !queue.isEmpty()) { - throw body.getChannelException(406, "Queue: " + body.queue + " is not empty." ); + throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is not empty." ); } else if(body.ifUnused && !queue.isUnused()) { // TODO - Error code - throw body.getChannelException(406, "Queue: " + body.queue + " is still used." ); + throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is still used." ); } else diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java index 3ccc61fff0..0c00436470 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java @@ -11,6 +11,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.AMQChannel;
public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
{
@@ -39,18 +40,27 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod VirtualHost virtualHost = session.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+
QueuePurgeBody body = evt.getMethod();
AMQQueue queue;
if(body.queue == null)
{
- queue = session.getChannel(evt.getChannelId()).getDefaultQueue();
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(evt.getChannelId());
+ }
+
+ //get the default queue on the channel:
+ queue = channel.getDefaultQueue();
+
if(queue == null)
{
if(_failIfNotFound)
{
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),"No queue specified.");
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,"No queue specified.");
}
-
}
}
else
@@ -62,12 +72,12 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod {
if(_failIfNotFound)
{
- throw body.getChannelException(404, "Queue " + body.queue + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
}
}
else
{
- long purged = queue.clearQueue(session.getChannel(evt.getChannelId()).getStoreContext());
+ long purged = queue.clearQueue(channel.getStoreContext());
if(!body.nowait)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java index caf0efad67..3d7ec286f9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java @@ -47,7 +47,7 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxCommitBody> evt) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); + AMQProtocolSession session = stateManager.getProtocolSession(); try { @@ -56,14 +56,20 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> _log.debug("Commit received on channel " + evt.getChannelId()); } AMQChannel channel = session.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw evt.getMethod().getChannelNotFoundException(evt.getChannelId()); + } + channel.commit(); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - session.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); + session.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0)); channel.processReturns(session); } - catch(AMQException e) + catch (AMQException e) { throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage()); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java index 9088240351..8ce5a0ea73 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java @@ -45,18 +45,27 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxRollbackBody> evt) throws AMQException { AMQProtocolSession session = stateManager.getProtocolSession(); - - try{ + + try + { AMQChannel channel = session.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw evt.getMethod().getChannelNotFoundException(evt.getChannelId()); + } + channel.rollback(); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); + session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0)); //Now resend all the unacknowledged messages back to the original subscribers. //(Must be done after the TxnRollback-ok response). channel.resend(session, false); - }catch(AMQException e){ + } + catch (AMQException e) + { throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java index 29795e50ca..a9e478e301 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java @@ -27,6 +27,7 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.AMQChannel; public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody> { @@ -44,11 +45,19 @@ public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody> public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxSelectBody> evt) throws AMQException { AMQProtocolSession session = stateManager.getProtocolSession(); - - session.getChannel(evt.getChannelId()).setLocalTransactional(); + + AMQChannel channel = session.getChannel(evt.getChannelId()); + + if (channel == null) + { + throw evt.getMethod().getChannelNotFoundException(evt.getChannelId()); + } + + channel.setLocalTransactional(); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - session.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); + session.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index e53410420f..309fa4663a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -325,6 +325,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content header frame received: " + frame); } + //fixme what happens if getChannel returns null getChannel(frame.getChannel()).publishContentHeader((ContentHeaderBody) frame.getBodyFrame()); } @@ -334,6 +335,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content body frame received: " + frame); } + //fixme what happens if getChannel returns null getChannel(frame.getChannel()).publishContentBody((ContentBody) frame.getBodyFrame(), this); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java index 2049189e0f..c63490f019 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java @@ -35,8 +35,8 @@ public class NoConsumersException extends RequiredDeliveryException super("Immediate delivery is not possible.", message); } - public int getReplyCode() + public AMQConstant getReplyCode() { - return AMQConstant.NO_CONSUMERS.getCode(); + return AMQConstant.NO_CONSUMERS; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 05841ccfc0..6bdfeccc0f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -24,6 +24,8 @@ import java.util.Queue; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQChannelException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.AMQShortString; @@ -37,11 +39,8 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; /** - * Encapsulation of a supscription to a queue. - * <p/> - * Ties together the protocol session of a subscriber, the consumer tag that - * was given out by the broker and the channel id. - * <p/> + * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag + * that was given out by the broker and the channel id. <p/> */ public class SubscriptionImpl implements Subscription { @@ -59,9 +58,7 @@ public class SubscriptionImpl implements Subscription private final boolean _noLocal; - /** - * True if messages need to be acknowledged - */ + /** True if messages need to be acknowledged */ private final boolean _acks; private FilterManager _filters; private final boolean _isBrowser; @@ -96,8 +93,8 @@ public class SubscriptionImpl implements Subscription { AMQChannel channel = protocolSession.getChannel(channelId); if (channel == null) - { - throw new NullPointerException("channel not found in protocol session"); + { + throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session"); } this.channel = channel; @@ -172,9 +169,7 @@ public class SubscriptionImpl implements Subscription return (o instanceof SubscriptionImpl) && equals((SubscriptionImpl) o); } - /** - * Equality holds if the session matches and the channel and consumer tag are the same. - */ + /** Equality holds if the session matches and the channel and consumer tag are the same. */ private boolean equals(SubscriptionImpl psc) { return sessionKey.equals(psc.sessionKey) @@ -193,11 +188,12 @@ public class SubscriptionImpl implements Subscription } /** - * This method can be called by each of the publisher threads. - * As a result all changes to the channel object must be thread safe. + * This method can be called by each of the publisher threads. As a result all changes to the channel object must be + * thread safe. * * @param msg * @param queue + * * @throws AMQException */ public void send(AMQMessage msg, AMQQueue queue) throws AMQException @@ -224,7 +220,7 @@ public class SubscriptionImpl implements Subscription // We don't decrement the reference here as we don't want to consume the message // but we do want to send it to the client. - synchronized(channel) + synchronized (channel) { long deliveryTag = channel.getNextDeliveryTag(); @@ -260,7 +256,7 @@ public class SubscriptionImpl implements Subscription } queue.dequeue(storeContext, msg); } - synchronized(channel) + synchronized (channel) { long deliveryTag = channel.getNextDeliveryTag(); @@ -309,11 +305,11 @@ public class SubscriptionImpl implements Subscription Object localInstance; Object msgInstance; - if((protocolSession.getClientProperties() != null) && - (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) + if ((protocolSession.getClientProperties() != null) && + (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { - if((msg.getPublisher().getClientProperties() != null) && - (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) + if ((msg.getPublisher().getClientProperties() != null) && + (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) { @@ -402,10 +398,10 @@ public class SubscriptionImpl implements Subscription // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), - protocolSession.getProtocolMajorVersion(), - protocolSession.getProtocolMinorVersion(), - consumerTag // consumerTag - )); + protocolSession.getProtocolMajorVersion(), + protocolSession.getProtocolMinorVersion(), + consumerTag // consumerTag + )); _closed = true; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index 6d1e9ce99d..29efdd9513 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -55,7 +55,6 @@ import org.apache.qpid.framing.QueuePurgeBody; import org.apache.qpid.framing.TxCommitBody; import org.apache.qpid.framing.TxRollbackBody; import org.apache.qpid.framing.TxSelectBody; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.handler.BasicAckMethodHandler; @@ -231,7 +230,7 @@ public class AMQStateManager implements AMQMethodListener && (protocolSession.getChannel(evt.getChannelId()) == null) && !protocolSession.channelAwaitingClosure(evt.getChannelId())) { - throw evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR.getCode(), "No such channel: " + evt.getChannelId()); + throw evt.getMethod().getChannelNotFoundException(evt.getChannelId()); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java b/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java index 52dcfcfbfb..0bc474f6e6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java @@ -21,10 +21,11 @@ package org.apache.qpid.client; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; public class AMQAuthenticationException extends AMQException { - public AMQAuthenticationException(int error, String msg) + public AMQAuthenticationException(AMQConstant error, String msg) { super(error,msg); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index cb2533c2bb..ebaa22ce44 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -966,7 +966,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { if (cause instanceof AMQException) { - je = new JMSException(Integer.toString(((AMQException) cause).getErrorCode()), "Exception thrown against " + toString() + ": " + cause); + je = new JMSException(Integer.toString(((AMQException) cause).getErrorCode().getCode()), "Exception thrown against " + toString() + ": " + cause); } else { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java b/java/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java index 277e3f7eaf..bec2958cb9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java @@ -28,7 +28,7 @@ public class AMQNoConsumersException extends AMQUndeliveredException { public AMQNoConsumersException(String msg, Object bounced) { - super(AMQConstant.NO_CONSUMERS.getCode(), msg, bounced); + super(AMQConstant.NO_CONSUMERS, msg, bounced); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java b/java/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java index 0e84ad75f2..6ea8413446 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java @@ -28,7 +28,7 @@ public class AMQNoRouteException extends AMQUndeliveredException { public AMQNoRouteException(String msg, Object bounced) { - super(AMQConstant.NO_ROUTE.getCode(), msg, bounced); + super(AMQConstant.NO_ROUTE, msg, bounced); } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java index d011d02a91..6ef187286b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java @@ -24,6 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidSelectorException; +import org.apache.qpid.AMQInvalidRoutingKeyException; import org.apache.qpid.client.AMQNoConsumersException; import org.apache.qpid.client.AMQNoRouteException; import org.apache.qpid.client.protocol.AMQProtocolSession; @@ -52,34 +53,39 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener _logger.debug("ChannelClose method received"); ChannelCloseBody method = (ChannelCloseBody) evt.getMethod(); - int errorCode = method.replyCode; + AMQConstant errorCode = AMQConstant.getConstant(method.replyCode); AMQShortString reason = method.replyText; if (_logger.isDebugEnabled()) { _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason); } - // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), method.getMajor(), method.getMinor()); protocolSession.writeFrame(frame); - if (errorCode != AMQConstant.REPLY_SUCCESS.getCode()) + if (errorCode != AMQConstant.REPLY_SUCCESS) { _logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason); - if (errorCode == AMQConstant.NO_CONSUMERS.getCode()) + if (errorCode == AMQConstant.NO_CONSUMERS) { throw new AMQNoConsumersException("Error: " + reason, null); } - else if (errorCode == AMQConstant.NO_ROUTE.getCode()) + else if (errorCode == AMQConstant.NO_ROUTE) { throw new AMQNoRouteException("Error: " + reason, null); } - else if (errorCode == AMQConstant.INVALID_SELECTOR.getCode()) + else if (errorCode == AMQConstant.INVALID_SELECTOR) { - _logger.info("Broker responded with Invalid Selector."); + _logger.debug("Broker responded with Invalid Selector."); throw new AMQInvalidSelectorException(String.valueOf(reason)); } + else if (errorCode == AMQConstant.INVALID_ROUTING_KEY) + { + _logger.debug("Broker responded with Invalid Routing Key."); + + throw new AMQInvalidRoutingKeyException(String.valueOf(reason)); + } else { throw new AMQChannelClosedException(errorCode, "Error: " + reason); diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java index f928ab56eb..57d987712a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java @@ -57,16 +57,16 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener // does it matter //stateManager.changeState(AMQState.CONNECTION_CLOSING); - int errorCode = method.replyCode; + AMQConstant errorCode = AMQConstant.getConstant(method.replyCode); AMQShortString reason = method.replyText; // TODO: check whether channel id of zero is appropriate // Be aware of possible changes to parameter order as versions change. protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, method.getMajor(), method.getMinor())); - if (errorCode != 200) + if (errorCode != AMQConstant.REPLY_SUCCESS) { - if(errorCode == AMQConstant.NOT_ALLOWED.getCode()) + if(errorCode == AMQConstant.NOT_ALLOWED) { _logger.info("Authentication Error:"+Thread.currentThread().getName()); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java b/java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java index 51a9aa7226..21526ac6d2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java @@ -22,6 +22,7 @@ package org.apache.qpid.client.message; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; public class UnexpectedBodyReceivedException extends AMQException { @@ -36,7 +37,7 @@ public class UnexpectedBodyReceivedException extends AMQException super(logger, msg); } - public UnexpectedBodyReceivedException(Logger logger, int errorCode, String msg) + public UnexpectedBodyReceivedException(Logger logger, AMQConstant errorCode, String msg) { super(logger, errorCode, msg); } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index d4d700966a..055109d3be 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -48,6 +48,7 @@ import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersionList; import org.apache.qpid.framing.VersionSpecificRegistry; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.protocol.AMQConstant; /** * Wrapper for protocol session that provides type-safe access to session attributes. @@ -389,7 +390,7 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP * initiated the channel close, false if the channel close is just the server * responding to the client's earlier request to close the channel. */ - public boolean channelClosed(int channelId, int code, String text) throws AMQException + public boolean channelClosed(int channelId, AMQConstant code, String text) throws AMQException { final Integer chId = channelId; // if this is not a response to an earlier request to close the channel diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index 9b214e88f9..f957df2c34 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -63,7 +63,7 @@ public class DurableSubscriptionTest extends TestCase Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription"); - con.start(); + con.start(); producer.send(session1.createTextMessage("A")); diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java index cd8b40c6da..272933ca04 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java @@ -20,12 +20,14 @@ */ package org.apache.qpid; +import org.apache.qpid.protocol.AMQConstant; + /** * AMQ channel closed exception. */ public class AMQChannelClosedException extends AMQException { - public AMQChannelClosedException(int errorCode, String msg) + public AMQChannelClosedException(AMQConstant errorCode, String msg) { super(errorCode, msg); } diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java index d1750ebbb5..d8c9b287bd 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java @@ -23,6 +23,7 @@ package org.apache.qpid; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.protocol.AMQConstant; public class AMQChannelException extends AMQException { @@ -32,7 +33,7 @@ public class AMQChannelException extends AMQException private final byte major; private final byte minor; - public AMQChannelException(int errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t) + public AMQChannelException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t) { super(errorCode, msg, t); _classId = classId; @@ -41,7 +42,7 @@ public class AMQChannelException extends AMQException this.minor = minor; } - public AMQChannelException(int errorCode, String msg, int classId, int methodId, byte major, byte minor) + public AMQChannelException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor) { super(errorCode, msg); _classId = classId; @@ -52,6 +53,6 @@ public class AMQChannelException extends AMQException public AMQFrame getCloseFrame(int channel) { - return ChannelCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode(), new AMQShortString(getMessage())); + return ChannelCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(), new AMQShortString(getMessage())); } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java index 6ec18bad20..e0ed16a9f0 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java @@ -20,12 +20,14 @@ */ package org.apache.qpid; +import org.apache.qpid.protocol.AMQConstant; + /** * AMQ channel closed exception. */ public class AMQConnectionClosedException extends AMQException { - public AMQConnectionClosedException(int errorCode, String msg) + public AMQConnectionClosedException(AMQConstant errorCode, String msg) { super(errorCode, msg); } diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java index c6a874bcf3..c4f80191a3 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java @@ -24,6 +24,7 @@ package org.apache.qpid; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.protocol.AMQConstant; public class AMQConnectionException extends AMQException { @@ -34,7 +35,7 @@ public class AMQConnectionException extends AMQException private final byte minor; boolean _closeConnetion; - public AMQConnectionException(int errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t) + public AMQConnectionException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t) { super(errorCode, msg, t); _classId = classId; @@ -43,7 +44,7 @@ public class AMQConnectionException extends AMQException this.minor = minor; } - public AMQConnectionException(int errorCode, String msg, int classId, int methodId, byte major, byte minor) + public AMQConnectionException(AMQConstant errorCode, String msg, int classId, int methodId, byte major, byte minor) { super(errorCode, msg); _classId = classId; @@ -56,7 +57,7 @@ public class AMQConnectionException extends AMQException public AMQFrame getCloseFrame(int channel) { - return ConnectionCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode(), new AMQShortString(getMessage())); + return ConnectionCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode().getCode(), new AMQShortString(getMessage())); } diff --git a/java/common/src/main/java/org/apache/qpid/AMQException.java b/java/common/src/main/java/org/apache/qpid/AMQException.java index 93c31e4fa8..5c11ec18ca 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQException.java @@ -21,31 +21,36 @@ package org.apache.qpid; import org.apache.log4j.Logger; +import org.apache.qpid.protocol.AMQConstant; /** * Generic AMQ exception. */ public class AMQException extends Exception { - private int _errorCode; + private AMQConstant _errorCode; public AMQException(String message) { super(message); + //fixme This method needs removed and all AMQExceptions need a valid error code + _errorCode = AMQConstant.getConstant(-1); } public AMQException(String msg, Throwable t) { super(msg, t); + //fixme This method needs removed and all AMQExceptions need a valid error code + _errorCode = AMQConstant.getConstant(-1); } - public AMQException(int errorCode, String msg, Throwable t) + public AMQException(AMQConstant errorCode, String msg, Throwable t) { super(msg + " [error code " + errorCode + ']', t); _errorCode = errorCode; } - public AMQException(int errorCode, String msg) + public AMQException(AMQConstant errorCode, String msg) { super(msg + " [error code " + errorCode + ']'); _errorCode = errorCode; @@ -63,13 +68,13 @@ public class AMQException extends Exception logger.error(getMessage(), this); } - public AMQException(Logger logger, int errorCode, String msg) + public AMQException(Logger logger, AMQConstant errorCode, String msg) { this(errorCode, msg); logger.error(getMessage(), this); } - public int getErrorCode() + public AMQConstant getErrorCode() { return _errorCode; } diff --git a/java/common/src/main/java/org/apache/qpid/AMQInvalidRoutingKeyException.java b/java/common/src/main/java/org/apache/qpid/AMQInvalidRoutingKeyException.java new file mode 100644 index 0000000000..3293e2523d --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/AMQInvalidRoutingKeyException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid; + +import org.apache.qpid.protocol.AMQConstant; + +public class AMQInvalidRoutingKeyException extends AMQException +{ + public AMQInvalidRoutingKeyException(String message) + { + super(AMQConstant.INVALID_ROUTING_KEY,message); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java b/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java index dcd039b789..9d003514ad 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java @@ -26,6 +26,6 @@ public class AMQInvalidSelectorException extends AMQException { public AMQInvalidSelectorException(String message) { - super(AMQConstant.INVALID_SELECTOR.getCode(),message); + super(AMQConstant.INVALID_SELECTOR,message); } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java b/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java index 4944ccc371..ad5aff7bb6 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java @@ -20,6 +20,8 @@ */ package org.apache.qpid; +import org.apache.qpid.protocol.AMQConstant; + /** * Generic AMQ exception. */ @@ -27,7 +29,7 @@ public class AMQUndeliveredException extends AMQException { private Object _bounced; - public AMQUndeliveredException(int errorCode, String msg, Object bounced) + public AMQUndeliveredException(AMQConstant errorCode, String msg, Object bounced) { super(errorCode, msg); diff --git a/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java b/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java index 67af0b0b74..958f59191f 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java @@ -22,6 +22,7 @@ package org.apache.qpid.configuration; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; /** * Indicates an error parsing a property expansion. @@ -38,12 +39,12 @@ public class PropertyException extends AMQException super(msg, t); } - public PropertyException(int errorCode, String msg, Throwable t) + public PropertyException(AMQConstant errorCode, String msg, Throwable t) { super(errorCode, msg, t); } - public PropertyException(int errorCode, String msg) + public PropertyException(AMQConstant errorCode, String msg) { super(errorCode, msg); } @@ -58,7 +59,7 @@ public class PropertyException extends AMQException super(logger, msg); } - public PropertyException(Logger logger, int errorCode, String msg) + public PropertyException(Logger logger, AMQConstant errorCode, String msg) { super(logger, errorCode, msg); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java index 3fa5b150ab..111d9a8f20 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -23,19 +23,26 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.protocol.AMQConstant; public abstract class AMQMethodBody extends AMQBody { - public static final byte TYPE = 1; - - /** - * AMQP version - */ + public static final byte TYPE = 1; + + /** AMQP version */ protected byte major; protected byte minor; - public byte getMajor() { return major; } - public byte getMinor() { return minor; } - + + public byte getMajor() + { + return major; + } + + public byte getMinor() + { + return minor; + } + public AMQMethodBody(byte major, byte minor) { this.major = major; @@ -45,14 +52,10 @@ public abstract class AMQMethodBody extends AMQBody /** unsigned short */ protected abstract int getBodySize(); - /** - * @return unsigned short - */ + /** @return unsigned short */ protected abstract int getClazz(); - /** - * @return unsigned short - */ + /** @return unsigned short */ protected abstract int getMethod(); protected abstract void writeMethodPayload(ByteBuffer buffer); @@ -90,27 +93,38 @@ public abstract class AMQMethodBody extends AMQBody } /** - * Creates an AMQChannelException for the corresponding body type (a channel exception - * should include the class and method ids of the body it resulted from). + * Creates an AMQChannelException for the corresponding body type (a channel exception should include the class and + * method ids of the body it resulted from). + */ + + /** + * Convenience Method to create a channel not found exception + * + * @param channelId The channel id that is not found + * + * @return new AMQChannelException */ - public AMQChannelException getChannelException(int code, String message) + public AMQChannelException getChannelNotFoundException(int channelId) + { + return getChannelException(AMQConstant.NOT_FOUND, "Channel not found for id:" + channelId); + } + + public AMQChannelException getChannelException(AMQConstant code, String message) { return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor); } - public AMQChannelException getChannelException(int code, String message, Throwable cause) + public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause) { return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor, cause); } - public AMQConnectionException getConnectionException(int code, String message) + public AMQConnectionException getConnectionException(AMQConstant code, String message) { return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor); } - - - public AMQConnectionException getConnectionException(int code, String message, Throwable cause) + public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause) { return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor, cause); } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java index a4d90e9ee3..05365de137 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java @@ -69,7 +69,7 @@ public final class AMQConstant public static final AMQConstant MESSAGE_TOO_LARGE = new AMQConstant(311, "message too large", true); public static final AMQConstant NO_ROUTE = new AMQConstant(312, "no route", true); - + public static final AMQConstant NO_CONSUMERS = new AMQConstant(313, "no consumers", true); public static final AMQConstant CONTEXT_IN_USE = new AMQConstant(320, "context in use", true); @@ -78,12 +78,18 @@ public final class AMQConstant public static final AMQConstant INVALID_SELECTOR = new AMQConstant(322, "selector invalid", true); + public static final AMQConstant INVALID_ROUTING_KEY = new AMQConstant(323, "routing key invalid", true); + public static final AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true); public static final AMQConstant ACCESS_REFUSED = new AMQConstant(403, "access refused", true); public static final AMQConstant NOT_FOUND = new AMQConstant(404, "not found", true); + public static final AMQConstant ALREADY_EXISTS = new AMQConstant(405, "Already exists", true); + + public static final AMQConstant IN_USE = new AMQConstant(406, "In use", true); + public static final AMQConstant FRAME_ERROR = new AMQConstant(501, "frame error", true); public static final AMQConstant SYNTAX_ERROR = new AMQConstant(502, "syntax error", true); |