diff options
Diffstat (limited to 'java/client/src')
5 files changed, 63 insertions, 43 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 693358c3ae..8a15fffe84 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -61,8 +61,6 @@ import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.*; -import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; -import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; @@ -314,21 +312,12 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0)) { BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false); - getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverOkBody.class); - } - else if(getProtocolVersion().equals(ProtocolVersion.v0_9)) - { - BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false); - getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class); - } - else if(getProtocolVersion().equals(ProtocolVersion.v0_91)) - { - BasicRecoverSyncBody body = ((MethodRegistry_0_91)getMethodRegistry()).createBasicRecoverSyncBody(false); getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class); } else { - throw new RuntimeException("Unsupported version of the AMQP Protocol: " + getProtocolVersion()); + BasicRecoverSyncBody body = getMethodRegistry().createBasicRecoverSyncBody(false); + getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class); } } } @@ -1145,33 +1134,22 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe if (isBound(null, AMQShortString.valueOf(queue), null)) { - MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); - AMQMethodBody body; - if (methodRegistry instanceof MethodRegistry_0_9) + + if(ProtocolVersion.v8_0.equals(getProtocolVersion())) { - String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey(); + throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8"); + } - MethodRegistry_0_9 methodRegistry_0_9 = (MethodRegistry_0_9) methodRegistry; - body = methodRegistry_0_9.createQueueUnbindBody(getTicket(), + MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); + + String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey(); + + AMQMethodBody body = methodRegistry.createQueueUnbindBody(getTicket(), AMQShortString.valueOf(queue), AMQShortString.valueOf(exchange), AMQShortString.valueOf(bindingKey), null); - } - else if (methodRegistry instanceof MethodRegistry_0_91) - { - MethodRegistry_0_91 methodRegistry_0_91 = (MethodRegistry_0_91) methodRegistry; - body = methodRegistry_0_91.createQueueUnbindBody(getTicket(), - AMQShortString.valueOf(queue), - AMQShortString.valueOf(exchange), - AMQShortString.valueOf(binding.getBindingKey()), - null); - } - else - { - throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8"); - } getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), QueueUnbindOkBody.class); return null; } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java index 29a064b712..8144fd1258 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java @@ -125,6 +125,21 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher return false; } + @Override + public boolean dispatchQueueUnbindOk(final QueueUnbindOkBody body, final int channelId) + throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + @Override + public boolean dispatchBasicRecoverSyncOk(final BasicRecoverSyncOkBody basicRecoverSyncOkBody, + final int channelId) + throws AMQException + { + return false; + } + public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException { _basicCancelOkMethodHandler.methodReceived(_session, body, channelId); @@ -365,6 +380,12 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher throw new AMQMethodNotImplementedException(body); } + @Override + public boolean dispatchQueueUnbind(final QueueUnbindBody queueUnbindBody, final int channelId) throws AMQException + { + return false; + } + public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException { _exchangeBoundOkMethodHandler.methodReceived(_session, body, channelId); diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java index d2ae057e6d..fc2037097f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java @@ -22,8 +22,11 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.framing.BasicRecoverOkBody; +import org.apache.qpid.client.state.AMQMethodNotImplementedException; +import org.apache.qpid.framing.BasicRecoverSyncOkBody; import org.apache.qpid.framing.ChannelAlertBody; +import org.apache.qpid.framing.QueueUnbindBody; +import org.apache.qpid.framing.QueueUnbindOkBody; import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0; public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher_8_0 @@ -33,14 +36,27 @@ public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl i super(session); } - public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException + public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException { return false; } - public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException + @Override + public boolean dispatchQueueUnbindOk(final QueueUnbindOkBody queueUnbindOkBody, final int channelId) + { + return false; + } + + @Override + public boolean dispatchBasicRecoverSyncOk(final BasicRecoverSyncOkBody basicRecoverSyncOkBody, + final int channelId) { return false; } + @Override + public boolean dispatchQueueUnbind(final QueueUnbindBody body, final int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 5c9d8f9b91..78f6273db8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -753,8 +753,12 @@ public class AMQProtocolHandler implements ProtocolEngine // Connection is already closed then don't do a syncWrite try { - final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client is closing the connection."), 0, 0); + final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody( + AMQConstant.REPLY_SUCCESS.getCode(), + // replyCode + new AMQShortString("JMS client is closing the connection."), + 0, + 0); final AMQFrame frame = body.generateFrame(0); syncWrite(frame, ConnectionCloseOkBody.class, timeout); diff --git a/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java index 70fcfcedb8..61e5247ead 100644 --- a/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.client.protocol; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import junit.framework.TestCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,12 +35,10 @@ import org.apache.qpid.client.transport.TestNetworkConnection; import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl; +import org.apache.qpid.framing.BasicRecoverSyncOkBody; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.protocol.AMQConstant; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - /** * This is a test address QPID-1431 where frame listeners would fail to be notified of an incomming exception. * @@ -75,7 +76,7 @@ public class AMQProtocolHandlerTest extends TestCase //Create a new ProtocolHandler with a fake connection. _handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='tcp://localhost:1'")); _handler.setNetworkConnection(new TestNetworkConnection()); - AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1); + AMQBody body = new BasicRecoverSyncOkBody(ProtocolVersion.v8_0); _blockFrame = new AMQFrame(0, body); _handleCountDown = new CountDownLatch(1); |