diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-09-29 23:20:39 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-09-29 23:20:39 +0000 |
| commit | 38e0deb8298e6f4b27255066b8309cd19f703958 (patch) | |
| tree | b49e9ab3a2ba9cf2a1b07ea4eeb31951b7f2aa05 /java/broker-plugins | |
| parent | 17ff2442352bac23f3776b3a8a6e4e419833e787 (diff) | |
| download | qpid-python-38e0deb8298e6f4b27255066b8309cd19f703958.tar.gz | |
Merge method body implementation classes
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6125-ProtocolRefactoring@1628336 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker-plugins')
9 files changed, 71 insertions, 129 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 765e7d6ad7..cc6b76957a 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -1407,7 +1407,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion()); - ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200, AMQShortString.validValueOf(throwable.getMessage()),0,0); + ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200, + AMQShortString.validValueOf( + throwable.getMessage()), + 0, + 0); writeFrame(closeBody.generateFrame(0)); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java index bf89a812b9..df66120731 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java @@ -25,8 +25,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AccessRequestBody;
import org.apache.qpid.framing.AccessRequestOkBody;
import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
-import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
@@ -63,22 +62,14 @@ public class AccessRequestHandler implements StateAwareMethodListener<AccessRequ MethodRegistry methodRegistry = connection.getMethodRegistry();
- // We don't implement access control class, but to keep clients happy that expect it
- // always use the "0" ticket.
- AccessRequestOkBody response;
- if(methodRegistry instanceof MethodRegistry_0_9)
- {
- response = ((MethodRegistry_0_9)methodRegistry).createAccessRequestOkBody(0);
- }
- else if(methodRegistry instanceof MethodRegistry_8_0)
- {
- response = ((MethodRegistry_8_0)methodRegistry).createAccessRequestOkBody(0);
- }
- else
+ if(ProtocolVersion.v0_91.equals(connection.getProtocolVersion()) )
{
throw new AMQException(AMQConstant.COMMAND_INVALID, "AccessRequest not present in AMQP versions other than 0-8, 0-9");
}
+ // We don't implement access control class, but to keep clients happy that expect it
+ // always use the "0" ticket.
+ AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0);
channel.sync();
connection.writeFrame(response.generateFrame(channelId));
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java index e90744ab5e..b352a2772c 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java @@ -28,6 +28,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.protocol.v0_8.AMQChannel; @@ -94,7 +95,10 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basic throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } - MessagePublishInfo info = connection.getMethodRegistry().getProtocolVersionMethodConverter().convertToInfo(body); + MessagePublishInfo info = new MessagePublishInfoImpl(body.getExchange(), + body.getImmediate(), + body.getMandatory(), + body.getRoutingKey()); info.setExchange(exchangeName); try { diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java index 2f0468fd15..29ddf4421a 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java @@ -25,8 +25,8 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.BasicRecoverBody; +import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; @@ -61,8 +61,8 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant if(connection.getProtocolVersion().equals(ProtocolVersion.v8_0)) { - MethodRegistry_8_0 methodRegistry = (MethodRegistry_8_0) connection.getMethodRegistry(); - AMQMethodBody recoverOk = methodRegistry.createBasicRecoverOkBody(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); + AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody(); channel.sync(); connection.writeFrame(recoverOk.generateFrame(channelId)); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java index efcec299e9..b75492a65d 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java @@ -26,9 +26,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.BasicRecoverSyncBody;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
-import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
+import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
@@ -60,22 +58,9 @@ public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener<B channel.sync();
channel.resend();
- // Qpid 0-8 hacks a synchronous -ok onto recover.
- // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
- if(connection.getProtocolVersion().equals(ProtocolVersion.v0_9))
- {
- MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) connection.getMethodRegistry();
- AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
- connection.writeFrame(recoverOk.generateFrame(channelId));
-
- }
- else if(connection.getProtocolVersion().equals(ProtocolVersion.v0_91))
- {
- MethodRegistry_0_91 methodRegistry = (MethodRegistry_0_91) connection.getMethodRegistry();
- AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
- connection.writeFrame(recoverOk.generateFrame(channelId));
-
- }
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
+ AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
+ connection.writeFrame(recoverOk.generateFrame(channelId));
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java index dbfe07840f..cb1a59ba2a 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java @@ -20,26 +20,15 @@ */ package org.apache.qpid.server.protocol.v0_8.handler; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.UUID; - import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; -import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; -import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenBody> @@ -76,66 +65,9 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB ChannelOpenOkBody response; - ProtocolVersion pv = connection.getProtocolVersion(); - if(pv.equals(ProtocolVersion.v8_0)) - { - MethodRegistry_8_0 methodRegistry = (MethodRegistry_8_0) MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0); - response = methodRegistry.createChannelOpenOkBody(); + response = connection.getMethodRegistry().createChannelOpenOkBody(); - } - else if(pv.equals(ProtocolVersion.v0_9)) - { - MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); - UUID uuid = UUID.randomUUID(); - ByteArrayOutputStream output = new ByteArrayOutputStream(); - DataOutputStream dataOut = new DataOutputStream(output); - try - { - dataOut.writeLong(uuid.getMostSignificantBits()); - dataOut.writeLong(uuid.getLeastSignificantBits()); - dataOut.flush(); - dataOut.close(); - } - catch (IOException e) - { - // This *really* shouldn't happen as we're not doing any I/O - throw new ConnectionScopedRuntimeException("I/O exception when writing to byte array", e); - } - - // should really associate this channelId to the session - byte[] channelName = output.toByteArray(); - - response = methodRegistry.createChannelOpenOkBody(channelName); - } - else if(pv.equals(ProtocolVersion.v0_91)) - { - MethodRegistry_0_91 methodRegistry = (MethodRegistry_0_91) MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91); - UUID uuid = UUID.randomUUID(); - ByteArrayOutputStream output = new ByteArrayOutputStream(); - DataOutputStream dataOut = new DataOutputStream(output); - try - { - dataOut.writeLong(uuid.getMostSignificantBits()); - dataOut.writeLong(uuid.getLeastSignificantBits()); - dataOut.flush(); - dataOut.close(); - } - catch (IOException e) - { - // This *really* shouldn't happen as we're not doing any I/O - throw new ConnectionScopedRuntimeException("I/O exception when writing to byte array", e); - } - - // should really associate this channelId to the session - byte[] channelName = output.toByteArray(); - - response = methodRegistry.createChannelOpenOkBody(channelName); - } - else - { - throw new AMQException(AMQConstant.INTERNAL_ERROR, "Got channel open for protocol version not catered for: " + pv, null); - } connection.writeFrame(response.generateFrame(channelId)); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java index 9ae66b889c..1b2d3c0653 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java @@ -27,10 +27,8 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.framing.QueueUnbindBody; -import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; -import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.protocol.v0_8.AMQChannel; @@ -58,6 +56,13 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB QueueUnbindBody body, int channelId) throws AMQException { + + if (ProtocolVersion.v8_0.equals(connection.getProtocolVersion())) + { + // 0-8 does not support QueueUnbind + throw new AMQException(AMQConstant.COMMAND_INVALID, "QueueUnbind not present in AMQP version: " + connection.getProtocolVersion(), null); + } + VirtualHostImpl virtualHost = connection.getVirtualHost(); final AMQQueue queue; @@ -133,21 +138,8 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey); } - final MethodRegistry registry = connection.getMethodRegistry(); - final AMQMethodBody responseBody; - if (registry instanceof MethodRegistry_0_9) - { - responseBody = ((MethodRegistry_0_9)registry).createQueueUnbindOkBody(); - } - else if (registry instanceof MethodRegistry_0_91) - { - responseBody = ((MethodRegistry_0_91)registry).createQueueUnbindOkBody(); - } - else - { - // 0-8 does not support QueueUnbind - throw new AMQException(AMQConstant.COMMAND_INVALID, "QueueUnbind not present in AMQP version: " + connection.getProtocolVersion(), null); - } + + final AMQMethodBody responseBody = connection.getMethodRegistry().createQueueUnbindOkBody(); channel.sync(); connection.writeFrame(responseBody.generateFrame(channelId)); } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java index 429b5875bc..ce735306ee 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java @@ -186,6 +186,21 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher throw new UnexpectedMethodException(body);
}
+ @Override
+ public boolean dispatchQueueUnbindOk(final QueueUnbindOkBody body, final int channelId)
+ throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
+ @Override
+ public boolean dispatchBasicRecoverSyncOk(final BasicRecoverSyncOkBody body,
+ final int channelId)
+ throws AMQException
+ {
+ throw new UnexpectedMethodException(body);
+ }
+
public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
{
throw new UnexpectedMethodException(body);
@@ -434,7 +449,11 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher return true;
}
-
+ @Override
+ public boolean dispatchQueueUnbind(final QueueUnbindBody queueUnbindBody, final int channelId) throws AMQException
+ {
+ return false;
+ }
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java index df2306428f..b8c253d601 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java @@ -21,8 +21,10 @@ package org.apache.qpid.server.protocol.v0_8.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicRecoverOkBody;
+import org.apache.qpid.framing.BasicRecoverSyncOkBody;
import org.apache.qpid.framing.ChannelAlertBody;
+import org.apache.qpid.framing.QueueUnbindBody;
+import org.apache.qpid.framing.QueueUnbindOkBody;
import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
@@ -35,14 +37,27 @@ public class ServerMethodDispatcherImpl_8_0 super(connection);
}
- public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
+ public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException
{
throw new UnexpectedMethodException(body);
}
- public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException
+ @Override
+ public boolean dispatchQueueUnbindOk(final QueueUnbindOkBody queueUnbindOkBody, final int channelId)
+ {
+ return false;
+ }
+
+ @Override
+ public boolean dispatchBasicRecoverSyncOk(final BasicRecoverSyncOkBody body,
+ final int channelId) throws AMQException
{
throw new UnexpectedMethodException(body);
}
+ @Override
+ public boolean dispatchQueueUnbind(final QueueUnbindBody queueUnbindBody, final int channelId) throws AMQException
+ {
+ return false;
+ }
}
|
