diff options
Diffstat (limited to 'java/client/src/main/java/org')
4 files changed, 57 insertions, 38 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 693358c3ae..8a15fffe84 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -61,8 +61,6 @@ import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.*; -import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; -import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; @@ -314,21 +312,12 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0)) { BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false); - getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverOkBody.class); - } - else if(getProtocolVersion().equals(ProtocolVersion.v0_9)) - { - BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false); - getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class); - } - else if(getProtocolVersion().equals(ProtocolVersion.v0_91)) - { - BasicRecoverSyncBody body = ((MethodRegistry_0_91)getMethodRegistry()).createBasicRecoverSyncBody(false); getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class); } else { - throw new RuntimeException("Unsupported version of the AMQP Protocol: " + getProtocolVersion()); + BasicRecoverSyncBody body = getMethodRegistry().createBasicRecoverSyncBody(false); + getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class); } } } @@ -1145,33 +1134,22 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe if (isBound(null, AMQShortString.valueOf(queue), null)) { - MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); - AMQMethodBody body; - if (methodRegistry instanceof MethodRegistry_0_9) + + if(ProtocolVersion.v8_0.equals(getProtocolVersion())) { - String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey(); + throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8"); + } - MethodRegistry_0_9 methodRegistry_0_9 = (MethodRegistry_0_9) methodRegistry; - body = methodRegistry_0_9.createQueueUnbindBody(getTicket(), + MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); + + String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey(); + + AMQMethodBody body = methodRegistry.createQueueUnbindBody(getTicket(), AMQShortString.valueOf(queue), AMQShortString.valueOf(exchange), AMQShortString.valueOf(bindingKey), null); - } - else if (methodRegistry instanceof MethodRegistry_0_91) - { - MethodRegistry_0_91 methodRegistry_0_91 = (MethodRegistry_0_91) methodRegistry; - body = methodRegistry_0_91.createQueueUnbindBody(getTicket(), - AMQShortString.valueOf(queue), - AMQShortString.valueOf(exchange), - AMQShortString.valueOf(binding.getBindingKey()), - null); - } - else - { - throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8"); - } getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), QueueUnbindOkBody.class); return null; } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java index 29a064b712..8144fd1258 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java @@ -125,6 +125,21 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher return false; } + @Override + public boolean dispatchQueueUnbindOk(final QueueUnbindOkBody body, final int channelId) + throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + @Override + public boolean dispatchBasicRecoverSyncOk(final BasicRecoverSyncOkBody basicRecoverSyncOkBody, + final int channelId) + throws AMQException + { + return false; + } + public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException { _basicCancelOkMethodHandler.methodReceived(_session, body, channelId); @@ -365,6 +380,12 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher throw new AMQMethodNotImplementedException(body); } + @Override + public boolean dispatchQueueUnbind(final QueueUnbindBody queueUnbindBody, final int channelId) throws AMQException + { + return false; + } + public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException { _exchangeBoundOkMethodHandler.methodReceived(_session, body, channelId); diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java index d2ae057e6d..fc2037097f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java @@ -22,8 +22,11 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.framing.BasicRecoverOkBody; +import org.apache.qpid.client.state.AMQMethodNotImplementedException; +import org.apache.qpid.framing.BasicRecoverSyncOkBody; import org.apache.qpid.framing.ChannelAlertBody; +import org.apache.qpid.framing.QueueUnbindBody; +import org.apache.qpid.framing.QueueUnbindOkBody; import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0; public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher_8_0 @@ -33,14 +36,27 @@ public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl i super(session); } - public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException + public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException { return false; } - public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException + @Override + public boolean dispatchQueueUnbindOk(final QueueUnbindOkBody queueUnbindOkBody, final int channelId) + { + return false; + } + + @Override + public boolean dispatchBasicRecoverSyncOk(final BasicRecoverSyncOkBody basicRecoverSyncOkBody, + final int channelId) { return false; } + @Override + public boolean dispatchQueueUnbind(final QueueUnbindBody body, final int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 5c9d8f9b91..78f6273db8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -753,8 +753,12 @@ public class AMQProtocolHandler implements ProtocolEngine // Connection is already closed then don't do a syncWrite try { - final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client is closing the connection."), 0, 0); + final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody( + AMQConstant.REPLY_SUCCESS.getCode(), + // replyCode + new AMQShortString("JMS client is closing the connection."), + 0, + 0); final AMQFrame frame = body.generateFrame(0); syncWrite(frame, ConnectionCloseOkBody.class, timeout); |