summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java44
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java21
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java22
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java8
4 files changed, 57 insertions, 38 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 693358c3ae..8a15fffe84 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -61,8 +61,6 @@ import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
-import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -314,21 +312,12 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0))
{
BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
- getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverOkBody.class);
- }
- else if(getProtocolVersion().equals(ProtocolVersion.v0_9))
- {
- BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false);
- getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class);
- }
- else if(getProtocolVersion().equals(ProtocolVersion.v0_91))
- {
- BasicRecoverSyncBody body = ((MethodRegistry_0_91)getMethodRegistry()).createBasicRecoverSyncBody(false);
getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class);
}
else
{
- throw new RuntimeException("Unsupported version of the AMQP Protocol: " + getProtocolVersion());
+ BasicRecoverSyncBody body = getMethodRegistry().createBasicRecoverSyncBody(false);
+ getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class);
}
}
}
@@ -1145,33 +1134,22 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
if (isBound(null, AMQShortString.valueOf(queue), null))
{
- MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
- AMQMethodBody body;
- if (methodRegistry instanceof MethodRegistry_0_9)
+
+ if(ProtocolVersion.v8_0.equals(getProtocolVersion()))
{
- String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey();
+ throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8");
+ }
- MethodRegistry_0_9 methodRegistry_0_9 = (MethodRegistry_0_9) methodRegistry;
- body = methodRegistry_0_9.createQueueUnbindBody(getTicket(),
+ MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
+
+ String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey();
+
+ AMQMethodBody body = methodRegistry.createQueueUnbindBody(getTicket(),
AMQShortString.valueOf(queue),
AMQShortString.valueOf(exchange),
AMQShortString.valueOf(bindingKey),
null);
- }
- else if (methodRegistry instanceof MethodRegistry_0_91)
- {
- MethodRegistry_0_91 methodRegistry_0_91 = (MethodRegistry_0_91) methodRegistry;
- body = methodRegistry_0_91.createQueueUnbindBody(getTicket(),
- AMQShortString.valueOf(queue),
- AMQShortString.valueOf(exchange),
- AMQShortString.valueOf(binding.getBindingKey()),
- null);
- }
- else
- {
- throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8");
- }
getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), QueueUnbindOkBody.class);
return null;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
index 29a064b712..8144fd1258 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
@@ -125,6 +125,21 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
return false;
}
+ @Override
+ public boolean dispatchQueueUnbindOk(final QueueUnbindOkBody body, final int channelId)
+ throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ @Override
+ public boolean dispatchBasicRecoverSyncOk(final BasicRecoverSyncOkBody basicRecoverSyncOkBody,
+ final int channelId)
+ throws AMQException
+ {
+ return false;
+ }
+
public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
{
_basicCancelOkMethodHandler.methodReceived(_session, body, channelId);
@@ -365,6 +380,12 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
throw new AMQMethodNotImplementedException(body);
}
+ @Override
+ public boolean dispatchQueueUnbind(final QueueUnbindBody queueUnbindBody, final int channelId) throws AMQException
+ {
+ return false;
+ }
+
public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException
{
_exchangeBoundOkMethodHandler.methodReceived(_session, body, channelId);
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java
index d2ae057e6d..fc2037097f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java
@@ -22,8 +22,11 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.framing.BasicRecoverOkBody;
+import org.apache.qpid.client.state.AMQMethodNotImplementedException;
+import org.apache.qpid.framing.BasicRecoverSyncOkBody;
import org.apache.qpid.framing.ChannelAlertBody;
+import org.apache.qpid.framing.QueueUnbindBody;
+import org.apache.qpid.framing.QueueUnbindOkBody;
import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0;
public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher_8_0
@@ -33,14 +36,27 @@ public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl i
super(session);
}
- public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
+ public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException
{
return false;
}
- public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException
+ @Override
+ public boolean dispatchQueueUnbindOk(final QueueUnbindOkBody queueUnbindOkBody, final int channelId)
+ {
+ return false;
+ }
+
+ @Override
+ public boolean dispatchBasicRecoverSyncOk(final BasicRecoverSyncOkBody basicRecoverSyncOkBody,
+ final int channelId)
{
return false;
}
+ @Override
+ public boolean dispatchQueueUnbind(final QueueUnbindBody body, final int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 5c9d8f9b91..78f6273db8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -753,8 +753,12 @@ public class AMQProtocolHandler implements ProtocolEngine
// Connection is already closed then don't do a syncWrite
try
{
- final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client is closing the connection."), 0, 0);
+ final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(
+ AMQConstant.REPLY_SUCCESS.getCode(),
+ // replyCode
+ new AMQShortString("JMS client is closing the connection."),
+ 0,
+ 0);
final AMQFrame frame = body.generateFrame(0);
syncWrite(frame, ConnectionCloseOkBody.class, timeout);