summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java')
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java102
1 files changed, 87 insertions, 15 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index a59f173d2f..1a746447b0 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -27,6 +27,8 @@ import java.nio.ByteBuffer;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -74,7 +76,7 @@ import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
+import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.util.Action;
@@ -86,7 +88,7 @@ import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.util.BytesDataOutput;
-public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession<AMQProtocolEngine>
+public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession<AMQProtocolEngine>, MethodRegistrySource
{
private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
@@ -123,7 +125,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private final Set<AMQChannel<AMQProtocolEngine>> _channelsForCurrentMessage =
new HashSet<AMQChannel<AMQProtocolEngine>>();
- private final AMQStateManager _stateManager;
+ /** The current state */
+ private AMQState _currentState;
private AMQDecoder _decoder;
@@ -189,7 +192,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_transport = transport;
_maxNoOfChannels = broker.getConnection_sessionCountLimit();
_receivedLock = new ReentrantLock();
- _stateManager = new AMQStateManager(broker, this);
_decoder = new AMQDecoder(true, this);
_connectionID = connectionId;
_logSubject = new ConnectionLogSubject(this);
@@ -623,7 +625,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
try
{
- boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
+ boolean wasAnyoneInterested = methodReceived(evt);
if (!wasAnyoneInterested)
{
@@ -654,9 +656,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_logger.info("Closing connection due to: " + e.getMessage());
}
- AMQConnectionException ce =
- evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
- AMQConstant.CHANNEL_ERROR.getName().toString());
+ AMQConnectionException ce = new AMQConnectionException(AMQConstant.CHANNEL_ERROR,
+ AMQConstant.CHANNEL_ERROR.getName().toString(),
+ methodBody, getMethodRegistry());
_logger.info(e.getMessage() + " whilst processing:" + methodBody);
closeConnection(channelId, ce);
@@ -675,6 +677,58 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
closeProtocolSession();
}
}
+ private <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
+ {
+ final MethodDispatcher dispatcher = getMethodDispatcher();
+
+ final int channelId = evt.getChannelId();
+ final B body = evt.getMethod();
+
+ final AMQChannel channel = getChannel(channelId);
+ if(channelId != 0 && channel == null)
+ {
+
+ if(! ((body instanceof ChannelOpenBody)
+ || (body instanceof ChannelCloseOkBody)
+ || (body instanceof ChannelCloseBody)))
+ {
+ throw new AMQConnectionException(AMQConstant.CHANNEL_ERROR, "channel is closed won't process:" + body, body, getMethodRegistry());
+ }
+
+ }
+ if(channel == null)
+ {
+ return body.execute(dispatcher, channelId);
+ }
+ else
+ {
+ try
+ {
+ return Subject.doAs(channel.getSubject(), new PrivilegedExceptionAction<Boolean>()
+ {
+ @Override
+ public Boolean run() throws AMQException
+ {
+ return body.execute(dispatcher, channelId);
+ }
+ });
+ }
+ catch (PrivilegedActionException e)
+ {
+ if(e.getCause() instanceof AMQException)
+ {
+ throw (AMQException) e.getCause();
+ }
+ else
+ {
+ throw new ServerScopedRuntimeException(e.getCause());
+ }
+ }
+
+
+ }
+
+ }
public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
{
@@ -1056,8 +1110,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
try
{
- _stateManager.changeState(AMQState.CONNECTION_CLOSING);
- writeFrame(e.getCloseFrame(channelId));
+ changeState(AMQState.CONNECTION_CLOSING);
+ writeFrame(e.getCloseFrame());
}
finally
{
@@ -1075,7 +1129,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
try
{
- _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ changeState(AMQState.CONNECTION_CLOSED);
}
catch (ConnectionScopedRuntimeException e)
{
@@ -1176,7 +1230,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_protocolVersion = pv;
_methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
_protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this);
- _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion);
+ _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(this);
}
public byte getProtocolMajorVersion()
@@ -1478,9 +1532,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public void close(AMQConstant cause, String message)
{
closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
- getProtocolOutputConverter().getProtocolMajorVersion(),
- getProtocolOutputConverter().getProtocolMinorVersion(),
- null));
+ getMethodRegistry(),
+ null));
}
public void block()
@@ -1674,6 +1727,25 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return _messageCompressionThreshold;
}
+ @Override
+ public void changeState(AMQState newState)
+ {
+ _logger.debug("State changing to " + newState + " from old state " + _currentState);
+ _currentState = newState;
+ }
+
+ @Override
+ public Broker<?> getBroker()
+ {
+ return _broker;
+ }
+
+ @Override
+ public SubjectCreator getSubjectCreator()
+ {
+ return _broker.getSubjectCreator(getLocalAddress(), getTransport().isSecure());
+ }
+
public EventLogger getEventLogger()
{
if(_virtualHost != null)