summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java')
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java378
1 files changed, 95 insertions, 283 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java
index ac185d1aa9..b3ee5f9ff9 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import java.security.AccessControlException;
import java.security.PrivilegedAction;
import javax.security.auth.Subject;
@@ -29,16 +28,10 @@ import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.security.SubjectCreator;
-import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class ServerMethodDispatcherImpl implements MethodDispatcher
{
@@ -52,6 +45,13 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
void onChannel(ChannelMethodProcessor channel);
}
+
+ private static interface ConnectionAction
+ {
+ void onConnection(ConnectionMethodProcessor connection);
+ }
+
+
public static MethodDispatcher createMethodDispatcher(AMQProtocolEngine connection)
{
return new ServerMethodDispatcherImpl(connection);
@@ -91,6 +91,21 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
}
+ private void processConnectionMethod(final ConnectionAction action)
+ {
+ Subject.doAs(_connection.getSubject(), new PrivilegedAction<Void>()
+ {
+ @Override
+ public Void run()
+ {
+ action.onConnection(_connection);
+ return null;
+ }
+ });
+
+
+ }
+
public boolean dispatchAccessRequest(final AccessRequestBody body, int channelId)
{
processChannelMethod(channelId,
@@ -240,7 +255,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
return true;
}
- public boolean dispatchBasicReject(final BasicRejectBody body, int channelId) throws AMQException
+ public boolean dispatchBasicReject(final BasicRejectBody body, int channelId)
{
processChannelMethod(channelId,
@@ -257,30 +272,16 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
return true;
}
- public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException
+ public boolean dispatchChannelOpen(ChannelOpenBody body, final int channelId)
{
- VirtualHostImpl virtualHost = _connection.getVirtualHost();
-
- // Protect the broker against out of order frame request.
- if (virtualHost == null)
+ processConnectionMethod(new ConnectionAction()
{
- throw new AMQException(AMQConstant.COMMAND_INVALID,
- "Virtualhost has not yet been set. ConnectionOpen has not been called.",
- null);
- }
- _logger.info("Connecting to: " + virtualHost.getName());
-
- final AMQChannel channel = new AMQChannel(_connection, channelId, virtualHost.getMessageStore());
-
- _connection.addChannel(channel);
-
- ChannelOpenOkBody response;
-
-
- response = _connection.getMethodRegistry().createChannelOpenOkBody();
-
-
- _connection.writeFrame(response.generateFrame(channelId));
+ @Override
+ public void onConnection(final ConnectionMethodProcessor connection)
+ {
+ connection.receiveChannelOpen(channelId);
+ }
+ });
return true;
}
@@ -326,7 +327,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
throw new UnexpectedMethodException(body);
}
- public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
+ public boolean dispatchChannelClose(ChannelCloseBody body, int channelId)
{
processChannelMethod(channelId,
@@ -344,7 +345,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
}
- public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException
+ public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId)
{
processChannelMethod(channelId,
@@ -362,7 +363,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
}
- public boolean dispatchChannelFlow(final ChannelFlowBody body, int channelId) throws AMQException
+ public boolean dispatchChannelFlow(final ChannelFlowBody body, int channelId)
{
processChannelMethod(channelId,
@@ -389,103 +390,52 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
}
- public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException
+ public boolean dispatchConnectionOpen(final ConnectionOpenBody body, int channelId)
{
-
- //ignore leading '/'
- String virtualHostName;
- if ((body.getVirtualHost() != null) && body.getVirtualHost().charAt(0) == '/')
- {
- virtualHostName =
- new StringBuilder(body.getVirtualHost().subSequence(1, body.getVirtualHost().length())).toString();
- }
- else
- {
- virtualHostName = body.getVirtualHost() == null ? null : String.valueOf(body.getVirtualHost());
- }
-
- VirtualHostImpl virtualHost = ((AmqpPort) _connection.getPort()).getVirtualHost(virtualHostName);
-
- if (virtualHost == null)
- {
- closeConnection(AMQConstant.NOT_FOUND,
- "Unknown virtual host: '" + virtualHostName + "'");
-
- }
- else
+ processConnectionMethod(new ConnectionAction()
{
- // Check virtualhost access
- if (virtualHost.getState() != State.ACTIVE)
+ @Override
+ public void onConnection(final ConnectionMethodProcessor connection)
{
- closeConnection(AMQConstant.CONNECTION_FORCED,
- "Virtual host '" + virtualHost.getName() + "' is not active"
- );
-
+ connection.receiveConnectionOpen(body.getVirtualHost(), body.getCapabilities(), body.getInsist());
}
- else
- {
- _connection.setVirtualHost(virtualHost);
- try
- {
- virtualHost.getSecurityManager().authoriseCreateConnection(_connection);
- if (_connection.getContextKey() == null)
- {
- _connection.setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis())));
- }
+ });
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
-
- _connection.writeFrame(responseBody.generateFrame(channelId));
- }
- catch (AccessControlException e)
- {
- closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage());
- }
- }
- }
return true;
}
- public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException
+ public boolean dispatchConnectionClose(final ConnectionCloseBody body, int channelId)
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" +
- body.getReplyText() + " for " + _connection);
- }
- try
- {
- _connection.closeSession();
- }
- catch (Exception e)
- {
- _logger.error("Error closing protocol session: " + e, e);
- }
-
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
- ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
- _connection.writeFrame(responseBody.generateFrame(channelId));
- _connection.closeProtocolSession();
+ processConnectionMethod(new ConnectionAction()
+ {
+ @Override
+ public void onConnection(final ConnectionMethodProcessor connection)
+ {
+ connection.receiveConnectionClose(body.getReplyCode(),
+ body.getReplyText(),
+ body.getClassId(),
+ body.getMethodId());
+ }
+ });
return true;
}
- public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException
+ public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId)
{
- _logger.info("Received Connection-close-ok");
- try
+ processConnectionMethod(new ConnectionAction()
{
- _connection.closeSession();
- }
- catch (Exception e)
- {
- _logger.error("Error closing protocol session: " + e, e);
- }
+ @Override
+ public void onConnection(final ConnectionMethodProcessor connection)
+ {
+ connection.receiveConnectionCloseOk();
+ }
+ });
+
return true;
}
@@ -566,62 +516,18 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
}
- public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException
+ public boolean dispatchConnectionSecureOk(final ConnectionSecureOkBody body, int channelId)
{
- Broker<?> broker = _connection.getBroker();
- SubjectCreator subjectCreator = _connection.getSubjectCreator();
-
- SaslServer ss = _connection.getSaslServer();
- if (ss == null)
- {
- throw new AMQException("No SASL context set up in session");
- }
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
- SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse());
- switch (authResult.getStatus())
+ processConnectionMethod(new ConnectionAction()
{
- case ERROR:
- Exception cause = authResult.getCause();
-
- _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
-
- ConnectionCloseBody connectionCloseBody =
- methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),
- AMQConstant.NOT_ALLOWED.getName(),
- body.getClazz(),
- body.getMethod());
-
- _connection.writeFrame(connectionCloseBody.generateFrame(0));
- disposeSaslServer(_connection);
- break;
- case SUCCESS:
- if (_logger.isInfoEnabled())
- {
- _logger.info("Connected as: " + authResult.getSubject());
- }
-
- int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
-
- if (frameMax <= 0)
- {
- frameMax = Integer.MAX_VALUE;
- }
+ @Override
+ public void onConnection(final ConnectionMethodProcessor connection)
+ {
+ connection.receiveConnectionSecureOk(body.getResponse());
+ }
+ });
- ConnectionTuneBody tuneBody =
- methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
- frameMax,
- broker.getConnection_heartBeatDelay());
- _connection.writeFrame(tuneBody.generateFrame(0));
- _connection.setAuthorizedSubject(authResult.getSubject());
- disposeSaslServer(_connection);
- break;
- case CONTINUE:
-
- ConnectionSecureBody
- secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
- _connection.writeFrame(secureBody.generateFrame(0));
- }
return true;
}
@@ -642,129 +548,40 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
}
}
- public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException
+ public boolean dispatchConnectionStartOk(final ConnectionStartOkBody body, int channelId)
{
- Broker<?> broker = _connection.getBroker();
-
- _logger.info("SASL Mechanism selected: " + body.getMechanism());
- _logger.info("Locale selected: " + body.getLocale());
- SubjectCreator subjectCreator = _connection.getSubjectCreator();
- SaslServer ss = null;
- try
+ processConnectionMethod(new ConnectionAction()
{
- ss = subjectCreator.createSaslServer(String.valueOf(body.getMechanism()),
- _connection.getLocalFQDN(),
- _connection.getPeerPrincipal());
-
- if (ss == null)
+ @Override
+ public void onConnection(final ConnectionMethodProcessor connection)
{
- closeConnection(AMQConstant.RESOURCE_ERROR,
- "Unable to create SASL Server:" + body.getMechanism()
- );
-
+ connection.receiveConnectionStartOk(body.getClientProperties(),
+ body.getMechanism(),
+ body.getResponse(),
+ body.getLocale());
}
- else
- {
-
- _connection.setSaslServer(ss);
+ });
- final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse());
- //save clientProperties
- _connection.setClientProperties(body.getClientProperties());
-
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
-
- switch (authResult.getStatus())
- {
- case ERROR:
- Exception cause = authResult.getCause();
-
- _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
-
- ConnectionCloseBody closeBody =
- methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),
- // replyCode
- AMQConstant.NOT_ALLOWED.getName(),
- body.getClazz(),
- body.getMethod());
-
- _connection.writeFrame(closeBody.generateFrame(0));
- disposeSaslServer(_connection);
- break;
-
- case SUCCESS:
- if (_logger.isInfoEnabled())
- {
- _logger.info("Connected as: " + authResult.getSubject());
- }
- _connection.setAuthorizedSubject(authResult.getSubject());
-
- int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
-
- if (frameMax <= 0)
- {
- frameMax = Integer.MAX_VALUE;
- }
-
- ConnectionTuneBody
- tuneBody =
- methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
- frameMax,
- broker.getConnection_heartBeatDelay());
- _connection.writeFrame(tuneBody.generateFrame(0));
- break;
- case CONTINUE:
- ConnectionSecureBody
- secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
- _connection.writeFrame(secureBody.generateFrame(0));
- }
- }
- }
- catch (SaslException e)
- {
- disposeSaslServer(_connection);
- throw new AMQException("SASL error: " + e, e);
- }
return true;
}
- public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException
+ public boolean dispatchConnectionTuneOk(final ConnectionTuneOkBody body, int channelId)
{
- final AMQProtocolEngine connection = getConnection();
- connection.initHeartbeats(body.getHeartbeat());
-
- int brokerFrameMax = connection.getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
- if (brokerFrameMax <= 0)
+ processConnectionMethod(new ConnectionAction()
{
- brokerFrameMax = Integer.MAX_VALUE;
- }
+ @Override
+ public void onConnection(final ConnectionMethodProcessor connection)
+ {
+ connection.receiveConnectionTuneOk(body.getChannelMax(),
+ body.getFrameMax(),
+ body.getHeartbeat());
+ }
+ });
+ final AMQProtocolEngine connection = getConnection();
- if (body.getFrameMax() > (long) brokerFrameMax)
- {
- throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR,
- "Attempt to set max frame size to " + body.getFrameMax()
- + " greater than the broker will allow: "
- + brokerFrameMax,
- body.getClazz(), body.getMethod(),
- connection.getMethodRegistry(), null);
- }
- else if (body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode())
- {
- throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR,
- "Attempt to set max frame size to " + body.getFrameMax()
- + " which is smaller than the specification definined minimum: "
- + AMQConstant.FRAME_MIN_SIZE.getCode(),
- body.getClazz(), body.getMethod(),
- connection.getMethodRegistry(), null);
- }
- int frameMax = body.getFrameMax() == 0 ? brokerFrameMax : (int) body.getFrameMax();
- connection.setMaxFrameSize(frameMax);
- long maxChannelNumber = body.getChannelMax();
- //0 means no implied limit, except that forced by protocol limitations (0xFFFF)
- connection.setMaximumNumberOfChannels(maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber);
return true;
}
@@ -825,11 +642,6 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
return true;
}
- private boolean isDefaultExchange(final AMQShortString exchangeName)
- {
- return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING);
- }
-
public boolean dispatchQueueBind(final QueueBindBody body, int channelId)
{
processChannelMethod(channelId,
@@ -891,7 +703,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
return true;
}
- public boolean dispatchQueuePurge(final QueuePurgeBody body, int channelId) throws AMQException
+ public boolean dispatchQueuePurge(final QueuePurgeBody body, int channelId)
{
processChannelMethod(channelId,
@@ -910,7 +722,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
}
- public boolean dispatchTxCommit(TxCommitBody body, final int channelId) throws AMQException
+ public boolean dispatchTxCommit(TxCommitBody body, final int channelId)
{
processChannelMethod(channelId,
@@ -927,7 +739,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
return true;
}
- public boolean dispatchTxRollback(TxRollbackBody body, final int channelId) throws AMQException
+ public boolean dispatchTxRollback(TxRollbackBody body, final int channelId)
{
processChannelMethod(channelId,
@@ -943,7 +755,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
return true;
}
- public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException
+ public boolean dispatchTxSelect(TxSelectBody body, int channelId)
{
processChannelMethod(channelId,
new ChannelAction()
@@ -958,7 +770,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
return true;
}
- public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody body, int channelId) throws AMQException
+ public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody body, int channelId)
{
processChannelMethod(channelId,
new ChannelAction()
@@ -991,7 +803,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
throw new UnexpectedMethodException(body);
}
- public boolean dispatchQueueUnbind(final QueueUnbindBody body, int channelId) throws AMQException
+ public boolean dispatchQueueUnbind(final QueueUnbindBody body, int channelId)
{
processChannelMethod(channelId,