diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src')
40 files changed, 551 insertions, 972 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index f6ef4256d0..4e1af5ddd6 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -464,8 +464,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> AMQConstant.NO_ROUTE, "No route for message " + currentMessageDescription(), 0, 0, // default class and method ids - getProtocolSession().getProtocolVersion().getMajorVersion(), - getProtocolSession().getProtocolVersion().getMinorVersion(), + getProtocolSession().getMethodRegistry(), (Throwable) null); } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index a59f173d2f..1a746447b0 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -27,6 +27,8 @@ import java.nio.ByteBuffer; import java.security.AccessController; import java.security.Principal; import java.security.PrivilegedAction; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -74,7 +76,7 @@ import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.protocol.v0_8.state.AMQState; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; +import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.Action; @@ -86,7 +88,7 @@ import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.util.BytesDataOutput; -public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession<AMQProtocolEngine> +public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession<AMQProtocolEngine>, MethodRegistrySource { private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class); @@ -123,7 +125,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private final Set<AMQChannel<AMQProtocolEngine>> _channelsForCurrentMessage = new HashSet<AMQChannel<AMQProtocolEngine>>(); - private final AMQStateManager _stateManager; + /** The current state */ + private AMQState _currentState; private AMQDecoder _decoder; @@ -189,7 +192,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _transport = transport; _maxNoOfChannels = broker.getConnection_sessionCountLimit(); _receivedLock = new ReentrantLock(); - _stateManager = new AMQStateManager(broker, this); _decoder = new AMQDecoder(true, this); _connectionID = connectionId; _logSubject = new ConnectionLogSubject(this); @@ -623,7 +625,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { try { - boolean wasAnyoneInterested = _stateManager.methodReceived(evt); + boolean wasAnyoneInterested = methodReceived(evt); if (!wasAnyoneInterested) { @@ -654,9 +656,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _logger.info("Closing connection due to: " + e.getMessage()); } - AMQConnectionException ce = - evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, - AMQConstant.CHANNEL_ERROR.getName().toString()); + AMQConnectionException ce = new AMQConnectionException(AMQConstant.CHANNEL_ERROR, + AMQConstant.CHANNEL_ERROR.getName().toString(), + methodBody, getMethodRegistry()); _logger.info(e.getMessage() + " whilst processing:" + methodBody); closeConnection(channelId, ce); @@ -675,6 +677,58 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi closeProtocolSession(); } } + private <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException + { + final MethodDispatcher dispatcher = getMethodDispatcher(); + + final int channelId = evt.getChannelId(); + final B body = evt.getMethod(); + + final AMQChannel channel = getChannel(channelId); + if(channelId != 0 && channel == null) + { + + if(! ((body instanceof ChannelOpenBody) + || (body instanceof ChannelCloseOkBody) + || (body instanceof ChannelCloseBody))) + { + throw new AMQConnectionException(AMQConstant.CHANNEL_ERROR, "channel is closed won't process:" + body, body, getMethodRegistry()); + } + + } + if(channel == null) + { + return body.execute(dispatcher, channelId); + } + else + { + try + { + return Subject.doAs(channel.getSubject(), new PrivilegedExceptionAction<Boolean>() + { + @Override + public Boolean run() throws AMQException + { + return body.execute(dispatcher, channelId); + } + }); + } + catch (PrivilegedActionException e) + { + if(e.getCause() instanceof AMQException) + { + throw (AMQException) e.getCause(); + } + else + { + throw new ServerScopedRuntimeException(e.getCause()); + } + } + + + } + + } public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException { @@ -1056,8 +1110,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { try { - _stateManager.changeState(AMQState.CONNECTION_CLOSING); - writeFrame(e.getCloseFrame(channelId)); + changeState(AMQState.CONNECTION_CLOSING); + writeFrame(e.getCloseFrame()); } finally { @@ -1075,7 +1129,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi try { - _stateManager.changeState(AMQState.CONNECTION_CLOSED); + changeState(AMQState.CONNECTION_CLOSED); } catch (ConnectionScopedRuntimeException e) { @@ -1176,7 +1230,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _protocolVersion = pv; _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion); _protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this); - _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion); + _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(this); } public byte getProtocolMajorVersion() @@ -1478,9 +1532,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void close(AMQConstant cause, String message) { closeConnection(0, new AMQConnectionException(cause, message, 0, 0, - getProtocolOutputConverter().getProtocolMajorVersion(), - getProtocolOutputConverter().getProtocolMinorVersion(), - null)); + getMethodRegistry(), + null)); } public void block() @@ -1674,6 +1727,25 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _messageCompressionThreshold; } + @Override + public void changeState(AMQState newState) + { + _logger.debug("State changing to " + newState + " from old state " + _currentState); + _currentState = newState; + } + + @Override + public Broker<?> getBroker() + { + return _broker; + } + + @Override + public SubjectCreator getSubjectCreator() + { + return _broker.getSubjectCreator(getLocalAddress(), getTransport().isSecure()); + } + public EventLogger getEventLogger() { if(_virtualHost != null) diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java index 587669dadc..709ea409bf 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java @@ -35,9 +35,12 @@ import org.apache.qpid.framing.MethodDispatcher; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; +import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.security.AuthorizationHolder; +import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -214,4 +217,10 @@ public interface AMQProtocolSession<T extends AMQProtocolSession<T>> boolean isCompressionSupported(); int getMessageCompressionThreshold(); + + void changeState(AMQState state); + + Broker<?> getBroker(); + + SubjectCreator getSubjectCreator(); } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java index ae07d60c4e..bf89a812b9 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java @@ -30,7 +30,6 @@ import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
/**
@@ -52,16 +51,17 @@ public class AccessRequestHandler implements StateAwareMethodListener<AccessRequ {
}
- public void methodReceived(AMQStateManager stateManager, AccessRequestBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ AccessRequestBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- final AMQChannel channel = session.getChannel(channelId);
+ final AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
// We don't implement access control class, but to keep clients happy that expect it
// always use the "0" ticket.
@@ -80,6 +80,6 @@ public class AccessRequestHandler implements StateAwareMethodListener<AccessRequ }
channel.sync();
- session.writeFrame(response.generateFrame(channelId));
+ connection.writeFrame(response.generateFrame(channelId));
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicAckMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicAckMethodHandler.java index f623d27e87..efc91800a1 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicAckMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicAckMethodHandler.java @@ -26,7 +26,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicAckBody; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckBody> @@ -44,21 +43,21 @@ public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckB { } - public void methodReceived(AMQStateManager stateManager, BasicAckBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + BasicAckBody body, + int channelId) throws AMQException { - AMQProtocolSession protocolSession = stateManager.getProtocolSession(); - if (_log.isDebugEnabled()) { _log.debug("Ack(Tag:" + body.getDeliveryTag() + ":Mult:" + body.getMultiple() + ") received on channel " + channelId); } - final AMQChannel channel = protocolSession.getChannel(channelId); + final AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } // this method throws an AMQException if the delivery tag is not known diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicCancelMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicCancelMethodHandler.java index 5a6a7bdc18..16498b3e88 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicCancelMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicCancelMethodHandler.java @@ -28,7 +28,6 @@ import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicCancelBody> @@ -46,16 +45,16 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC { } - public void methodReceived(AMQStateManager stateManager, BasicCancelBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + BasicCancelBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - - final AMQChannel channel = session.getChannel(channelId); + final AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } if (_log.isDebugEnabled()) @@ -67,10 +66,10 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC channel.unsubscribeConsumer(body.getConsumerTag()); if (!body.getNowait()) { - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(body.getConsumerTag()); channel.sync(); - session.writeFrame(cancelOkBody.generateFrame(channelId)); + connection.writeFrame(cancelOkBody.generateFrame(channelId)); } } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java index c1e3d850ef..b4219fe29c 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java @@ -36,7 +36,6 @@ import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -56,16 +55,16 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic { } - public void methodReceived(AMQStateManager stateManager, BasicConsumeBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + BasicConsumeBody body, + int channelId) throws AMQException { - AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); - - AMQChannel channel = protocolConnection.getChannel(channelId); - VirtualHostImpl<?,?,?> vHost = protocolConnection.getVirtualHost(); + AMQChannel channel = connection.getChannel(channelId); + VirtualHostImpl<?,?,?> vHost = connection.getVirtualHost(); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } else { @@ -119,12 +118,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic if (queueName != null) { String msg = "No such queue, '" + queueName + "'"; - throw body.getChannelException(AMQConstant.NOT_FOUND, msg); + throw body.getChannelException(AMQConstant.NOT_FOUND, msg, connection.getMethodRegistry()); } else { String msg = "No queue name provided, no default queue defined."; - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg); + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg, connection.getMethodRegistry()); } } else @@ -153,9 +152,9 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic body.getNoLocal()); if (!body.getNowait()) { - MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag); - protocolConnection.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } } @@ -163,12 +162,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic { AMQShortString msg = AMQShortString.validValueOf("Non-unique consumer tag, '" + body.getConsumerTag() + "'"); - MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode msg, // replytext body.getClazz(), body.getMethod()); - protocolConnection.writeFrame(responseBody.generateFrame(0)); + connection.writeFrame(responseBody.generateFrame(0)); } } @@ -176,12 +175,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic { _logger.debug("Closing connection due to invalid selector"); - MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.ARGUMENT_INVALID.getCode(), AMQShortString.validValueOf(ise.getMessage()), body.getClazz(), body.getMethod()); - protocolConnection.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } @@ -190,28 +189,28 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " + queue.getName() - + " as it already has an existing exclusive consumer"); + + " as it already has an existing exclusive consumer", connection.getMethodRegistry()); } catch (AMQQueue.ExistingConsumerPreventsExclusive e) { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " + queue.getName() - + " exclusively as it already has a consumer"); + + " exclusively as it already has a consumer", connection.getMethodRegistry()); } catch (AccessControlException e) { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " + queue.getName() - + " permission denied"); + + " permission denied", connection.getMethodRegistry()); } catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " + queue.getName() - + " as it already has an incompatible exclusivity policy"); + + " as it already has an incompatible exclusivity policy", connection.getMethodRegistry()); } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index c3bdedf44d..d650292546 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -45,7 +45,6 @@ import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod; import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8; import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -65,17 +64,17 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB { } - public void methodReceived(AMQStateManager stateManager, BasicGetBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + BasicGetBody body, + int channelId) throws AMQException { - AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); + VirtualHostImpl vHost = connection.getVirtualHost(); - VirtualHostImpl vHost = protocolConnection.getVirtualHost(); - - AMQChannel channel = protocolConnection.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } else { @@ -87,12 +86,12 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB if(body.getQueue()!=null) { throw body.getConnectionException(AMQConstant.NOT_FOUND, - "No such queue, '" + body.getQueue()+ "'"); + "No such queue, '" + body.getQueue()+ "'", connection.getMethodRegistry()); } else { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "No queue name provided, no default queue defined."); + "No queue name provided, no default queue defined.", connection.getMethodRegistry()); } } else @@ -100,36 +99,37 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB try { - if (!performGet(queue,protocolConnection, channel, !body.getNoAck())) + if (!performGet(queue,connection, channel, !body.getNoAck())) { - MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); // TODO - set clusterId BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null); - protocolConnection.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } } catch (AccessControlException e) { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage()); + e.getMessage(), connection.getMethodRegistry()); } catch (MessageSource.ExistingExclusiveConsumer e) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue has an exclusive consumer"); + "Queue has an exclusive consumer", connection.getMethodRegistry()); } catch (MessageSource.ExistingConsumerPreventsExclusive e) { throw body.getConnectionException(AMQConstant.INTERNAL_ERROR, "The GET request has been evaluated as an exclusive consumer, " + - "this is likely due to a programming error in the Qpid broker"); + "this is likely due to a programming error in the Qpid broker", + connection.getMethodRegistry()); } catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue has an incompatible exclusivit policy"); + "Queue has an incompatible exclusivit policy", connection.getMethodRegistry()); } } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java index 0d09c31ec8..e90744ab5e 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.protocol.v0_8.handler; +import java.security.AccessControlException; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -30,12 +32,9 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.security.AccessControlException; - public class BasicPublishMethodHandler implements StateAwareMethodListener<BasicPublishBody> { private static final Logger _logger = Logger.getLogger(BasicPublishMethodHandler.class); @@ -52,16 +51,17 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basic { } - public void methodReceived(AMQStateManager stateManager, BasicPublishBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + BasicPublishBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); if (_logger.isDebugEnabled()) { _logger.debug("Publish received on channel " + channelId); } AMQShortString exchangeName = body.getExchange(); - VirtualHostImpl vHost = session.getVirtualHost(); + VirtualHostImpl vHost = connection.getVirtualHost(); // TODO: check the delivery tag field details - is it unique across the broker or per subscriber? @@ -79,21 +79,22 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basic // if the exchange does not exist we raise a channel exception if (destination == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name"); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name", + connection.getMethodRegistry()); } else { // The partially populated BasicDeliver frame plus the received route body // is stored in the channel. Once the final body frame has been received // it is routed to the exchange. - AMQChannel channel = session.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } - MessagePublishInfo info = session.getMethodRegistry().getProtocolVersionMethodConverter().convertToInfo(body); + MessagePublishInfo info = connection.getMethodRegistry().getProtocolVersionMethodConverter().convertToInfo(body); info.setExchange(exchangeName); try { @@ -101,7 +102,7 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basic } catch (AccessControlException e) { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); } } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java index e4a6636a74..9464be4c6e 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java @@ -26,7 +26,6 @@ import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody> @@ -38,21 +37,22 @@ public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody> return _instance; } - public void methodReceived(AMQStateManager stateManager, BasicQosBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + BasicQosBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - AMQChannel channel = session.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } channel.sync(); channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount()); - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody(); - session.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java index 606bcf1693..2f0468fd15 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java @@ -29,7 +29,6 @@ import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class BasicRecoverMethodHandler implements StateAwareMethodListener<BasicRecoverBody> @@ -43,29 +42,29 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic return _instance; } - public void methodReceived(AMQStateManager stateManager, BasicRecoverBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + BasicRecoverBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - - _logger.debug("Recover received on protocol session " + session + " and channel " + channelId); - AMQChannel channel = session.getChannel(channelId); + _logger.debug("Recover received on protocol session " + connection + " and channel " + channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } channel.resend(); // Qpid 0-8 hacks a synchronous -ok onto recover. // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant - if(session.getProtocolVersion().equals(ProtocolVersion.v8_0)) + if(connection.getProtocolVersion().equals(ProtocolVersion.v8_0)) { - MethodRegistry_8_0 methodRegistry = (MethodRegistry_8_0) session.getMethodRegistry(); + MethodRegistry_8_0 methodRegistry = (MethodRegistry_8_0) connection.getMethodRegistry(); AMQMethodBody recoverOk = methodRegistry.createBasicRecoverOkBody(); channel.sync(); - session.writeFrame(recoverOk.generateFrame(channelId)); + connection.writeFrame(recoverOk.generateFrame(channelId)); } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java index ef26e60a62..efcec299e9 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java @@ -31,7 +31,6 @@ import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener<BasicRecoverSyncBody>
@@ -45,35 +44,36 @@ public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener<B return _instance;
}
- public void methodReceived(AMQStateManager stateManager, BasicRecoverSyncBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ BasicRecoverSyncBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- _logger.debug("Recover received on protocol session " + session + " and channel " + channelId);
- AMQChannel channel = session.getChannel(channelId);
+ _logger.debug("Recover received on protocol session " + connection + " and channel " + channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
channel.sync();
channel.resend();
// Qpid 0-8 hacks a synchronous -ok onto recover.
// In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
- if(session.getProtocolVersion().equals(ProtocolVersion.v0_9))
+ if(connection.getProtocolVersion().equals(ProtocolVersion.v0_9))
{
- MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry();
+ MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) connection.getMethodRegistry();
AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
- session.writeFrame(recoverOk.generateFrame(channelId));
+ connection.writeFrame(recoverOk.generateFrame(channelId));
}
- else if(session.getProtocolVersion().equals(ProtocolVersion.v0_91))
+ else if(connection.getProtocolVersion().equals(ProtocolVersion.v0_91))
{
- MethodRegistry_0_91 methodRegistry = (MethodRegistry_0_91) session.getMethodRegistry();
+ MethodRegistry_0_91 methodRegistry = (MethodRegistry_0_91) connection.getMethodRegistry();
AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
- session.writeFrame(recoverOk.generateFrame(channelId));
+ connection.writeFrame(recoverOk.generateFrame(channelId));
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java index fdbd44b06d..1f299893f9 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java @@ -27,8 +27,6 @@ import org.apache.qpid.framing.BasicRejectBody; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicRejectBody> @@ -46,15 +44,16 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR { } - public void methodReceived(AMQStateManager stateManager, BasicRejectBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + BasicRejectBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - AMQChannel channel = session.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } if (_logger.isDebugEnabled()) diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java index e96d098618..a9e52c5240 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java @@ -29,7 +29,6 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class ChannelCloseHandler implements StateAwareMethodListener<ChannelCloseBody> @@ -47,9 +46,10 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos { } - public void methodReceived(AMQStateManager stateManager, ChannelCloseBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ChannelCloseBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); if (_logger.isInfoEnabled()) { @@ -58,19 +58,19 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos } - AMQChannel channel = session.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel"); + throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel", connection.getMethodRegistry()); } channel.sync(); - session.closeChannel(channelId); + connection.closeChannel(channelId); // Client requested closure so we don't wait for ok we send it - stateManager.getProtocolSession().closeChannelOk(channelId); + connection.closeChannelOk(channelId); - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); ChannelCloseOkBody responseBody = methodRegistry.createChannelCloseOkBody(); - session.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java index 2a220ff78d..fe9d20e151 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java @@ -24,7 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class ChannelCloseOkHandler implements StateAwareMethodListener<ChannelCloseOkBody> @@ -42,12 +42,14 @@ public class ChannelCloseOkHandler implements StateAwareMethodListener<ChannelCl { } - public void methodReceived(AMQStateManager stateManager, ChannelCloseOkBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ChannelCloseOkBody body, + int channelId) throws AMQException { _logger.info("Received channel-close-ok for channel-id " + channelId); // Let the Protocol Session know the channel is now closed. - stateManager.getProtocolSession().closeChannelOk(channelId); + connection.closeChannelOk(channelId); } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java index cc1677c93e..99c0e3b2de 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java @@ -28,7 +28,6 @@ import org.apache.qpid.framing.ChannelFlowBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowBody> @@ -46,23 +45,24 @@ public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowB { } - public void methodReceived(AMQStateManager stateManager, ChannelFlowBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ChannelFlowBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - AMQChannel channel = session.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } channel.sync(); channel.setSuspended(!body.getActive()); _logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive()); - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(body.getActive()); - session.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java index 63c6857216..dbfe07840f 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java @@ -20,6 +20,11 @@ */ package org.apache.qpid.server.protocol.v0_8.handler; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.UUID; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -33,16 +38,10 @@ import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.UUID; - public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenBody> { private static final Logger _logger = Logger.getLogger(ChannelOpenHandler.class); @@ -58,10 +57,11 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB { } - public void methodReceived(AMQStateManager stateManager, ChannelOpenBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ChannelOpenBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - VirtualHostImpl virtualHost = session.getVirtualHost(); + VirtualHostImpl virtualHost = connection.getVirtualHost(); // Protect the broker against out of order frame request. if (virtualHost == null) @@ -70,13 +70,13 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB } _logger.info("Connecting to: " + virtualHost.getName()); - final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore()); + final AMQChannel channel = new AMQChannel(connection,channelId, virtualHost.getMessageStore()); - session.addChannel(channel); + connection.addChannel(channel); ChannelOpenOkBody response; - ProtocolVersion pv = session.getProtocolVersion(); + ProtocolVersion pv = connection.getProtocolVersion(); if(pv.equals(ProtocolVersion.v8_0)) { @@ -138,6 +138,6 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB } - session.writeFrame(response.generateFrame(channelId)); + connection.writeFrame(response.generateFrame(channelId)); } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java index 60f9c1d495..c4a8eb4acb 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java @@ -27,7 +27,6 @@ import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.ConnectionCloseOkBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody> @@ -45,28 +44,29 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co { } - public void methodReceived(AMQStateManager stateManager, ConnectionCloseBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ConnectionCloseBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); if (_logger.isInfoEnabled()) { _logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" + - body.getReplyText() + " for " + session); + body.getReplyText() + " for " + connection); } try { - session.closeSession(); + connection.closeSession(); } catch (Exception e) { _logger.error("Error closing protocol session: " + e, e); } - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody(); - session.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); - session.closeProtocolSession(); + connection.closeProtocolSession(); } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java index fe46b6c0cd..03c43cc80a 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java @@ -26,7 +26,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.ConnectionCloseOkBody; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class ConnectionCloseOkMethodHandler implements StateAwareMethodListener<ConnectionCloseOkBody> @@ -44,16 +43,17 @@ public class ConnectionCloseOkMethodHandler implements StateAwareMethodListener< { } - public void methodReceived(AMQStateManager stateManager, ConnectionCloseOkBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ConnectionCloseOkBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); //todo should this not do more than just log the method? _logger.info("Received Connection-close-ok"); try { - stateManager.changeState(AMQState.CONNECTION_CLOSED); - session.closeSession(); + connection.changeState(AMQState.CONNECTION_CLOSED); + connection.closeSession(); } catch (Exception e) { diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java index 632f751756..20c5e90f5d 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java @@ -34,7 +34,6 @@ import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -58,9 +57,10 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con return new AMQShortString(Long.toString(System.currentTimeMillis())); } - public void methodReceived(AMQStateManager stateManager, ConnectionOpenBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ConnectionOpenBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); //ignore leading '/' String virtualHostName; @@ -73,42 +73,44 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con virtualHostName = body.getVirtualHost() == null ? null : String.valueOf(body.getVirtualHost()); } - VirtualHostImpl virtualHost = ((AmqpPort)stateManager.getProtocolSession().getPort()).getVirtualHost(virtualHostName); + VirtualHostImpl virtualHost = ((AmqpPort)connection.getPort()).getVirtualHost(virtualHostName); if (virtualHost == null) { - throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'"); + throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'", + connection.getMethodRegistry()); } else { // Check virtualhost access if (virtualHost.getState() != State.ACTIVE) { - throw body.getConnectionException(AMQConstant.CONNECTION_FORCED, "Virtual host '" + virtualHost.getName() + "' is not active"); + throw body.getConnectionException(AMQConstant.CONNECTION_FORCED, "Virtual host '" + virtualHost.getName() + "' is not active", + connection.getMethodRegistry()); } - session.setVirtualHost(virtualHost); + connection.setVirtualHost(virtualHost); try { - virtualHost.getSecurityManager().authoriseCreateConnection(session); + virtualHost.getSecurityManager().authoriseCreateConnection(connection); } catch (AccessControlException e) { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); } // See Spec (0.8.2). Section 3.1.2 Virtual Hosts - if (session.getContextKey() == null) + if (connection.getContextKey() == null) { - session.setContextKey(generateClientID()); + connection.setContextKey(generateClientID()); } - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost()); - stateManager.changeState(AMQState.CONNECTION_OPEN); + connection.changeState(AMQState.CONNECTION_OPEN); - session.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java index 8a1160314b..7a0c0945e1 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java @@ -36,7 +36,6 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; @@ -56,19 +55,20 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener { } - public void methodReceived(AMQStateManager stateManager, ConnectionSecureOkBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ConnectionSecureOkBody body, + int channelId) throws AMQException { - Broker<?> broker = stateManager.getBroker(); - AMQProtocolSession session = stateManager.getProtocolSession(); + Broker<?> broker = connection.getBroker(); - SubjectCreator subjectCreator = stateManager.getSubjectCreator(); + SubjectCreator subjectCreator = connection.getSubjectCreator(); - SaslServer ss = session.getSaslServer(); + SaslServer ss = connection.getSaslServer(); if (ss == null) { throw new AMQException("No SASL context set up in session"); } - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse()); switch (authResult.getStatus()) { @@ -78,7 +78,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); // This should be abstracted - stateManager.changeState(AMQState.CONNECTION_CLOSING); + connection.changeState(AMQState.CONNECTION_CLOSING); ConnectionCloseBody connectionCloseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), @@ -86,15 +86,15 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener body.getClazz(), body.getMethod()); - session.writeFrame(connectionCloseBody.generateFrame(0)); - disposeSaslServer(session); + connection.writeFrame(connectionCloseBody.generateFrame(0)); + disposeSaslServer(connection); break; case SUCCESS: if (_logger.isInfoEnabled()) { _logger.info("Connected as: " + authResult.getSubject()); } - stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); + connection.changeState(AMQState.CONNECTION_NOT_TUNED); int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); @@ -107,15 +107,15 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), frameMax, broker.getConnection_heartBeatDelay()); - session.writeFrame(tuneBody.generateFrame(0)); - session.setAuthorizedSubject(authResult.getSubject()); - disposeSaslServer(session); + connection.writeFrame(tuneBody.generateFrame(0)); + connection.setAuthorizedSubject(authResult.getSubject()); + disposeSaslServer(connection); break; case CONTINUE: - stateManager.changeState(AMQState.CONNECTION_NOT_AUTH); + connection.changeState(AMQState.CONNECTION_NOT_AUTH); ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); - session.writeFrame(secureBody.generateFrame(0)); + connection.writeFrame(secureBody.generateFrame(0)); } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java index e59032b87f..311b596979 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java @@ -35,7 +35,6 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; @@ -56,32 +55,36 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< { } - public void methodReceived(AMQStateManager stateManager, ConnectionStartOkBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ConnectionStartOkBody body, + int channelId) throws AMQException { - Broker<?> broker = stateManager.getBroker(); - AMQProtocolSession session = stateManager.getProtocolSession(); + Broker<?> broker = connection.getBroker(); _logger.info("SASL Mechanism selected: " + body.getMechanism()); _logger.info("Locale selected: " + body.getLocale()); - SubjectCreator subjectCreator = stateManager.getSubjectCreator(); + SubjectCreator subjectCreator = connection.getSubjectCreator(); SaslServer ss = null; try { - ss = subjectCreator.createSaslServer(String.valueOf(body.getMechanism()), session.getLocalFQDN(), session.getPeerPrincipal()); + ss = subjectCreator.createSaslServer(String.valueOf(body.getMechanism()), + connection.getLocalFQDN(), + connection.getPeerPrincipal()); if (ss == null) { - throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + body.getMechanism()); + throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + body.getMechanism(), + connection.getMethodRegistry()); } - session.setSaslServer(ss); + connection.setSaslServer(ss); final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse()); //save clientProperties - session.setClientProperties(body.getClientProperties()); + connection.setClientProperties(body.getClientProperties()); - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); switch (authResult.getStatus()) { @@ -90,7 +93,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); - stateManager.changeState(AMQState.CONNECTION_CLOSING); + connection.changeState(AMQState.CONNECTION_CLOSING); ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode @@ -98,8 +101,8 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< body.getClazz(), body.getMethod()); - session.writeFrame(closeBody.generateFrame(0)); - disposeSaslServer(session); + connection.writeFrame(closeBody.generateFrame(0)); + disposeSaslServer(connection); break; case SUCCESS: @@ -107,9 +110,9 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< { _logger.info("Connected as: " + authResult.getSubject()); } - session.setAuthorizedSubject(authResult.getSubject()); + connection.setAuthorizedSubject(authResult.getSubject()); - stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); + connection.changeState(AMQState.CONNECTION_NOT_TUNED); int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); if(frameMax <= 0) @@ -120,18 +123,18 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), frameMax, broker.getConnection_heartBeatDelay()); - session.writeFrame(tuneBody.generateFrame(0)); + connection.writeFrame(tuneBody.generateFrame(0)); break; case CONTINUE: - stateManager.changeState(AMQState.CONNECTION_NOT_AUTH); + connection.changeState(AMQState.CONNECTION_NOT_AUTH); ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); - session.writeFrame(secureBody.generateFrame(0)); + connection.writeFrame(secureBody.generateFrame(0)); } } catch (SaslException e) { - disposeSaslServer(session); + disposeSaslServer(connection); throw new AMQException("SASL error: " + e, e); } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java index fb4818d3ed..d5f066063d 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java @@ -29,7 +29,6 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<ConnectionTuneOkBody> @@ -43,19 +42,20 @@ public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<C return _instance; } - public void methodReceived(AMQStateManager stateManager, ConnectionTuneOkBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ConnectionTuneOkBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); if (_logger.isDebugEnabled()) { _logger.debug(body); } - stateManager.changeState(AMQState.CONNECTION_NOT_OPENED); + connection.changeState(AMQState.CONNECTION_NOT_OPENED); - session.initHeartbeats(body.getHeartbeat()); + connection.initHeartbeats(body.getHeartbeat()); - int brokerFrameMax = stateManager.getBroker().getContextValue(Integer.class,Broker.BROKER_FRAME_SIZE); + int brokerFrameMax = connection.getBroker().getContextValue(Integer.class,Broker.BROKER_FRAME_SIZE); if(brokerFrameMax <= 0) { brokerFrameMax = Integer.MAX_VALUE; @@ -68,7 +68,7 @@ public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<C + "greater than the broker will allow: " + brokerFrameMax, body.getClazz(), body.getMethod(), - body.getMajor(), body.getMinor(),null); + connection.getMethodRegistry(),null); } else if(body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode()) { @@ -77,13 +77,13 @@ public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<C + "which is smaller than the specification definined minimum: " + AMQConstant.FRAME_MIN_SIZE.getCode(), body.getClazz(), body.getMethod(), - body.getMajor(), body.getMinor(),null); + connection.getMethodRegistry(),null); } int frameMax = body.getFrameMax() == 0 ? brokerFrameMax : (int) body.getFrameMax(); - session.setMaxFrameSize(frameMax); + connection.setMaxFrameSize(frameMax); long maxChannelNumber = body.getChannelMax(); //0 means no implied limit, except that forced by protocol limitations (0xFFFF) - session.setMaximumNumberOfChannels( maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber); + connection.setMaximumNumberOfChannels(maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber); } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java index 2ea638a358..22e377c219 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java @@ -29,7 +29,6 @@ import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -65,16 +64,17 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo { } - public void methodReceived(AMQStateManager stateManager, ExchangeBoundBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ExchangeBoundBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - VirtualHostImpl virtualHost = session.getVirtualHost(); - MethodRegistry methodRegistry = session.getMethodRegistry(); + VirtualHostImpl virtualHost = connection.getVirtualHost(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); - final AMQChannel channel = session.getChannel(channelId); + final AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } channel.sync(); @@ -227,7 +227,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo } } } - session.writeFrame(response.generateFrame(channelId)); + connection.writeFrame(response.generateFrame(channelId)); } protected boolean isDefaultExchange(final AMQShortString exchangeName) diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java index 3f48b413ef..f90f47d77c 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java @@ -41,7 +41,6 @@ import org.apache.qpid.server.model.NoFactoryForTypeException; import org.apache.qpid.server.model.UnknownConfiguredObjectException; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; @@ -62,14 +61,15 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange { } - public void methodReceived(AMQStateManager stateManager, ExchangeDeclareBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ExchangeDeclareBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - VirtualHostImpl virtualHost = session.getVirtualHost(); - final AMQChannel channel = session.getChannel(channelId); + VirtualHostImpl virtualHost = connection.getVirtualHost(); + final AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } final AMQShortString exchangeName = body.getExchange(); @@ -89,7 +89,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange + ExchangeDefaults.DIRECT_EXCHANGE_CLASS + " to " + body.getType() +".", body.getClazz(), body.getMethod(), - body.getMajor(), body.getMinor(),null); + connection.getMethodRegistry(),null); } } else @@ -99,14 +99,15 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange exchange = virtualHost.getExchange(exchangeName.toString()); if(exchange == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName, + connection.getMethodRegistry()); } else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getType().equals(body.getType().asString())) { throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + exchangeName + " of type " + exchange.getType() - + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null); + + " to " + body.getType() +".",body.getClazz(), body.getMethod(),connection.getMethodRegistry(),null); } } @@ -139,7 +140,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to declare exchange: " + exchangeName + - " which begins with reserved prefix."); + " which begins with reserved prefix.", connection.getMethodRegistry()); } catch(ExchangeExistsException e) @@ -147,40 +148,44 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange exchange = e.getExistingExchange(); if(!new AMQShortString(exchange.getType()).equals(body.getType())) { - throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " - + exchangeName + " of type " - + exchange.getType() - + " to " + body.getType() +".", - body.getClazz(), body.getMethod(), - body.getMajor(), body.getMinor(),null); + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + + exchangeName + " of type " + + exchange.getType() + + " to " + body.getType() + ".", + connection.getMethodRegistry()); } } catch(NoFactoryForTypeException e) { - throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + exchangeName,e); + throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange type '"+e.getType()+"' for exchange '" + exchangeName + "'", connection.getMethodRegistry()); } catch (AccessControlException e) { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); } catch (UnknownConfiguredObjectException e) { // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur - throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown alternate exchange",e); + throw body.getConnectionException(AMQConstant.NOT_FOUND, + "Unknown alternate exchange " + + (e.getName() != null + ? "name: \"" + e.getName() + "\"" + : "id: " + e.getId()), + connection.getMethodRegistry()); } catch (IllegalArgumentException e) { - throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Error creating exchange",e); + throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Error creating exchange '"+exchangeName+"': " + e.getMessage(),connection.getMethodRegistry()); } } } if(!body.getNowait()) { - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody(); channel.sync(); - session.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java index 1c01a24f63..b5c10c190e 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.protocol.v0_8.handler; +import java.security.AccessControlException; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ExchangeDeleteBody; @@ -28,14 +30,11 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; import org.apache.qpid.server.virtualhost.RequiredExchangeException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.security.AccessControlException; - public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeDeleteBody> { private static final ExchangeDeleteHandler _instance = new ExchangeDeleteHandler(); @@ -49,14 +48,15 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD { } - public void methodReceived(AMQStateManager stateManager, ExchangeDeleteBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ExchangeDeleteBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - VirtualHostImpl virtualHost = session.getVirtualHost(); - final AMQChannel channel = session.getChannel(channelId); + VirtualHostImpl virtualHost = connection.getVirtualHost(); + final AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } channel.sync(); try @@ -64,7 +64,7 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD if(isDefaultExchange(body.getExchange())) { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Default Exchange cannot be deleted"); + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Default Exchange cannot be deleted", connection.getMethodRegistry()); } final String exchangeName = body.getExchange().toString(); @@ -72,28 +72,31 @@ public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeD final ExchangeImpl exchange = virtualHost.getExchange(exchangeName); if(exchange == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange()); + throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange(), + connection.getMethodRegistry()); } virtualHost.removeExchange(exchange, !body.getIfUnused()); - ExchangeDeleteOkBody responseBody = session.getMethodRegistry().createExchangeDeleteOkBody(); + ExchangeDeleteOkBody responseBody = connection.getMethodRegistry().createExchangeDeleteOkBody(); - session.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } catch (ExchangeIsAlternateException e) { - throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange"); + throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange", + connection.getMethodRegistry()); } catch (RequiredExchangeException e) { - throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange '"+body.getExchange()+"' cannot be deleted"); + throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange '"+body.getExchange()+"' cannot be deleted", + connection.getMethodRegistry()); } catch (AccessControlException e) { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java index 0140d2ec7e..c47a4b528f 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java @@ -36,7 +36,6 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -56,15 +55,16 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> { } - public void methodReceived(AMQStateManager stateManager, QueueBindBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + QueueBindBody body, + int channelId) throws AMQException { - AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); - VirtualHostImpl virtualHost = protocolConnection.getVirtualHost(); - AMQChannel channel = protocolConnection.getChannel(channelId); + VirtualHostImpl virtualHost = connection.getVirtualHost(); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } final AMQQueue queue; @@ -79,7 +79,8 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> if (queue == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null"); + throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null", + connection.getMethodRegistry()); } if (body.getRoutingKey() == null) @@ -99,12 +100,14 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> if (queue == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.", + connection.getMethodRegistry()); } if(isDefaultExchange(body.getExchange())) { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Cannot bind the queue " + queueName + " to the default exchange"); + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Cannot bind the queue " + queueName + " to the default exchange", + connection.getMethodRegistry()); } final String exchangeName = body.getExchange().toString(); @@ -112,7 +115,8 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> final ExchangeImpl exch = virtualHost.getExchange(exchangeName); if (exch == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist.", + connection.getMethodRegistry()); } @@ -133,7 +137,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> } catch (AccessControlException e) { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); } if (_log.isInfoEnabled()) @@ -143,9 +147,9 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> if (!body.getNowait()) { channel.sync(); - MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody(); - protocolConnection.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java index ef8d01d89f..0e1016c319 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java @@ -39,7 +39,6 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueArgumentsConverter; @@ -57,11 +56,12 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar return _instance; } - public void methodReceived(AMQStateManager stateManager, QueueDeclareBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + QueueDeclareBody body, + int channelId) throws AMQException { - final AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); - final AMQSessionModel session = protocolConnection.getChannel(channelId); - VirtualHostImpl virtualHost = protocolConnection.getVirtualHost(); + final AMQSessionModel session = connection.getChannel(channelId); + VirtualHostImpl virtualHost = connection.getVirtualHost(); final AMQShortString queueName; @@ -79,11 +79,11 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar //TODO: do we need to check that the queue already exists with exactly the same "configuration"? - AMQChannel channel = protocolConnection.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } if(body.getPassive()) @@ -92,14 +92,15 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar if (queue == null) { String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ")."; - throw body.getChannelException(AMQConstant.NOT_FOUND, msg); + throw body.getChannelException(AMQConstant.NOT_FOUND, msg, connection.getMethodRegistry()); } else { if (!queue.verifySessionAccess(channel)) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); + "Queue " + queue.getName() + " is exclusive, but not created on this Connection.", + connection.getMethodRegistry()); } //set this as the default queue on the channel: @@ -112,7 +113,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar try { - queue = createQueue(channel, queueName, body, virtualHost, protocolConnection); + queue = createQueue(channel, queueName, body, virtualHost, connection); } catch(QueueExistsException qe) @@ -123,33 +124,37 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar if (!queue.verifySessionAccess(channel)) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); + "Queue " + queue.getName() + " is exclusive, but not created on this Connection.", + connection.getMethodRegistry()); } else if(queue.isExclusive() != body.getExclusive()) { throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot re-declare queue '" + queue.getName() + "' with different exclusivity (was: " - + queue.isExclusive() + " requested " + body.getExclusive() + ")"); + + queue.isExclusive() + " requested " + body.getExclusive() + ")", + connection.getMethodRegistry()); } else if((body.getAutoDelete() && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS) || (!body.getAutoDelete() && queue.getLifetimePolicy() != ((body.getExclusive() && !body.getDurable()) ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE : LifetimePolicy.PERMANENT))) { throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot re-declare queue '" + queue.getName() + "' with different lifetime policy (was: " - + queue.getLifetimePolicy() + " requested autodelete: " + body.getAutoDelete() + ")"); + + queue.getLifetimePolicy() + " requested autodelete: " + body.getAutoDelete() + ")", + connection.getMethodRegistry()); } else if(queue.isDurable() != body.getDurable()) { throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot re-declare queue '" + queue.getName() + "' with different durability (was: " - + queue.isDurable() + " requested " + body.getDurable() + ")"); + + queue.isDurable() + " requested " + body.getDurable() + ")", + connection.getMethodRegistry()); } } catch (AccessControlException e) { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); } //set this as the default queue on the channel: @@ -159,12 +164,12 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar if (!body.getNowait()) { channel.sync(); - MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); QueueDeclareOkBody responseBody = methodRegistry.createQueueDeclareOkBody(queueName, queue.getQueueDepthMessages(), queue.getConsumerCount()); - protocolConnection.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); _logger.info("Queue " + queueName + " declared successfully"); } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java index 84efcb68b8..123c076a25 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.protocol.v0_8.handler; +import java.security.AccessControlException; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.QueueDeleteBody; @@ -27,14 +29,10 @@ import org.apache.qpid.framing.QueueDeleteOkBody; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.security.AccessControlException; - public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody> { private static final QueueDeleteHandler _instance = new QueueDeleteHandler(); @@ -57,18 +55,17 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB } - public void methodReceived(AMQStateManager stateManager, QueueDeleteBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + QueueDeleteBody body, + int channelId) throws AMQException { - AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); - VirtualHostImpl virtualHost = protocolConnection.getVirtualHost(); - DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); - + VirtualHostImpl virtualHost = connection.getVirtualHost(); - AMQChannel channel = protocolConnection.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } channel.sync(); AMQQueue queue; @@ -87,26 +84,30 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB { if (_failIfNotFound) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.", + connection.getMethodRegistry()); } } else { if (body.getIfEmpty() && !queue.isEmpty()) { - throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty."); + throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty.", + connection.getMethodRegistry()); } else if (body.getIfUnused() && !queue.isUnused()) { // TODO - Error code - throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used."); + throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used.", + connection.getMethodRegistry()); } else { if (!queue.verifySessionAccess(channel)) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); + "Queue " + queue.getName() + " is exclusive, but not created on this Connection.", + connection.getMethodRegistry()); } int purged = 0; @@ -116,12 +117,12 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB } catch (AccessControlException e) { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); } - MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged); - protocolConnection.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java index 68ecf0324c..2c06fef1e2 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.protocol.v0_8.handler; +import java.security.AccessControlException; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.MethodRegistry; @@ -28,13 +30,10 @@ import org.apache.qpid.framing.QueuePurgeBody; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.security.AccessControlException; - public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody> { private static final QueuePurgeHandler _instance = new QueuePurgeHandler(); @@ -56,15 +55,16 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod _failIfNotFound = failIfNotFound; } - public void methodReceived(AMQStateManager stateManager, QueuePurgeBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + QueuePurgeBody body, + int channelId) throws AMQException { - AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); - VirtualHostImpl virtualHost = protocolConnection.getVirtualHost(); + VirtualHostImpl virtualHost = connection.getVirtualHost(); - AMQChannel channel = protocolConnection.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } AMQQueue queue; if(body.getQueue() == null) @@ -77,7 +77,7 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod { if(_failIfNotFound) { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED,"No queue specified."); + throw body.getConnectionException(AMQConstant.NOT_ALLOWED,"No queue specified.", connection.getMethodRegistry()); } } } @@ -90,7 +90,8 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod { if(_failIfNotFound) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.", + connection.getMethodRegistry()); } } else @@ -98,7 +99,7 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod if (!queue.verifySessionAccess(channel)) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue is exclusive, but not created on this Connection."); + "Queue is exclusive, but not created on this Connection.", connection.getMethodRegistry()); } long purged = 0; @@ -108,16 +109,16 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod } catch (AccessControlException e) { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); } if(!body.getNowait()) { channel.sync(); - MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged); - protocolConnection.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java index 4e2d1bc8a2..9ae66b889c 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.protocol.v0_8.handler; +import java.security.AccessControlException; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -33,13 +35,10 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.security.AccessControlException; - public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindBody> { private static final Logger _log = Logger.getLogger(QueueUnbindHandler.class); @@ -55,19 +54,20 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB { } - public void methodReceived(AMQStateManager stateManager, QueueUnbindBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + QueueUnbindBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - VirtualHostImpl virtualHost = session.getVirtualHost(); + VirtualHostImpl virtualHost = connection.getVirtualHost(); final AMQQueue queue; final AMQShortString routingKey; - AMQChannel channel = session.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } if (body.getQueue() == null) @@ -77,7 +77,8 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB if (queue == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null"); + throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null", + connection.getMethodRegistry()); } routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern(false); @@ -91,23 +92,28 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB if (queue == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.", + connection.getMethodRegistry()); } if(isDefaultExchange(body.getExchange())) { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Cannot unbind the queue " + queue.getName() + " from the default exchange"); + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Cannot unbind the queue " + + queue.getName() + + " from the default exchange", connection.getMethodRegistry()); } final ExchangeImpl exch = virtualHost.getExchange(body.getExchange() == null ? null : body.getExchange().toString()); if (exch == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.", + connection.getMethodRegistry()); } if(!exch.hasBinding(String.valueOf(routingKey), queue)) { - throw body.getChannelException(AMQConstant.NOT_FOUND,"No such binding"); + throw body.getChannelException(AMQConstant.NOT_FOUND,"No such binding", connection.getMethodRegistry()); } else { @@ -117,7 +123,7 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB } catch (AccessControlException e) { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); } } @@ -127,7 +133,7 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey); } - final MethodRegistry registry = session.getMethodRegistry(); + final MethodRegistry registry = connection.getMethodRegistry(); final AMQMethodBody responseBody; if (registry instanceof MethodRegistry_0_9) { @@ -140,10 +146,10 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB else { // 0-8 does not support QueueUnbind - throw new AMQException(AMQConstant.COMMAND_INVALID, "QueueUnbind not present in AMQP version: " + session.getProtocolVersion(), null); + throw new AMQException(AMQConstant.COMMAND_INVALID, "QueueUnbind not present in AMQP version: " + connection.getProtocolVersion(), null); } channel.sync(); - session.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } protected boolean isDefaultExchange(final AMQShortString exchangeName) diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java index 43e97c0cb6..429b5875bc 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java @@ -20,20 +20,20 @@ */
package org.apache.qpid.server.protocol.v0_8.handler;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
-
import java.util.HashMap;
import java.util.Map;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
+
public class ServerMethodDispatcherImpl implements MethodDispatcher
{
- private final AMQStateManager _stateManager;
+ private final AMQProtocolSession<?> _connection;
private static interface DispatcherFactory
{
- public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager);
+ public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection);
}
private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories =
@@ -45,26 +45,26 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher _dispatcherFactories.put(ProtocolVersion.v8_0,
new DispatcherFactory()
{
- public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager)
+ public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection)
{
- return new ServerMethodDispatcherImpl_8_0(stateManager);
+ return new ServerMethodDispatcherImpl_8_0(connection);
}
});
_dispatcherFactories.put(ProtocolVersion.v0_9,
new DispatcherFactory()
{
- public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager)
+ public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection)
{
- return new ServerMethodDispatcherImpl_0_9(stateManager);
+ return new ServerMethodDispatcherImpl_0_9(connection);
}
});
_dispatcherFactories.put(ProtocolVersion.v0_91,
new DispatcherFactory()
{
- public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager)
+ public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection)
{
- return new ServerMethodDispatcherImpl_0_91(stateManager);
+ return new ServerMethodDispatcherImpl_0_91(connection);
}
});
@@ -103,82 +103,80 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher - public static MethodDispatcher createMethodDispatcher(AMQStateManager stateManager, ProtocolVersion protocolVersion)
+ public static MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection)
{
- return _dispatcherFactories.get(protocolVersion).createMethodDispatcher(stateManager);
+ return _dispatcherFactories.get(connection.getProtocolVersion()).createMethodDispatcher(connection);
}
- public ServerMethodDispatcherImpl(AMQStateManager stateManager)
+ public ServerMethodDispatcherImpl(AMQProtocolSession<?> connection)
{
- _stateManager = stateManager;
+ _connection = connection;
}
- protected AMQStateManager getStateManager()
+ protected final AMQProtocolSession<?> getConnection()
{
- return _stateManager;
+ return _connection;
}
-
-
public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException
{
- _accessRequestHandler.methodReceived(_stateManager, body, channelId);
+ _accessRequestHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException
{
- _basicAckMethodHandler.methodReceived(_stateManager, body, channelId);
+ _basicAckMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException
{
- _basicCancelMethodHandler.methodReceived(_stateManager, body, channelId);
+ _basicCancelMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException
{
- _basicConsumeMethodHandler.methodReceived(_stateManager, body, channelId);
+ _basicConsumeMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException
{
- _basicGetMethodHandler.methodReceived(_stateManager, body, channelId);
+ _basicGetMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException
{
- _basicPublishMethodHandler.methodReceived(_stateManager, body, channelId);
+ _basicPublishMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException
{
- _basicQosHandler.methodReceived(_stateManager, body, channelId);
+ _basicQosHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException
{
- _basicRecoverMethodHandler.methodReceived(_stateManager, body, channelId);
+ _basicRecoverMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException
{
- _basicRejectMethodHandler.methodReceived(_stateManager, body, channelId);
+ _basicRejectMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException
{
- _channelOpenHandler.methodReceived(_stateManager, body, channelId);
+ _channelOpenHandler.methodReceived(getConnection(), body, channelId);
return true;
}
@@ -225,21 +223,21 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
{
- _channelCloseHandler.methodReceived(_stateManager, body, channelId);
+ _channelCloseHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException
{
- _channelCloseOkHandler.methodReceived(_stateManager, body, channelId);
+ _channelCloseOkHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
{
- _channelFlowHandler.methodReceived(_stateManager, body, channelId);
+ _channelFlowHandler.methodReceived(getConnection(), body, channelId);
return true;
}
@@ -256,21 +254,23 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException
{
- _connectionOpenMethodHandler.methodReceived(_stateManager, body, channelId);
+ _connectionOpenMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException
{
- _connectionCloseMethodHandler.methodReceived(_stateManager, body, channelId);
+ _connectionCloseMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException
{
- _connectionCloseOkMethodHandler.methodReceived(_stateManager, body, channelId);
+ _connectionCloseOkMethodHandler.methodReceived(
+ getConnection(),
+ body, channelId);
return true;
}
@@ -299,15 +299,6 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher throw new UnexpectedMethodException(body);
}
- public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException
{
@@ -324,46 +315,6 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher throw new UnexpectedMethodException(body);
}
- public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException
{
throw new UnexpectedMethodException(body);
@@ -384,31 +335,6 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher throw new UnexpectedMethodException(body);
}
- public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException
{
throw new UnexpectedMethodException(body);
@@ -427,144 +353,84 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException
{
- _connectionSecureOkMethodHandler.methodReceived(_stateManager, body, channelId);
+ _connectionSecureOkMethodHandler.methodReceived(
+ getConnection(),
+ body, channelId);
return true;
}
public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException
{
- _connectionStartOkMethodHandler.methodReceived(_stateManager, body, channelId);
+ _connectionStartOkMethodHandler.methodReceived(
+ getConnection(),
+ body, channelId);
return true;
}
public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException
{
- _connectionTuneOkMethodHandler.methodReceived(_stateManager, body, channelId);
+ _connectionTuneOkMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
- public boolean dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException
- {
- return false;
- }
-
public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException
{
- _exchangeBoundHandler.methodReceived(_stateManager, body, channelId);
+ _exchangeBoundHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException
{
- _exchangeDeclareHandler.methodReceived(_stateManager, body, channelId);
+ _exchangeDeclareHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException
{
- _exchangeDeleteHandler.methodReceived(_stateManager, body, channelId);
+ _exchangeDeleteHandler.methodReceived(getConnection(), body, channelId);
return true;
}
- public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException
- {
- return false;
- }
-
public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException
{
- _queueBindHandler.methodReceived(_stateManager, body, channelId);
+ _queueBindHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException
{
- _queueDeclareHandler.methodReceived(_stateManager, body, channelId);
+ _queueDeclareHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException
{
- _queueDeleteHandler.methodReceived(_stateManager, body, channelId);
+ _queueDeleteHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException
{
- _queuePurgeHandler.methodReceived(_stateManager, body, channelId);
+ _queuePurgeHandler.methodReceived(getConnection(), body, channelId);
return true;
}
- public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException
- {
- return false;
- }
public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException
{
- _txCommitHandler.methodReceived(_stateManager, body, channelId);
+ _txCommitHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchTxRollback(TxRollbackBody body, int channelId) throws AMQException
{
- _txRollbackHandler.methodReceived(_stateManager, body, channelId);
+ _txRollbackHandler.methodReceived(getConnection(), body, channelId);
return true;
}
public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException
{
- _txSelectHandler.methodReceived(_stateManager, body, channelId);
+ _txSelectHandler.methodReceived(getConnection(), body, channelId);
return true;
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_9.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_9.java index 1ee6d732c2..4d66d0f999 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_9.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_9.java @@ -24,7 +24,7 @@ package org.apache.qpid.server.protocol.v0_8.handler; import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
+import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
@@ -40,14 +40,14 @@ public class ServerMethodDispatcherImpl_0_9 QueueUnbindHandler.getInstance();
- public ServerMethodDispatcherImpl_0_9(AMQStateManager stateManager)
+ public ServerMethodDispatcherImpl_0_9(AMQProtocolSession<?> connection)
{
- super(stateManager);
+ super(connection);
}
public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
{
- _basicRecoverSyncMethodHandler.methodReceived(getStateManager(), body, channelId);
+ _basicRecoverSyncMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
@@ -56,101 +56,6 @@ public class ServerMethodDispatcherImpl_0_9 throw new UnexpectedMethodException(body);
}
- public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException
- {
- return false;
- }
-
public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
{
throw new UnexpectedMethodException(body);
@@ -158,7 +63,7 @@ public class ServerMethodDispatcherImpl_0_9 public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
{
- _queueUnbindHandler.methodReceived(getStateManager(),body,channelId);
+ _queueUnbindHandler.methodReceived(getConnection(), body,channelId);
return true;
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_91.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_91.java index b11b9cff2b..711dfe12ff 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_91.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_91.java @@ -24,7 +24,7 @@ package org.apache.qpid.server.protocol.v0_8.handler; import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_91.MethodDispatcher_0_91;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
+import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
public class ServerMethodDispatcherImpl_0_91
@@ -39,14 +39,14 @@ public class ServerMethodDispatcherImpl_0_91 QueueUnbindHandler.getInstance();
- public ServerMethodDispatcherImpl_0_91(AMQStateManager stateManager)
+ public ServerMethodDispatcherImpl_0_91(AMQProtocolSession<?> connection)
{
- super(stateManager);
+ super(connection);
}
public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
{
- _basicRecoverSyncMethodHandler.methodReceived(getStateManager(), body, channelId);
+ _basicRecoverSyncMethodHandler.methodReceived(getConnection(), body, channelId);
return true;
}
@@ -55,106 +55,6 @@ public class ServerMethodDispatcherImpl_0_91 throw new UnexpectedMethodException(body);
}
- public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
{
throw new UnexpectedMethodException(body);
@@ -162,7 +62,7 @@ public class ServerMethodDispatcherImpl_0_91 public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
{
- _queueUnbindHandler.methodReceived(getStateManager(),body,channelId);
+ _queueUnbindHandler.methodReceived(getConnection(), body,channelId);
return true;
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java index f05219712f..df2306428f 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java @@ -23,24 +23,16 @@ package org.apache.qpid.server.protocol.v0_8.handler; import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicRecoverOkBody;
import org.apache.qpid.framing.ChannelAlertBody;
-import org.apache.qpid.framing.TestContentBody;
-import org.apache.qpid.framing.TestContentOkBody;
-import org.apache.qpid.framing.TestIntegerBody;
-import org.apache.qpid.framing.TestIntegerOkBody;
-import org.apache.qpid.framing.TestStringBody;
-import org.apache.qpid.framing.TestStringOkBody;
-import org.apache.qpid.framing.TestTableBody;
-import org.apache.qpid.framing.TestTableOkBody;
import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
+import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
public class ServerMethodDispatcherImpl_8_0
extends ServerMethodDispatcherImpl
implements MethodDispatcher_8_0
{
- public ServerMethodDispatcherImpl_8_0(AMQStateManager stateManager)
+ public ServerMethodDispatcherImpl_8_0(AMQProtocolSession<?> connection)
{
- super(stateManager);
+ super(connection);
}
public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
@@ -53,43 +45,4 @@ public class ServerMethodDispatcherImpl_8_0 throw new UnexpectedMethodException(body);
}
- public boolean dispatchTestContent(TestContentBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestContentOk(TestContentOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestInteger(TestIntegerBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestIntegerOk(TestIntegerOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestString(TestStringBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestStringOk(TestStringOkBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestTable(TestTableBody body, int channelId) throws AMQException
- {
- return false;
- }
-
- public boolean dispatchTestTableOk(TestTableOkBody body, int channelId) throws AMQException
- {
- return false;
- }
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java index b257030a59..cb08b1fd4f 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java @@ -28,7 +28,6 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.TxCommitBody; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> @@ -46,21 +45,21 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> { } - public void methodReceived(AMQStateManager stateManager, TxCommitBody body, final int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + TxCommitBody body, + final int channelId) throws AMQException { - final AMQProtocolSession session = stateManager.getProtocolSession(); - try { if (_log.isDebugEnabled()) { _log.debug("Commit received on channel " + channelId); } - AMQChannel channel = session.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } channel.commit(new Runnable() { @@ -68,9 +67,9 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> @Override public void run() { - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody(); - session.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } }, true); @@ -79,7 +78,8 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> } catch (AMQException e) { - throw body.getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage()); + throw body.getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage(), + connection.getMethodRegistry()); } } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java index 69ad1a0a21..08c1c2378b 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java @@ -26,7 +26,6 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.TxRollbackBody; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBody> @@ -42,22 +41,22 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod { } - public void methodReceived(AMQStateManager stateManager, TxRollbackBody body, final int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + TxRollbackBody body, + final int channelId) throws AMQException { - final AMQProtocolSession session = stateManager.getProtocolSession(); - try { - AMQChannel channel = session.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } - final MethodRegistry methodRegistry = session.getMethodRegistry(); + final MethodRegistry methodRegistry = connection.getMethodRegistry(); final AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody(); Runnable task = new Runnable() @@ -65,7 +64,7 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod public void run() { - session.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } }; @@ -79,7 +78,8 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod } catch (AMQException e) { - throw body.getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage()); + throw body.getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage(), + connection.getMethodRegistry()); } } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java index a43e1ebdab..d6ac194b09 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java @@ -26,7 +26,6 @@ import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxSelectOkBody; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody> @@ -42,21 +41,21 @@ public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody> { } - public void methodReceived(AMQStateManager stateManager, TxSelectBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + TxSelectBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - - AMQChannel channel = session.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } channel.setLocalTransactional(); - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); TxSelectOkBody responseBody = methodRegistry.createTxSelectOkBody(); - session.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java deleted file mode 100644 index 3c1f1dedc3..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.state; - -import java.security.PrivilegedActionException; -import java.security.PrivilegedExceptionAction; - -import javax.security.auth.Subject; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ChannelOpenBody; -import org.apache.qpid.framing.MethodDispatcher; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.protocol.AMQMethodListener; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.security.SubjectCreator; -import org.apache.qpid.server.util.ServerScopedRuntimeException; - -/** - * The state manager is responsible for managing the state of the protocol session. - * <p> - * For each AMQProtocolHandler there is a separate state manager. - */ -public class AMQStateManager implements AMQMethodListener -{ - private static final Logger _logger = Logger.getLogger(AMQStateManager.class); - - private final Broker<?> _broker; - private final AMQProtocolSession _protocolSession; - /** The current state */ - private AMQState _currentState; - - public AMQStateManager(Broker<?> broker, AMQProtocolSession protocolSession) - { - _broker = broker; - _protocolSession = protocolSession; - _currentState = AMQState.CONNECTION_NOT_STARTED; - - } - - /** - * Get the Broker instance - * - * @return the Broker - */ - public Broker<?> getBroker() - { - return _broker; - } - - public void changeState(AMQState newState) - { - _logger.debug("State changing to " + newState + " from old state " + _currentState); - final AMQState oldState = _currentState; - _currentState = newState; - - } - - public void error(Exception e) - { - _logger.error("State manager received error notification[Current State:" + _currentState + "]: " + e, e); - } - - public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException - { - final MethodDispatcher dispatcher = _protocolSession.getMethodDispatcher(); - - final int channelId = evt.getChannelId(); - final B body = evt.getMethod(); - - final AMQChannel channel = _protocolSession.getChannel(channelId); - if(channelId != 0 && channel == null) - { - - if(! ((body instanceof ChannelOpenBody) - || (body instanceof ChannelCloseOkBody) - || (body instanceof ChannelCloseBody))) - { - throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "channel is closed won't process:" + body); - } - - } - if(channel == null) - { - return body.execute(dispatcher, channelId); - } - else - { - try - { - return Subject.doAs(channel.getSubject(), new PrivilegedExceptionAction<Boolean>() - { - @Override - public Boolean run() throws AMQException - { - return body.execute(dispatcher, channelId); - } - }); - } - catch (PrivilegedActionException e) - { - if(e.getCause() instanceof AMQException) - { - throw (AMQException) e.getCause(); - } - else - { - throw new ServerScopedRuntimeException(e.getCause()); - } - } - - - } - - } - - public AMQProtocolSession getProtocolSession() - { - return _protocolSession; - } - - - public SubjectCreator getSubjectCreator() - { - return _broker.getSubjectCreator(getProtocolSession().getLocalAddress(), getProtocolSession().getTransport().isSecure()); - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/StateAwareMethodListener.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/StateAwareMethodListener.java index 63ab23919d..d767c7e326 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/StateAwareMethodListener.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/StateAwareMethodListener.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_8.state; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; /** * A frame listener that is informed of the protocol state when invoked and has @@ -30,5 +31,5 @@ import org.apache.qpid.framing.AMQMethodBody; */ public interface StateAwareMethodListener<B extends AMQMethodBody> { - void methodReceived(AMQStateManager stateManager, B evt, int channelId) throws AMQException; + void methodReceived(final AMQProtocolSession<?> connection, B evt, int channelId) throws AMQException; } |