summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-0-8-protocol/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src')
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java3
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java102
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java9
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java14
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicAckMethodHandler.java11
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicCancelMethodHandler.java15
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java37
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java32
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java23
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java14
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java19
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java24
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java11
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java18
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java8
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java14
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java26
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java16
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java10
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java30
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java32
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java41
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java20
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java16
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java49
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java33
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java30
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java39
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java39
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java31
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java42
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java240
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_9.java105
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_91.java110
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java53
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java18
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java18
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java15
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java153
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/StateAwareMethodListener.java3
40 files changed, 551 insertions, 972 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index f6ef4256d0..4e1af5ddd6 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -464,8 +464,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
AMQConstant.NO_ROUTE,
"No route for message " + currentMessageDescription(),
0, 0, // default class and method ids
- getProtocolSession().getProtocolVersion().getMajorVersion(),
- getProtocolSession().getProtocolVersion().getMinorVersion(),
+ getProtocolSession().getMethodRegistry(),
(Throwable) null);
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index a59f173d2f..1a746447b0 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -27,6 +27,8 @@ import java.nio.ByteBuffer;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -74,7 +76,7 @@ import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
+import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.util.Action;
@@ -86,7 +88,7 @@ import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.util.BytesDataOutput;
-public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession<AMQProtocolEngine>
+public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession<AMQProtocolEngine>, MethodRegistrySource
{
private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
@@ -123,7 +125,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private final Set<AMQChannel<AMQProtocolEngine>> _channelsForCurrentMessage =
new HashSet<AMQChannel<AMQProtocolEngine>>();
- private final AMQStateManager _stateManager;
+ /** The current state */
+ private AMQState _currentState;
private AMQDecoder _decoder;
@@ -189,7 +192,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_transport = transport;
_maxNoOfChannels = broker.getConnection_sessionCountLimit();
_receivedLock = new ReentrantLock();
- _stateManager = new AMQStateManager(broker, this);
_decoder = new AMQDecoder(true, this);
_connectionID = connectionId;
_logSubject = new ConnectionLogSubject(this);
@@ -623,7 +625,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
try
{
- boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
+ boolean wasAnyoneInterested = methodReceived(evt);
if (!wasAnyoneInterested)
{
@@ -654,9 +656,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_logger.info("Closing connection due to: " + e.getMessage());
}
- AMQConnectionException ce =
- evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
- AMQConstant.CHANNEL_ERROR.getName().toString());
+ AMQConnectionException ce = new AMQConnectionException(AMQConstant.CHANNEL_ERROR,
+ AMQConstant.CHANNEL_ERROR.getName().toString(),
+ methodBody, getMethodRegistry());
_logger.info(e.getMessage() + " whilst processing:" + methodBody);
closeConnection(channelId, ce);
@@ -675,6 +677,58 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
closeProtocolSession();
}
}
+ private <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
+ {
+ final MethodDispatcher dispatcher = getMethodDispatcher();
+
+ final int channelId = evt.getChannelId();
+ final B body = evt.getMethod();
+
+ final AMQChannel channel = getChannel(channelId);
+ if(channelId != 0 && channel == null)
+ {
+
+ if(! ((body instanceof ChannelOpenBody)
+ || (body instanceof ChannelCloseOkBody)
+ || (body instanceof ChannelCloseBody)))
+ {
+ throw new AMQConnectionException(AMQConstant.CHANNEL_ERROR, "channel is closed won't process:" + body, body, getMethodRegistry());
+ }
+
+ }
+ if(channel == null)
+ {
+ return body.execute(dispatcher, channelId);
+ }
+ else
+ {
+ try
+ {
+ return Subject.doAs(channel.getSubject(), new PrivilegedExceptionAction<Boolean>()
+ {
+ @Override
+ public Boolean run() throws AMQException
+ {
+ return body.execute(dispatcher, channelId);
+ }
+ });
+ }
+ catch (PrivilegedActionException e)
+ {
+ if(e.getCause() instanceof AMQException)
+ {
+ throw (AMQException) e.getCause();
+ }
+ else
+ {
+ throw new ServerScopedRuntimeException(e.getCause());
+ }
+ }
+
+
+ }
+
+ }
public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
{
@@ -1056,8 +1110,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
try
{
- _stateManager.changeState(AMQState.CONNECTION_CLOSING);
- writeFrame(e.getCloseFrame(channelId));
+ changeState(AMQState.CONNECTION_CLOSING);
+ writeFrame(e.getCloseFrame());
}
finally
{
@@ -1075,7 +1129,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
try
{
- _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ changeState(AMQState.CONNECTION_CLOSED);
}
catch (ConnectionScopedRuntimeException e)
{
@@ -1176,7 +1230,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_protocolVersion = pv;
_methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
_protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this);
- _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion);
+ _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(this);
}
public byte getProtocolMajorVersion()
@@ -1478,9 +1532,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public void close(AMQConstant cause, String message)
{
closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
- getProtocolOutputConverter().getProtocolMajorVersion(),
- getProtocolOutputConverter().getProtocolMinorVersion(),
- null));
+ getMethodRegistry(),
+ null));
}
public void block()
@@ -1674,6 +1727,25 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return _messageCompressionThreshold;
}
+ @Override
+ public void changeState(AMQState newState)
+ {
+ _logger.debug("State changing to " + newState + " from old state " + _currentState);
+ _currentState = newState;
+ }
+
+ @Override
+ public Broker<?> getBroker()
+ {
+ return _broker;
+ }
+
+ @Override
+ public SubjectCreator getSubjectCreator()
+ {
+ return _broker.getSubjectCreator(getLocalAddress(), getTransport().isSecure());
+ }
+
public EventLogger getEventLogger()
{
if(_virtualHost != null)
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
index 587669dadc..709ea409bf 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
@@ -35,9 +35,12 @@ import org.apache.qpid.framing.MethodDispatcher;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
+import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -214,4 +217,10 @@ public interface AMQProtocolSession<T extends AMQProtocolSession<T>>
boolean isCompressionSupported();
int getMessageCompressionThreshold();
+
+ void changeState(AMQState state);
+
+ Broker<?> getBroker();
+
+ SubjectCreator getSubjectCreator();
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java
index ae07d60c4e..bf89a812b9 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java
@@ -30,7 +30,6 @@ import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
/**
@@ -52,16 +51,17 @@ public class AccessRequestHandler implements StateAwareMethodListener<AccessRequ
{
}
- public void methodReceived(AMQStateManager stateManager, AccessRequestBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ AccessRequestBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- final AMQChannel channel = session.getChannel(channelId);
+ final AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
// We don't implement access control class, but to keep clients happy that expect it
// always use the "0" ticket.
@@ -80,6 +80,6 @@ public class AccessRequestHandler implements StateAwareMethodListener<AccessRequ
}
channel.sync();
- session.writeFrame(response.generateFrame(channelId));
+ connection.writeFrame(response.generateFrame(channelId));
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicAckMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicAckMethodHandler.java
index f623d27e87..efc91800a1 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicAckMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicAckMethodHandler.java
@@ -26,7 +26,6 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicAckBody;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckBody>
@@ -44,21 +43,21 @@ public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckB
{
}
- public void methodReceived(AMQStateManager stateManager, BasicAckBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ BasicAckBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession protocolSession = stateManager.getProtocolSession();
-
if (_log.isDebugEnabled())
{
_log.debug("Ack(Tag:" + body.getDeliveryTag() + ":Mult:" + body.getMultiple() + ") received on channel " + channelId);
}
- final AMQChannel channel = protocolSession.getChannel(channelId);
+ final AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
// this method throws an AMQException if the delivery tag is not known
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicCancelMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicCancelMethodHandler.java
index 5a6a7bdc18..16498b3e88 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicCancelMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicCancelMethodHandler.java
@@ -28,7 +28,6 @@ import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicCancelBody>
@@ -46,16 +45,16 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC
{
}
- public void methodReceived(AMQStateManager stateManager, BasicCancelBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ BasicCancelBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
-
- final AMQChannel channel = session.getChannel(channelId);
+ final AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
if (_log.isDebugEnabled())
@@ -67,10 +66,10 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC
channel.unsubscribeConsumer(body.getConsumerTag());
if (!body.getNowait())
{
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(body.getConsumerTag());
channel.sync();
- session.writeFrame(cancelOkBody.generateFrame(channelId));
+ connection.writeFrame(cancelOkBody.generateFrame(channelId));
}
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
index c1e3d850ef..b4219fe29c 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
@@ -36,7 +36,6 @@ import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -56,16 +55,16 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
{
}
- public void methodReceived(AMQStateManager stateManager, BasicConsumeBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ BasicConsumeBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
-
- AMQChannel channel = protocolConnection.getChannel(channelId);
- VirtualHostImpl<?,?,?> vHost = protocolConnection.getVirtualHost();
+ AMQChannel channel = connection.getChannel(channelId);
+ VirtualHostImpl<?,?,?> vHost = connection.getVirtualHost();
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
else
{
@@ -119,12 +118,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
if (queueName != null)
{
String msg = "No such queue, '" + queueName + "'";
- throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
+ throw body.getChannelException(AMQConstant.NOT_FOUND, msg, connection.getMethodRegistry());
}
else
{
String msg = "No queue name provided, no default queue defined.";
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg);
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg, connection.getMethodRegistry());
}
}
else
@@ -153,9 +152,9 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
body.getNoLocal());
if (!body.getNowait())
{
- MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag);
- protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
}
@@ -163,12 +162,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
{
AMQShortString msg = AMQShortString.validValueOf("Non-unique consumer tag, '" + body.getConsumerTag() + "'");
- MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
msg, // replytext
body.getClazz(),
body.getMethod());
- protocolConnection.writeFrame(responseBody.generateFrame(0));
+ connection.writeFrame(responseBody.generateFrame(0));
}
}
@@ -176,12 +175,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
{
_logger.debug("Closing connection due to invalid selector");
- MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.ARGUMENT_INVALID.getCode(),
AMQShortString.validValueOf(ise.getMessage()),
body.getClazz(),
body.getMethod());
- protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
@@ -190,28 +189,28 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
+ queue.getName()
- + " as it already has an existing exclusive consumer");
+ + " as it already has an existing exclusive consumer", connection.getMethodRegistry());
}
catch (AMQQueue.ExistingConsumerPreventsExclusive e)
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
+ queue.getName()
- + " exclusively as it already has a consumer");
+ + " exclusively as it already has a consumer", connection.getMethodRegistry());
}
catch (AccessControlException e)
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
+ queue.getName()
- + " permission denied");
+ + " permission denied", connection.getMethodRegistry());
}
catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
+ queue.getName()
- + " as it already has an incompatible exclusivity policy");
+ + " as it already has an incompatible exclusivity policy", connection.getMethodRegistry());
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
index c3bdedf44d..d650292546 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
@@ -45,7 +45,6 @@ import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8;
import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -65,17 +64,17 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
{
}
- public void methodReceived(AMQStateManager stateManager, BasicGetBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ BasicGetBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
+ VirtualHostImpl vHost = connection.getVirtualHost();
- VirtualHostImpl vHost = protocolConnection.getVirtualHost();
-
- AMQChannel channel = protocolConnection.getChannel(channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
else
{
@@ -87,12 +86,12 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
if(body.getQueue()!=null)
{
throw body.getConnectionException(AMQConstant.NOT_FOUND,
- "No such queue, '" + body.getQueue()+ "'");
+ "No such queue, '" + body.getQueue()+ "'", connection.getMethodRegistry());
}
else
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "No queue name provided, no default queue defined.");
+ "No queue name provided, no default queue defined.", connection.getMethodRegistry());
}
}
else
@@ -100,36 +99,37 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
try
{
- if (!performGet(queue,protocolConnection, channel, !body.getNoAck()))
+ if (!performGet(queue,connection, channel, !body.getNoAck()))
{
- MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
// TODO - set clusterId
BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
- protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
}
catch (AccessControlException e)
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
- e.getMessage());
+ e.getMessage(), connection.getMethodRegistry());
}
catch (MessageSource.ExistingExclusiveConsumer e)
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue has an exclusive consumer");
+ "Queue has an exclusive consumer", connection.getMethodRegistry());
}
catch (MessageSource.ExistingConsumerPreventsExclusive e)
{
throw body.getConnectionException(AMQConstant.INTERNAL_ERROR,
"The GET request has been evaluated as an exclusive consumer, " +
- "this is likely due to a programming error in the Qpid broker");
+ "this is likely due to a programming error in the Qpid broker",
+ connection.getMethodRegistry());
}
catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue has an incompatible exclusivit policy");
+ "Queue has an incompatible exclusivit policy", connection.getMethodRegistry());
}
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
index 0d09c31ec8..e90744ab5e 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.protocol.v0_8.handler;
+import java.security.AccessControlException;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -30,12 +32,9 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import java.security.AccessControlException;
-
public class BasicPublishMethodHandler implements StateAwareMethodListener<BasicPublishBody>
{
private static final Logger _logger = Logger.getLogger(BasicPublishMethodHandler.class);
@@ -52,16 +51,17 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basic
{
}
- public void methodReceived(AMQStateManager stateManager, BasicPublishBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ BasicPublishBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
if (_logger.isDebugEnabled())
{
_logger.debug("Publish received on channel " + channelId);
}
AMQShortString exchangeName = body.getExchange();
- VirtualHostImpl vHost = session.getVirtualHost();
+ VirtualHostImpl vHost = connection.getVirtualHost();
// TODO: check the delivery tag field details - is it unique across the broker or per subscriber?
@@ -79,21 +79,22 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basic
// if the exchange does not exist we raise a channel exception
if (destination == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name",
+ connection.getMethodRegistry());
}
else
{
// The partially populated BasicDeliver frame plus the received route body
// is stored in the channel. Once the final body frame has been received
// it is routed to the exchange.
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
- MessagePublishInfo info = session.getMethodRegistry().getProtocolVersionMethodConverter().convertToInfo(body);
+ MessagePublishInfo info = connection.getMethodRegistry().getProtocolVersionMethodConverter().convertToInfo(body);
info.setExchange(exchangeName);
try
{
@@ -101,7 +102,7 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basic
}
catch (AccessControlException e)
{
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
}
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java
index e4a6636a74..9464be4c6e 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java
@@ -26,7 +26,6 @@ import org.apache.qpid.framing.BasicQosBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody>
@@ -38,21 +37,22 @@ public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody>
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, BasicQosBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ BasicQosBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
channel.sync();
channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount());
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody();
- session.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java
index 606bcf1693..2f0468fd15 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java
@@ -29,7 +29,6 @@ import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class BasicRecoverMethodHandler implements StateAwareMethodListener<BasicRecoverBody>
@@ -43,29 +42,29 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, BasicRecoverBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ BasicRecoverBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
-
- _logger.debug("Recover received on protocol session " + session + " and channel " + channelId);
- AMQChannel channel = session.getChannel(channelId);
+ _logger.debug("Recover received on protocol session " + connection + " and channel " + channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
channel.resend();
// Qpid 0-8 hacks a synchronous -ok onto recover.
// In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
- if(session.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ if(connection.getProtocolVersion().equals(ProtocolVersion.v8_0))
{
- MethodRegistry_8_0 methodRegistry = (MethodRegistry_8_0) session.getMethodRegistry();
+ MethodRegistry_8_0 methodRegistry = (MethodRegistry_8_0) connection.getMethodRegistry();
AMQMethodBody recoverOk = methodRegistry.createBasicRecoverOkBody();
channel.sync();
- session.writeFrame(recoverOk.generateFrame(channelId));
+ connection.writeFrame(recoverOk.generateFrame(channelId));
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java
index ef26e60a62..efcec299e9 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java
@@ -31,7 +31,6 @@ import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener<BasicRecoverSyncBody>
@@ -45,35 +44,36 @@ public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener<B
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, BasicRecoverSyncBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ BasicRecoverSyncBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- _logger.debug("Recover received on protocol session " + session + " and channel " + channelId);
- AMQChannel channel = session.getChannel(channelId);
+ _logger.debug("Recover received on protocol session " + connection + " and channel " + channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
channel.sync();
channel.resend();
// Qpid 0-8 hacks a synchronous -ok onto recover.
// In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
- if(session.getProtocolVersion().equals(ProtocolVersion.v0_9))
+ if(connection.getProtocolVersion().equals(ProtocolVersion.v0_9))
{
- MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry();
+ MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) connection.getMethodRegistry();
AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
- session.writeFrame(recoverOk.generateFrame(channelId));
+ connection.writeFrame(recoverOk.generateFrame(channelId));
}
- else if(session.getProtocolVersion().equals(ProtocolVersion.v0_91))
+ else if(connection.getProtocolVersion().equals(ProtocolVersion.v0_91))
{
- MethodRegistry_0_91 methodRegistry = (MethodRegistry_0_91) session.getMethodRegistry();
+ MethodRegistry_0_91 methodRegistry = (MethodRegistry_0_91) connection.getMethodRegistry();
AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
- session.writeFrame(recoverOk.generateFrame(channelId));
+ connection.writeFrame(recoverOk.generateFrame(channelId));
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
index fdbd44b06d..1f299893f9 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
@@ -27,8 +27,6 @@ import org.apache.qpid.framing.BasicRejectBody;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicRejectBody>
@@ -46,15 +44,16 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
{
}
- public void methodReceived(AMQStateManager stateManager, BasicRejectBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ BasicRejectBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
if (_logger.isDebugEnabled())
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java
index e96d098618..a9e52c5240 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java
@@ -29,7 +29,6 @@ import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class ChannelCloseHandler implements StateAwareMethodListener<ChannelCloseBody>
@@ -47,9 +46,10 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos
{
}
- public void methodReceived(AMQStateManager stateManager, ChannelCloseBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ChannelCloseBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
if (_logger.isInfoEnabled())
{
@@ -58,19 +58,19 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos
}
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel");
+ throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel", connection.getMethodRegistry());
}
channel.sync();
- session.closeChannel(channelId);
+ connection.closeChannel(channelId);
// Client requested closure so we don't wait for ok we send it
- stateManager.getProtocolSession().closeChannelOk(channelId);
+ connection.closeChannelOk(channelId);
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
ChannelCloseOkBody responseBody = methodRegistry.createChannelCloseOkBody();
- session.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java
index 2a220ff78d..fe9d20e151 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java
@@ -24,7 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
+import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class ChannelCloseOkHandler implements StateAwareMethodListener<ChannelCloseOkBody>
@@ -42,12 +42,14 @@ public class ChannelCloseOkHandler implements StateAwareMethodListener<ChannelCl
{
}
- public void methodReceived(AMQStateManager stateManager, ChannelCloseOkBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ChannelCloseOkBody body,
+ int channelId) throws AMQException
{
_logger.info("Received channel-close-ok for channel-id " + channelId);
// Let the Protocol Session know the channel is now closed.
- stateManager.getProtocolSession().closeChannelOk(channelId);
+ connection.closeChannelOk(channelId);
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java
index cc1677c93e..99c0e3b2de 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java
@@ -28,7 +28,6 @@ import org.apache.qpid.framing.ChannelFlowBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowBody>
@@ -46,23 +45,24 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB
{
}
- public void methodReceived(AMQStateManager stateManager, ChannelFlowBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ChannelFlowBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
channel.sync();
channel.setSuspended(!body.getActive());
_logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive());
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(body.getActive());
- session.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java
index 63c6857216..dbfe07840f 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java
@@ -20,6 +20,11 @@
*/
package org.apache.qpid.server.protocol.v0_8.handler;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.UUID;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -33,16 +38,10 @@ import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.UUID;
-
public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenBody>
{
private static final Logger _logger = Logger.getLogger(ChannelOpenHandler.class);
@@ -58,10 +57,11 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
{
}
- public void methodReceived(AMQStateManager stateManager, ChannelOpenBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ChannelOpenBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHostImpl virtualHost = session.getVirtualHost();
+ VirtualHostImpl virtualHost = connection.getVirtualHost();
// Protect the broker against out of order frame request.
if (virtualHost == null)
@@ -70,13 +70,13 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
}
_logger.info("Connecting to: " + virtualHost.getName());
- final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore());
+ final AMQChannel channel = new AMQChannel(connection,channelId, virtualHost.getMessageStore());
- session.addChannel(channel);
+ connection.addChannel(channel);
ChannelOpenOkBody response;
- ProtocolVersion pv = session.getProtocolVersion();
+ ProtocolVersion pv = connection.getProtocolVersion();
if(pv.equals(ProtocolVersion.v8_0))
{
@@ -138,6 +138,6 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
}
- session.writeFrame(response.generateFrame(channelId));
+ connection.writeFrame(response.generateFrame(channelId));
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java
index 60f9c1d495..c4a8eb4acb 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java
@@ -27,7 +27,6 @@ import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionCloseOkBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
@@ -45,28 +44,29 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co
{
}
- public void methodReceived(AMQStateManager stateManager, ConnectionCloseBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ConnectionCloseBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
if (_logger.isInfoEnabled())
{
_logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" +
- body.getReplyText() + " for " + session);
+ body.getReplyText() + " for " + connection);
}
try
{
- session.closeSession();
+ connection.closeSession();
}
catch (Exception e)
{
_logger.error("Error closing protocol session: " + e, e);
}
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
- session.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
- session.closeProtocolSession();
+ connection.closeProtocolSession();
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java
index fe46b6c0cd..03c43cc80a 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java
@@ -26,7 +26,6 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ConnectionCloseOkBody;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class ConnectionCloseOkMethodHandler implements StateAwareMethodListener<ConnectionCloseOkBody>
@@ -44,16 +43,17 @@ public class ConnectionCloseOkMethodHandler implements StateAwareMethodListener<
{
}
- public void methodReceived(AMQStateManager stateManager, ConnectionCloseOkBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ConnectionCloseOkBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
//todo should this not do more than just log the method?
_logger.info("Received Connection-close-ok");
try
{
- stateManager.changeState(AMQState.CONNECTION_CLOSED);
- session.closeSession();
+ connection.changeState(AMQState.CONNECTION_CLOSED);
+ connection.closeSession();
}
catch (Exception e)
{
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java
index 632f751756..20c5e90f5d 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java
@@ -34,7 +34,6 @@ import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -58,9 +57,10 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con
return new AMQShortString(Long.toString(System.currentTimeMillis()));
}
- public void methodReceived(AMQStateManager stateManager, ConnectionOpenBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ConnectionOpenBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
//ignore leading '/'
String virtualHostName;
@@ -73,42 +73,44 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con
virtualHostName = body.getVirtualHost() == null ? null : String.valueOf(body.getVirtualHost());
}
- VirtualHostImpl virtualHost = ((AmqpPort)stateManager.getProtocolSession().getPort()).getVirtualHost(virtualHostName);
+ VirtualHostImpl virtualHost = ((AmqpPort)connection.getPort()).getVirtualHost(virtualHostName);
if (virtualHost == null)
{
- throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'");
+ throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'",
+ connection.getMethodRegistry());
}
else
{
// Check virtualhost access
if (virtualHost.getState() != State.ACTIVE)
{
- throw body.getConnectionException(AMQConstant.CONNECTION_FORCED, "Virtual host '" + virtualHost.getName() + "' is not active");
+ throw body.getConnectionException(AMQConstant.CONNECTION_FORCED, "Virtual host '" + virtualHost.getName() + "' is not active",
+ connection.getMethodRegistry());
}
- session.setVirtualHost(virtualHost);
+ connection.setVirtualHost(virtualHost);
try
{
- virtualHost.getSecurityManager().authoriseCreateConnection(session);
+ virtualHost.getSecurityManager().authoriseCreateConnection(connection);
}
catch (AccessControlException e)
{
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
}
// See Spec (0.8.2). Section 3.1.2 Virtual Hosts
- if (session.getContextKey() == null)
+ if (connection.getContextKey() == null)
{
- session.setContextKey(generateClientID());
+ connection.setContextKey(generateClientID());
}
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
- stateManager.changeState(AMQState.CONNECTION_OPEN);
+ connection.changeState(AMQState.CONNECTION_OPEN);
- session.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java
index 8a1160314b..7a0c0945e1 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java
@@ -36,7 +36,6 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
@@ -56,19 +55,20 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, ConnectionSecureOkBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ConnectionSecureOkBody body,
+ int channelId) throws AMQException
{
- Broker<?> broker = stateManager.getBroker();
- AMQProtocolSession session = stateManager.getProtocolSession();
+ Broker<?> broker = connection.getBroker();
- SubjectCreator subjectCreator = stateManager.getSubjectCreator();
+ SubjectCreator subjectCreator = connection.getSubjectCreator();
- SaslServer ss = session.getSaslServer();
+ SaslServer ss = connection.getSaslServer();
if (ss == null)
{
throw new AMQException("No SASL context set up in session");
}
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse());
switch (authResult.getStatus())
{
@@ -78,7 +78,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
_logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
// This should be abstracted
- stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ connection.changeState(AMQState.CONNECTION_CLOSING);
ConnectionCloseBody connectionCloseBody =
methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),
@@ -86,15 +86,15 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
body.getClazz(),
body.getMethod());
- session.writeFrame(connectionCloseBody.generateFrame(0));
- disposeSaslServer(session);
+ connection.writeFrame(connectionCloseBody.generateFrame(0));
+ disposeSaslServer(connection);
break;
case SUCCESS:
if (_logger.isInfoEnabled())
{
_logger.info("Connected as: " + authResult.getSubject());
}
- stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
+ connection.changeState(AMQState.CONNECTION_NOT_TUNED);
int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
@@ -107,15 +107,15 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
frameMax,
broker.getConnection_heartBeatDelay());
- session.writeFrame(tuneBody.generateFrame(0));
- session.setAuthorizedSubject(authResult.getSubject());
- disposeSaslServer(session);
+ connection.writeFrame(tuneBody.generateFrame(0));
+ connection.setAuthorizedSubject(authResult.getSubject());
+ disposeSaslServer(connection);
break;
case CONTINUE:
- stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
+ connection.changeState(AMQState.CONNECTION_NOT_AUTH);
ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
- session.writeFrame(secureBody.generateFrame(0));
+ connection.writeFrame(secureBody.generateFrame(0));
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java
index e59032b87f..311b596979 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java
@@ -35,7 +35,6 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
@@ -56,32 +55,36 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
{
}
- public void methodReceived(AMQStateManager stateManager, ConnectionStartOkBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ConnectionStartOkBody body,
+ int channelId) throws AMQException
{
- Broker<?> broker = stateManager.getBroker();
- AMQProtocolSession session = stateManager.getProtocolSession();
+ Broker<?> broker = connection.getBroker();
_logger.info("SASL Mechanism selected: " + body.getMechanism());
_logger.info("Locale selected: " + body.getLocale());
- SubjectCreator subjectCreator = stateManager.getSubjectCreator();
+ SubjectCreator subjectCreator = connection.getSubjectCreator();
SaslServer ss = null;
try
{
- ss = subjectCreator.createSaslServer(String.valueOf(body.getMechanism()), session.getLocalFQDN(), session.getPeerPrincipal());
+ ss = subjectCreator.createSaslServer(String.valueOf(body.getMechanism()),
+ connection.getLocalFQDN(),
+ connection.getPeerPrincipal());
if (ss == null)
{
- throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + body.getMechanism());
+ throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + body.getMechanism(),
+ connection.getMethodRegistry());
}
- session.setSaslServer(ss);
+ connection.setSaslServer(ss);
final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse());
//save clientProperties
- session.setClientProperties(body.getClientProperties());
+ connection.setClientProperties(body.getClientProperties());
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
switch (authResult.getStatus())
{
@@ -90,7 +93,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
_logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
- stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ connection.changeState(AMQState.CONNECTION_CLOSING);
ConnectionCloseBody closeBody =
methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
@@ -98,8 +101,8 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
body.getClazz(),
body.getMethod());
- session.writeFrame(closeBody.generateFrame(0));
- disposeSaslServer(session);
+ connection.writeFrame(closeBody.generateFrame(0));
+ disposeSaslServer(connection);
break;
case SUCCESS:
@@ -107,9 +110,9 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
{
_logger.info("Connected as: " + authResult.getSubject());
}
- session.setAuthorizedSubject(authResult.getSubject());
+ connection.setAuthorizedSubject(authResult.getSubject());
- stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
+ connection.changeState(AMQState.CONNECTION_NOT_TUNED);
int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
if(frameMax <= 0)
@@ -120,18 +123,18 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
frameMax,
broker.getConnection_heartBeatDelay());
- session.writeFrame(tuneBody.generateFrame(0));
+ connection.writeFrame(tuneBody.generateFrame(0));
break;
case CONTINUE:
- stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
+ connection.changeState(AMQState.CONNECTION_NOT_AUTH);
ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
- session.writeFrame(secureBody.generateFrame(0));
+ connection.writeFrame(secureBody.generateFrame(0));
}
}
catch (SaslException e)
{
- disposeSaslServer(session);
+ disposeSaslServer(connection);
throw new AMQException("SASL error: " + e, e);
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java
index fb4818d3ed..d5f066063d 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java
@@ -29,7 +29,6 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<ConnectionTuneOkBody>
@@ -43,19 +42,20 @@ public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<C
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, ConnectionTuneOkBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ConnectionTuneOkBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
if (_logger.isDebugEnabled())
{
_logger.debug(body);
}
- stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
+ connection.changeState(AMQState.CONNECTION_NOT_OPENED);
- session.initHeartbeats(body.getHeartbeat());
+ connection.initHeartbeats(body.getHeartbeat());
- int brokerFrameMax = stateManager.getBroker().getContextValue(Integer.class,Broker.BROKER_FRAME_SIZE);
+ int brokerFrameMax = connection.getBroker().getContextValue(Integer.class,Broker.BROKER_FRAME_SIZE);
if(brokerFrameMax <= 0)
{
brokerFrameMax = Integer.MAX_VALUE;
@@ -68,7 +68,7 @@ public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<C
+ "greater than the broker will allow: "
+ brokerFrameMax,
body.getClazz(), body.getMethod(),
- body.getMajor(), body.getMinor(),null);
+ connection.getMethodRegistry(),null);
}
else if(body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode())
{
@@ -77,13 +77,13 @@ public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<C
+ "which is smaller than the specification definined minimum: "
+ AMQConstant.FRAME_MIN_SIZE.getCode(),
body.getClazz(), body.getMethod(),
- body.getMajor(), body.getMinor(),null);
+ connection.getMethodRegistry(),null);
}
int frameMax = body.getFrameMax() == 0 ? brokerFrameMax : (int) body.getFrameMax();
- session.setMaxFrameSize(frameMax);
+ connection.setMaxFrameSize(frameMax);
long maxChannelNumber = body.getChannelMax();
//0 means no implied limit, except that forced by protocol limitations (0xFFFF)
- session.setMaximumNumberOfChannels( maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber);
+ connection.setMaximumNumberOfChannels(maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber);
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
index 2ea638a358..22e377c219 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
@@ -29,7 +29,6 @@ import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -65,16 +64,17 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
{
}
- public void methodReceived(AMQStateManager stateManager, ExchangeBoundBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ExchangeBoundBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHostImpl virtualHost = session.getVirtualHost();
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ VirtualHostImpl virtualHost = connection.getVirtualHost();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
- final AMQChannel channel = session.getChannel(channelId);
+ final AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
channel.sync();
@@ -227,7 +227,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
}
}
}
- session.writeFrame(response.generateFrame(channelId));
+ connection.writeFrame(response.generateFrame(channelId));
}
protected boolean isDefaultExchange(final AMQShortString exchangeName)
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
index 3f48b413ef..f90f47d77c 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
@@ -41,7 +41,6 @@ import org.apache.qpid.server.model.NoFactoryForTypeException;
import org.apache.qpid.server.model.UnknownConfiguredObjectException;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
@@ -62,14 +61,15 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
{
}
- public void methodReceived(AMQStateManager stateManager, ExchangeDeclareBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ExchangeDeclareBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHostImpl virtualHost = session.getVirtualHost();
- final AMQChannel channel = session.getChannel(channelId);
+ VirtualHostImpl virtualHost = connection.getVirtualHost();
+ final AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
final AMQShortString exchangeName = body.getExchange();
@@ -89,7 +89,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
+ ExchangeDefaults.DIRECT_EXCHANGE_CLASS
+ " to " + body.getType() +".",
body.getClazz(), body.getMethod(),
- body.getMajor(), body.getMinor(),null);
+ connection.getMethodRegistry(),null);
}
}
else
@@ -99,14 +99,15 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
exchange = virtualHost.getExchange(exchangeName.toString());
if(exchange == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName);
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName,
+ connection.getMethodRegistry());
}
else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getType().equals(body.getType().asString()))
{
throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " +
exchangeName + " of type " + exchange.getType()
- + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null);
+ + " to " + body.getType() +".",body.getClazz(), body.getMethod(),connection.getMethodRegistry(),null);
}
}
@@ -139,7 +140,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"Attempt to declare exchange: " + exchangeName +
- " which begins with reserved prefix.");
+ " which begins with reserved prefix.", connection.getMethodRegistry());
}
catch(ExchangeExistsException e)
@@ -147,40 +148,44 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
exchange = e.getExistingExchange();
if(!new AMQShortString(exchange.getType()).equals(body.getType()))
{
- throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
- + exchangeName + " of type "
- + exchange.getType()
- + " to " + body.getType() +".",
- body.getClazz(), body.getMethod(),
- body.getMajor(), body.getMinor(),null);
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
+ + exchangeName + " of type "
+ + exchange.getType()
+ + " to " + body.getType() + ".",
+ connection.getMethodRegistry());
}
}
catch(NoFactoryForTypeException e)
{
- throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + exchangeName,e);
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange type '"+e.getType()+"' for exchange '" + exchangeName + "'", connection.getMethodRegistry());
}
catch (AccessControlException e)
{
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
}
catch (UnknownConfiguredObjectException e)
{
// note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
- throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown alternate exchange",e);
+ throw body.getConnectionException(AMQConstant.NOT_FOUND,
+ "Unknown alternate exchange "
+ + (e.getName() != null
+ ? "name: \"" + e.getName() + "\""
+ : "id: " + e.getId()),
+ connection.getMethodRegistry());
}
catch (IllegalArgumentException e)
{
- throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Error creating exchange",e);
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Error creating exchange '"+exchangeName+"': " + e.getMessage(),connection.getMethodRegistry());
}
}
}
if(!body.getNowait())
{
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
channel.sync();
- session.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java
index 1c01a24f63..b5c10c190e 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.protocol.v0_8.handler;
+import java.security.AccessControlException;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ExchangeDeleteBody;
@@ -28,14 +30,11 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import java.security.AccessControlException;
-
public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeDeleteBody>
{
private static final ExchangeDeleteHandler _instance = new ExchangeDeleteHandler();
@@ -49,14 +48,15 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD
{
}
- public void methodReceived(AMQStateManager stateManager, ExchangeDeleteBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ExchangeDeleteBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHostImpl virtualHost = session.getVirtualHost();
- final AMQChannel channel = session.getChannel(channelId);
+ VirtualHostImpl virtualHost = connection.getVirtualHost();
+ final AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
channel.sync();
try
@@ -64,7 +64,7 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD
if(isDefaultExchange(body.getExchange()))
{
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Default Exchange cannot be deleted");
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Default Exchange cannot be deleted", connection.getMethodRegistry());
}
final String exchangeName = body.getExchange().toString();
@@ -72,28 +72,31 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD
final ExchangeImpl exchange = virtualHost.getExchange(exchangeName);
if(exchange == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange());
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange(),
+ connection.getMethodRegistry());
}
virtualHost.removeExchange(exchange, !body.getIfUnused());
- ExchangeDeleteOkBody responseBody = session.getMethodRegistry().createExchangeDeleteOkBody();
+ ExchangeDeleteOkBody responseBody = connection.getMethodRegistry().createExchangeDeleteOkBody();
- session.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
catch (ExchangeIsAlternateException e)
{
- throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange");
+ throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange",
+ connection.getMethodRegistry());
}
catch (RequiredExchangeException e)
{
- throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange '"+body.getExchange()+"' cannot be deleted");
+ throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange '"+body.getExchange()+"' cannot be deleted",
+ connection.getMethodRegistry());
}
catch (AccessControlException e)
{
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
index 0140d2ec7e..c47a4b528f 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
@@ -36,7 +36,6 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -56,15 +55,16 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
}
- public void methodReceived(AMQStateManager stateManager, QueueBindBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ QueueBindBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
- VirtualHostImpl virtualHost = protocolConnection.getVirtualHost();
- AMQChannel channel = protocolConnection.getChannel(channelId);
+ VirtualHostImpl virtualHost = connection.getVirtualHost();
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
final AMQQueue queue;
@@ -79,7 +79,8 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
if (queue == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null",
+ connection.getMethodRegistry());
}
if (body.getRoutingKey() == null)
@@ -99,12 +100,14 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
if (queue == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.",
+ connection.getMethodRegistry());
}
if(isDefaultExchange(body.getExchange()))
{
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Cannot bind the queue " + queueName + " to the default exchange");
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Cannot bind the queue " + queueName + " to the default exchange",
+ connection.getMethodRegistry());
}
final String exchangeName = body.getExchange().toString();
@@ -112,7 +115,8 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
final ExchangeImpl exch = virtualHost.getExchange(exchangeName);
if (exch == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist.",
+ connection.getMethodRegistry());
}
@@ -133,7 +137,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
}
catch (AccessControlException e)
{
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
}
if (_log.isInfoEnabled())
@@ -143,9 +147,9 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
if (!body.getNowait())
{
channel.sync();
- MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
- protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
index ef8d01d89f..0e1016c319 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
@@ -39,7 +39,6 @@ import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
@@ -57,11 +56,12 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, QueueDeclareBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ QueueDeclareBody body,
+ int channelId) throws AMQException
{
- final AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
- final AMQSessionModel session = protocolConnection.getChannel(channelId);
- VirtualHostImpl virtualHost = protocolConnection.getVirtualHost();
+ final AMQSessionModel session = connection.getChannel(channelId);
+ VirtualHostImpl virtualHost = connection.getVirtualHost();
final AMQShortString queueName;
@@ -79,11 +79,11 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
//TODO: do we need to check that the queue already exists with exactly the same "configuration"?
- AMQChannel channel = protocolConnection.getChannel(channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
if(body.getPassive())
@@ -92,14 +92,15 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
if (queue == null)
{
String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
- throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
+ throw body.getChannelException(AMQConstant.NOT_FOUND, msg, connection.getMethodRegistry());
}
else
{
if (!queue.verifySessionAccess(channel))
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
+ "Queue " + queue.getName() + " is exclusive, but not created on this Connection.",
+ connection.getMethodRegistry());
}
//set this as the default queue on the channel:
@@ -112,7 +113,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
try
{
- queue = createQueue(channel, queueName, body, virtualHost, protocolConnection);
+ queue = createQueue(channel, queueName, body, virtualHost, connection);
}
catch(QueueExistsException qe)
@@ -123,33 +124,37 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
if (!queue.verifySessionAccess(channel))
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
+ "Queue " + queue.getName() + " is exclusive, but not created on this Connection.",
+ connection.getMethodRegistry());
}
else if(queue.isExclusive() != body.getExclusive())
{
throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
"Cannot re-declare queue '" + queue.getName() + "' with different exclusivity (was: "
- + queue.isExclusive() + " requested " + body.getExclusive() + ")");
+ + queue.isExclusive() + " requested " + body.getExclusive() + ")",
+ connection.getMethodRegistry());
}
else if((body.getAutoDelete() && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS)
|| (!body.getAutoDelete() && queue.getLifetimePolicy() != ((body.getExclusive() && !body.getDurable()) ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE : LifetimePolicy.PERMANENT)))
{
throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
"Cannot re-declare queue '" + queue.getName() + "' with different lifetime policy (was: "
- + queue.getLifetimePolicy() + " requested autodelete: " + body.getAutoDelete() + ")");
+ + queue.getLifetimePolicy() + " requested autodelete: " + body.getAutoDelete() + ")",
+ connection.getMethodRegistry());
}
else if(queue.isDurable() != body.getDurable())
{
throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
"Cannot re-declare queue '" + queue.getName() + "' with different durability (was: "
- + queue.isDurable() + " requested " + body.getDurable() + ")");
+ + queue.isDurable() + " requested " + body.getDurable() + ")",
+ connection.getMethodRegistry());
}
}
catch (AccessControlException e)
{
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
}
//set this as the default queue on the channel:
@@ -159,12 +164,12 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
if (!body.getNowait())
{
channel.sync();
- MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
QueueDeclareOkBody responseBody =
methodRegistry.createQueueDeclareOkBody(queueName,
queue.getQueueDepthMessages(),
queue.getConsumerCount());
- protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
_logger.info("Queue " + queueName + " declared successfully");
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java
index 84efcb68b8..123c076a25 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.protocol.v0_8.handler;
+import java.security.AccessControlException;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.QueueDeleteBody;
@@ -27,14 +29,10 @@ import org.apache.qpid.framing.QueueDeleteOkBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import java.security.AccessControlException;
-
public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody>
{
private static final QueueDeleteHandler _instance = new QueueDeleteHandler();
@@ -57,18 +55,17 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
}
- public void methodReceived(AMQStateManager stateManager, QueueDeleteBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ QueueDeleteBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
- VirtualHostImpl virtualHost = protocolConnection.getVirtualHost();
- DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
-
+ VirtualHostImpl virtualHost = connection.getVirtualHost();
- AMQChannel channel = protocolConnection.getChannel(channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
channel.sync();
AMQQueue queue;
@@ -87,26 +84,30 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
{
if (_failIfNotFound)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.",
+ connection.getMethodRegistry());
}
}
else
{
if (body.getIfEmpty() && !queue.isEmpty())
{
- throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty.");
+ throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty.",
+ connection.getMethodRegistry());
}
else if (body.getIfUnused() && !queue.isUnused())
{
// TODO - Error code
- throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used.");
+ throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used.",
+ connection.getMethodRegistry());
}
else
{
if (!queue.verifySessionAccess(channel))
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
+ "Queue " + queue.getName() + " is exclusive, but not created on this Connection.",
+ connection.getMethodRegistry());
}
int purged = 0;
@@ -116,12 +117,12 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
}
catch (AccessControlException e)
{
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
}
- MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged);
- protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java
index 68ecf0324c..2c06fef1e2 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java
@@ -21,6 +21,8 @@
package org.apache.qpid.server.protocol.v0_8.handler;
+import java.security.AccessControlException;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.MethodRegistry;
@@ -28,13 +30,10 @@ import org.apache.qpid.framing.QueuePurgeBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import java.security.AccessControlException;
-
public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
{
private static final QueuePurgeHandler _instance = new QueuePurgeHandler();
@@ -56,15 +55,16 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
_failIfNotFound = failIfNotFound;
}
- public void methodReceived(AMQStateManager stateManager, QueuePurgeBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ QueuePurgeBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
- VirtualHostImpl virtualHost = protocolConnection.getVirtualHost();
+ VirtualHostImpl virtualHost = connection.getVirtualHost();
- AMQChannel channel = protocolConnection.getChannel(channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
AMQQueue queue;
if(body.getQueue() == null)
@@ -77,7 +77,7 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
{
if(_failIfNotFound)
{
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,"No queue specified.");
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,"No queue specified.", connection.getMethodRegistry());
}
}
}
@@ -90,7 +90,8 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
{
if(_failIfNotFound)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.",
+ connection.getMethodRegistry());
}
}
else
@@ -98,7 +99,7 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
if (!queue.verifySessionAccess(channel))
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue is exclusive, but not created on this Connection.");
+ "Queue is exclusive, but not created on this Connection.", connection.getMethodRegistry());
}
long purged = 0;
@@ -108,16 +109,16 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
}
catch (AccessControlException e)
{
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
}
if(!body.getNowait())
{
channel.sync();
- MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
- protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
index 4e2d1bc8a2..9ae66b889c 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.protocol.v0_8.handler;
+import java.security.AccessControlException;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -33,13 +35,10 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import java.security.AccessControlException;
-
public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindBody>
{
private static final Logger _log = Logger.getLogger(QueueUnbindHandler.class);
@@ -55,19 +54,20 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB
{
}
- public void methodReceived(AMQStateManager stateManager, QueueUnbindBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ QueueUnbindBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHostImpl virtualHost = session.getVirtualHost();
+ VirtualHostImpl virtualHost = connection.getVirtualHost();
final AMQQueue queue;
final AMQShortString routingKey;
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
if (body.getQueue() == null)
@@ -77,7 +77,8 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB
if (queue == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null",
+ connection.getMethodRegistry());
}
routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern(false);
@@ -91,23 +92,28 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB
if (queue == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.",
+ connection.getMethodRegistry());
}
if(isDefaultExchange(body.getExchange()))
{
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Cannot unbind the queue " + queue.getName() + " from the default exchange");
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Cannot unbind the queue "
+ + queue.getName()
+ + " from the default exchange", connection.getMethodRegistry());
}
final ExchangeImpl exch = virtualHost.getExchange(body.getExchange() == null ? null : body.getExchange().toString());
if (exch == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.",
+ connection.getMethodRegistry());
}
if(!exch.hasBinding(String.valueOf(routingKey), queue))
{
- throw body.getChannelException(AMQConstant.NOT_FOUND,"No such binding");
+ throw body.getChannelException(AMQConstant.NOT_FOUND,"No such binding", connection.getMethodRegistry());
}
else
{
@@ -117,7 +123,7 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB
}
catch (AccessControlException e)
{
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
}
}
@@ -127,7 +133,7 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB
_log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
}
- final MethodRegistry registry = session.getMethodRegistry();
+ final MethodRegistry registry = connection.getMethodRegistry();
final AMQMethodBody responseBody;
if (registry instanceof MethodRegistry_0_9)
{
@@ -140,10 +146,10 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB
else
{
// 0-8 does not support QueueUnbind
- throw new AMQException(AMQConstant.COMMAND_INVALID, "QueueUnbind not present in AMQP version: " + session.getProtocolVersion(), null);
+ throw new AMQException(AMQConstant.COMMAND_INVALID, "QueueUnbind not present in AMQP version: " + connection.getProtocolVersion(), null);
}
channel.sync();
- session.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
protected boolean isDefaultExchange(final AMQShortString exchangeName)
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java
index 43e97c0cb6..429b5875bc 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java
@@ -20,20 +20,20 @@
*/
package org.apache.qpid.server.protocol.v0_8.handler;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
-
import java.util.HashMap;
import java.util.Map;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
+
public class ServerMethodDispatcherImpl implements MethodDispatcher
{
- private final AMQStateManager _stateManager;
+ private final AMQProtocolSession<?> _connection;
private static interface DispatcherFactory
{
- public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager);
+ public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection);
}
private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories =
@@ -45,26 +45,26 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
_dispatcherFactories.put(ProtocolVersion.v8_0,
new DispatcherFactory()
{
- public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager)
+ public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection)
{
- return new ServerMethodDispatcherImpl_8_0(stateManager);
+ return new ServerMethodDispatcherImpl_8_0(connection);
}
});
_dispatcherFactories.put(ProtocolVersion.v0_9,
new DispatcherFactory()
{
- public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager)
+ public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection)
{
- return new ServerMethodDispatcherImpl_0_9(stateManager);
+ return new ServerMethodDispatcherImpl_0_9(connection);
}
});
_dispatcherFactories.put(ProtocolVersion.v0_91,
new DispatcherFactory()
{
- public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager)
+ public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection)
{
- return new ServerMethodDispatcherImpl_0_91(stateManager);
+ return new ServerMethodDispatcherImpl_0_91(connection);
}
});
@@ -103,82 +103,80 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
- public static MethodDispatcher createMethodDispatcher(AMQStateManager stateManager, ProtocolVersion protocolVersion)
+ public static MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection)
{
- return _dispatcherFactories.get(protocolVersion).createMethodDispatcher(stateManager);
+ return _dispatcherFactories.get(connection.getProtocolVersion()).createMethodDispatcher(connection);
}
- public ServerMethodDispatcherImpl(AMQStateManager stateManager)
+ public ServerMethodDispatcherImpl(AMQProtocolSession<?> connection)
{
- _stateManager = stateManager;
+ _connection = connection;
}
- protected AMQStateManager getStateManager()
+ protected final AMQProtocolSession<?> getConnection()
{
- return _stateManager;
+ return _connection;
}
-
-
public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException
{
- _accessRequestHandler.methodReceived(_stateManager, body, channelId);
+ _accessRequestHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException
{
- _basicAckMethodHandler.methodReceived(_stateManager, body, channelId);
+ _basicAckMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException
{
- _basicCancelMethodHandler.methodReceived(_stateManager, body, channelId);
+ _basicCancelMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException
{
- _basicConsumeMethodHandler.methodReceived(_stateManager, body, channelId);
+ _basicConsumeMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException
{
- _basicGetMethodHandler.methodReceived(_stateManager, body, channelId);
+ _basicGetMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException
{
- _basicPublishMethodHandler.methodReceived(_stateManager, body, channelId);
+ _basicPublishMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException
{
- _basicQosHandler.methodReceived(_stateManager, body, channelId);
+ _basicQosHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException
{
- _basicRecoverMethodHandler.methodReceived(_stateManager, body, channelId);
+ _basicRecoverMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException
{
- _basicRejectMethodHandler.methodReceived(_stateManager, body, channelId);
+ _basicRejectMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException
{
- _channelOpenHandler.methodReceived(_stateManager, body, channelId);
+ _channelOpenHandler.methodReceived(getConnection(), body, channelId);
return true;
}
@@ -225,21 +223,21 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
{
- _channelCloseHandler.methodReceived(_stateManager, body, channelId);
+ _channelCloseHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException
{
- _channelCloseOkHandler.methodReceived(_stateManager, body, channelId);
+ _channelCloseOkHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
{
- _channelFlowHandler.methodReceived(_stateManager, body, channelId);
+ _channelFlowHandler.methodReceived(getConnection(), body, channelId);
return true;
}
@@ -256,21 +254,23 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException
{
- _connectionOpenMethodHandler.methodReceived(_stateManager, body, channelId);
+ _connectionOpenMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException
{
- _connectionCloseMethodHandler.methodReceived(_stateManager, body, channelId);
+ _connectionCloseMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException
{
- _connectionCloseOkMethodHandler.methodReceived(_stateManager, body, channelId);
+ _connectionCloseOkMethodHandler.methodReceived(
+ getConnection(),
+ body, channelId);
return true;
}
@@ -299,15 +299,6 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
throw new UnexpectedMethodException(body);
}
- public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException
{
@@ -324,46 +315,6 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
throw new UnexpectedMethodException(body);
}
- public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException
{
throw new UnexpectedMethodException(body);
@@ -384,31 +335,6 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
throw new UnexpectedMethodException(body);
}
- public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException
{
throw new UnexpectedMethodException(body);
@@ -427,144 +353,84 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException
{
- _connectionSecureOkMethodHandler.methodReceived(_stateManager, body, channelId);
+ _connectionSecureOkMethodHandler.methodReceived(
+ getConnection(),
+ body, channelId);
return true;
}
public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException
{
- _connectionStartOkMethodHandler.methodReceived(_stateManager, body, channelId);
+ _connectionStartOkMethodHandler.methodReceived(
+ getConnection(),
+ body, channelId);
return true;
}
public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException
{
- _connectionTuneOkMethodHandler.methodReceived(_stateManager, body, channelId);
+ _connectionTuneOkMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
- public boolean dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException
- {
- return false;
- }
-
public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException
{
- _exchangeBoundHandler.methodReceived(_stateManager, body, channelId);
+ _exchangeBoundHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException
{
- _exchangeDeclareHandler.methodReceived(_stateManager, body, channelId);
+ _exchangeDeclareHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException
{
- _exchangeDeleteHandler.methodReceived(_stateManager, body, channelId);
+ _exchangeDeleteHandler.methodReceived(getConnection(), body, channelId);
return true;
}
- public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException
- {
- return false;
- }
-
public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException
{
- _queueBindHandler.methodReceived(_stateManager, body, channelId);
+ _queueBindHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException
{
- _queueDeclareHandler.methodReceived(_stateManager, body, channelId);
+ _queueDeclareHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException
{
- _queueDeleteHandler.methodReceived(_stateManager, body, channelId);
+ _queueDeleteHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException
{
- _queuePurgeHandler.methodReceived(_stateManager, body, channelId);
+ _queuePurgeHandler.methodReceived(getConnection(), body, channelId);
return true;
}
- public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException
- {
- return false;
- }
public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException
{
- _txCommitHandler.methodReceived(_stateManager, body, channelId);
+ _txCommitHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchTxRollback(TxRollbackBody body, int channelId) throws AMQException
{
- _txRollbackHandler.methodReceived(_stateManager, body, channelId);
+ _txRollbackHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException
{
- _txSelectHandler.methodReceived(_stateManager, body, channelId);
+ _txSelectHandler.methodReceived(getConnection(), body, channelId);
return true;
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_9.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_9.java
index 1ee6d732c2..4d66d0f999 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_9.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_9.java
@@ -24,7 +24,7 @@ package org.apache.qpid.server.protocol.v0_8.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
+import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
@@ -40,14 +40,14 @@ public class ServerMethodDispatcherImpl_0_9
QueueUnbindHandler.getInstance();
- public ServerMethodDispatcherImpl_0_9(AMQStateManager stateManager)
+ public ServerMethodDispatcherImpl_0_9(AMQProtocolSession<?> connection)
{
- super(stateManager);
+ super(connection);
}
public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
{
- _basicRecoverSyncMethodHandler.methodReceived(getStateManager(), body, channelId);
+ _basicRecoverSyncMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
@@ -56,101 +56,6 @@ public class ServerMethodDispatcherImpl_0_9
throw new UnexpectedMethodException(body);
}
- public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException
- {
- return false;
- }
-
public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
{
throw new UnexpectedMethodException(body);
@@ -158,7 +63,7 @@ public class ServerMethodDispatcherImpl_0_9
public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
{
- _queueUnbindHandler.methodReceived(getStateManager(),body,channelId);
+ _queueUnbindHandler.methodReceived(getConnection(), body,channelId);
return true;
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_91.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_91.java
index b11b9cff2b..711dfe12ff 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_91.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_91.java
@@ -24,7 +24,7 @@ package org.apache.qpid.server.protocol.v0_8.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_91.MethodDispatcher_0_91;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
+import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
public class ServerMethodDispatcherImpl_0_91
@@ -39,14 +39,14 @@ public class ServerMethodDispatcherImpl_0_91
QueueUnbindHandler.getInstance();
- public ServerMethodDispatcherImpl_0_91(AMQStateManager stateManager)
+ public ServerMethodDispatcherImpl_0_91(AMQProtocolSession<?> connection)
{
- super(stateManager);
+ super(connection);
}
public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
{
- _basicRecoverSyncMethodHandler.methodReceived(getStateManager(), body, channelId);
+ _basicRecoverSyncMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
@@ -55,106 +55,6 @@ public class ServerMethodDispatcherImpl_0_91
throw new UnexpectedMethodException(body);
}
- public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
{
throw new UnexpectedMethodException(body);
@@ -162,7 +62,7 @@ public class ServerMethodDispatcherImpl_0_91
public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
{
- _queueUnbindHandler.methodReceived(getStateManager(),body,channelId);
+ _queueUnbindHandler.methodReceived(getConnection(), body,channelId);
return true;
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java
index f05219712f..df2306428f 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java
@@ -23,24 +23,16 @@ package org.apache.qpid.server.protocol.v0_8.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicRecoverOkBody;
import org.apache.qpid.framing.ChannelAlertBody;
-import org.apache.qpid.framing.TestContentBody;
-import org.apache.qpid.framing.TestContentOkBody;
-import org.apache.qpid.framing.TestIntegerBody;
-import org.apache.qpid.framing.TestIntegerOkBody;
-import org.apache.qpid.framing.TestStringBody;
-import org.apache.qpid.framing.TestStringOkBody;
-import org.apache.qpid.framing.TestTableBody;
-import org.apache.qpid.framing.TestTableOkBody;
import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
+import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
public class ServerMethodDispatcherImpl_8_0
extends ServerMethodDispatcherImpl
implements MethodDispatcher_8_0
{
- public ServerMethodDispatcherImpl_8_0(AMQStateManager stateManager)
+ public ServerMethodDispatcherImpl_8_0(AMQProtocolSession<?> connection)
{
- super(stateManager);
+ super(connection);
}
public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
@@ -53,43 +45,4 @@ public class ServerMethodDispatcherImpl_8_0
throw new UnexpectedMethodException(body);
}
- public boolean dispatchTestContent(TestContentBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestContentOk(TestContentOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestInteger(TestIntegerBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestIntegerOk(TestIntegerOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestString(TestStringBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestStringOk(TestStringOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestTable(TestTableBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestTableOk(TestTableOkBody body, int channelId) throws AMQException
- {
- return false;
- }
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java
index b257030a59..cb08b1fd4f 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java
@@ -28,7 +28,6 @@ import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.TxCommitBody;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
@@ -46,21 +45,21 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
{
}
- public void methodReceived(AMQStateManager stateManager, TxCommitBody body, final int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ TxCommitBody body,
+ final int channelId) throws AMQException
{
- final AMQProtocolSession session = stateManager.getProtocolSession();
-
try
{
if (_log.isDebugEnabled())
{
_log.debug("Commit received on channel " + channelId);
}
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
channel.commit(new Runnable()
{
@@ -68,9 +67,9 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
@Override
public void run()
{
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
- session.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
}, true);
@@ -79,7 +78,8 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
}
catch (AMQException e)
{
- throw body.getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
+ throw body.getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage(),
+ connection.getMethodRegistry());
}
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java
index 69ad1a0a21..08c1c2378b 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java
@@ -26,7 +26,6 @@ import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.TxRollbackBody;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBody>
@@ -42,22 +41,22 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod
{
}
- public void methodReceived(AMQStateManager stateManager, TxRollbackBody body, final int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ TxRollbackBody body,
+ final int channelId) throws AMQException
{
- final AMQProtocolSession session = stateManager.getProtocolSession();
-
try
{
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
- final MethodRegistry methodRegistry = session.getMethodRegistry();
+ final MethodRegistry methodRegistry = connection.getMethodRegistry();
final AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody();
Runnable task = new Runnable()
@@ -65,7 +64,7 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod
public void run()
{
- session.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
};
@@ -79,7 +78,8 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod
}
catch (AMQException e)
{
- throw body.getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage());
+ throw body.getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage(),
+ connection.getMethodRegistry());
}
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java
index a43e1ebdab..d6ac194b09 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java
@@ -26,7 +26,6 @@ import org.apache.qpid.framing.TxSelectBody;
import org.apache.qpid.framing.TxSelectOkBody;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody>
@@ -42,21 +41,21 @@ public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody>
{
}
- public void methodReceived(AMQStateManager stateManager, TxSelectBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ TxSelectBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
-
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
channel.setLocalTransactional();
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
TxSelectOkBody responseBody = methodRegistry.createTxSelectOkBody();
- session.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java
deleted file mode 100644
index 3c1f1dedc3..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.state;
-
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-
-import javax.security.auth.Subject;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.MethodDispatcher;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.security.SubjectCreator;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
-
-/**
- * The state manager is responsible for managing the state of the protocol session.
- * <p>
- * For each AMQProtocolHandler there is a separate state manager.
- */
-public class AMQStateManager implements AMQMethodListener
-{
- private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
-
- private final Broker<?> _broker;
- private final AMQProtocolSession _protocolSession;
- /** The current state */
- private AMQState _currentState;
-
- public AMQStateManager(Broker<?> broker, AMQProtocolSession protocolSession)
- {
- _broker = broker;
- _protocolSession = protocolSession;
- _currentState = AMQState.CONNECTION_NOT_STARTED;
-
- }
-
- /**
- * Get the Broker instance
- *
- * @return the Broker
- */
- public Broker<?> getBroker()
- {
- return _broker;
- }
-
- public void changeState(AMQState newState)
- {
- _logger.debug("State changing to " + newState + " from old state " + _currentState);
- final AMQState oldState = _currentState;
- _currentState = newState;
-
- }
-
- public void error(Exception e)
- {
- _logger.error("State manager received error notification[Current State:" + _currentState + "]: " + e, e);
- }
-
- public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
- {
- final MethodDispatcher dispatcher = _protocolSession.getMethodDispatcher();
-
- final int channelId = evt.getChannelId();
- final B body = evt.getMethod();
-
- final AMQChannel channel = _protocolSession.getChannel(channelId);
- if(channelId != 0 && channel == null)
- {
-
- if(! ((body instanceof ChannelOpenBody)
- || (body instanceof ChannelCloseOkBody)
- || (body instanceof ChannelCloseBody)))
- {
- throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "channel is closed won't process:" + body);
- }
-
- }
- if(channel == null)
- {
- return body.execute(dispatcher, channelId);
- }
- else
- {
- try
- {
- return Subject.doAs(channel.getSubject(), new PrivilegedExceptionAction<Boolean>()
- {
- @Override
- public Boolean run() throws AMQException
- {
- return body.execute(dispatcher, channelId);
- }
- });
- }
- catch (PrivilegedActionException e)
- {
- if(e.getCause() instanceof AMQException)
- {
- throw (AMQException) e.getCause();
- }
- else
- {
- throw new ServerScopedRuntimeException(e.getCause());
- }
- }
-
-
- }
-
- }
-
- public AMQProtocolSession getProtocolSession()
- {
- return _protocolSession;
- }
-
-
- public SubjectCreator getSubjectCreator()
- {
- return _broker.getSubjectCreator(getProtocolSession().getLocalAddress(), getProtocolSession().getTransport().isSecure());
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/StateAwareMethodListener.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/StateAwareMethodListener.java
index 63ab23919d..d767c7e326 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/StateAwareMethodListener.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/StateAwareMethodListener.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_8.state;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
/**
* A frame listener that is informed of the protocol state when invoked and has
@@ -30,5 +31,5 @@ import org.apache.qpid.framing.AMQMethodBody;
*/
public interface StateAwareMethodListener<B extends AMQMethodBody>
{
- void methodReceived(AMQStateManager stateManager, B evt, int channelId) throws AMQException;
+ void methodReceived(final AMQProtocolSession<?> connection, B evt, int channelId) throws AMQException;
}