diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java')
-rw-r--r-- | java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java | 102 |
1 files changed, 87 insertions, 15 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index a59f173d2f..1a746447b0 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -27,6 +27,8 @@ import java.nio.ByteBuffer; import java.security.AccessController; import java.security.Principal; import java.security.PrivilegedAction; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -74,7 +76,7 @@ import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.protocol.v0_8.state.AMQState; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; +import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.Action; @@ -86,7 +88,7 @@ import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.util.BytesDataOutput; -public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession<AMQProtocolEngine> +public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession<AMQProtocolEngine>, MethodRegistrySource { private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class); @@ -123,7 +125,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private final Set<AMQChannel<AMQProtocolEngine>> _channelsForCurrentMessage = new HashSet<AMQChannel<AMQProtocolEngine>>(); - private final AMQStateManager _stateManager; + /** The current state */ + private AMQState _currentState; private AMQDecoder _decoder; @@ -189,7 +192,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _transport = transport; _maxNoOfChannels = broker.getConnection_sessionCountLimit(); _receivedLock = new ReentrantLock(); - _stateManager = new AMQStateManager(broker, this); _decoder = new AMQDecoder(true, this); _connectionID = connectionId; _logSubject = new ConnectionLogSubject(this); @@ -623,7 +625,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { try { - boolean wasAnyoneInterested = _stateManager.methodReceived(evt); + boolean wasAnyoneInterested = methodReceived(evt); if (!wasAnyoneInterested) { @@ -654,9 +656,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _logger.info("Closing connection due to: " + e.getMessage()); } - AMQConnectionException ce = - evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, - AMQConstant.CHANNEL_ERROR.getName().toString()); + AMQConnectionException ce = new AMQConnectionException(AMQConstant.CHANNEL_ERROR, + AMQConstant.CHANNEL_ERROR.getName().toString(), + methodBody, getMethodRegistry()); _logger.info(e.getMessage() + " whilst processing:" + methodBody); closeConnection(channelId, ce); @@ -675,6 +677,58 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi closeProtocolSession(); } } + private <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException + { + final MethodDispatcher dispatcher = getMethodDispatcher(); + + final int channelId = evt.getChannelId(); + final B body = evt.getMethod(); + + final AMQChannel channel = getChannel(channelId); + if(channelId != 0 && channel == null) + { + + if(! ((body instanceof ChannelOpenBody) + || (body instanceof ChannelCloseOkBody) + || (body instanceof ChannelCloseBody))) + { + throw new AMQConnectionException(AMQConstant.CHANNEL_ERROR, "channel is closed won't process:" + body, body, getMethodRegistry()); + } + + } + if(channel == null) + { + return body.execute(dispatcher, channelId); + } + else + { + try + { + return Subject.doAs(channel.getSubject(), new PrivilegedExceptionAction<Boolean>() + { + @Override + public Boolean run() throws AMQException + { + return body.execute(dispatcher, channelId); + } + }); + } + catch (PrivilegedActionException e) + { + if(e.getCause() instanceof AMQException) + { + throw (AMQException) e.getCause(); + } + else + { + throw new ServerScopedRuntimeException(e.getCause()); + } + } + + + } + + } public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException { @@ -1056,8 +1110,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { try { - _stateManager.changeState(AMQState.CONNECTION_CLOSING); - writeFrame(e.getCloseFrame(channelId)); + changeState(AMQState.CONNECTION_CLOSING); + writeFrame(e.getCloseFrame()); } finally { @@ -1075,7 +1129,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi try { - _stateManager.changeState(AMQState.CONNECTION_CLOSED); + changeState(AMQState.CONNECTION_CLOSED); } catch (ConnectionScopedRuntimeException e) { @@ -1176,7 +1230,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _protocolVersion = pv; _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion); _protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this); - _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion); + _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(this); } public byte getProtocolMajorVersion() @@ -1478,9 +1532,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void close(AMQConstant cause, String message) { closeConnection(0, new AMQConnectionException(cause, message, 0, 0, - getProtocolOutputConverter().getProtocolMajorVersion(), - getProtocolOutputConverter().getProtocolMinorVersion(), - null)); + getMethodRegistry(), + null)); } public void block() @@ -1674,6 +1727,25 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _messageCompressionThreshold; } + @Override + public void changeState(AMQState newState) + { + _logger.debug("State changing to " + newState + " from old state " + _currentState); + _currentState = newState; + } + + @Override + public Broker<?> getBroker() + { + return _broker; + } + + @Override + public SubjectCreator getSubjectCreator() + { + return _broker.getSubjectCreator(getLocalAddress(), getTransport().isSecure()); + } + public EventLogger getEventLogger() { if(_virtualHost != null) |