diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java')
-rw-r--r-- | java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java | 378 |
1 files changed, 95 insertions, 283 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java index ac185d1aa9..b3ee5f9ff9 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.protocol.v0_8; -import java.security.AccessControlException; import java.security.PrivilegedAction; import javax.security.auth.Subject; @@ -29,16 +28,10 @@ import javax.security.sasl.SaslServer; import org.apache.log4j.Logger; -import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.port.AmqpPort; -import org.apache.qpid.server.security.SubjectCreator; -import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class ServerMethodDispatcherImpl implements MethodDispatcher { @@ -52,6 +45,13 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher void onChannel(ChannelMethodProcessor channel); } + + private static interface ConnectionAction + { + void onConnection(ConnectionMethodProcessor connection); + } + + public static MethodDispatcher createMethodDispatcher(AMQProtocolEngine connection) { return new ServerMethodDispatcherImpl(connection); @@ -91,6 +91,21 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher } + private void processConnectionMethod(final ConnectionAction action) + { + Subject.doAs(_connection.getSubject(), new PrivilegedAction<Void>() + { + @Override + public Void run() + { + action.onConnection(_connection); + return null; + } + }); + + + } + public boolean dispatchAccessRequest(final AccessRequestBody body, int channelId) { processChannelMethod(channelId, @@ -240,7 +255,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher return true; } - public boolean dispatchBasicReject(final BasicRejectBody body, int channelId) throws AMQException + public boolean dispatchBasicReject(final BasicRejectBody body, int channelId) { processChannelMethod(channelId, @@ -257,30 +272,16 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher return true; } - public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException + public boolean dispatchChannelOpen(ChannelOpenBody body, final int channelId) { - VirtualHostImpl virtualHost = _connection.getVirtualHost(); - - // Protect the broker against out of order frame request. - if (virtualHost == null) + processConnectionMethod(new ConnectionAction() { - throw new AMQException(AMQConstant.COMMAND_INVALID, - "Virtualhost has not yet been set. ConnectionOpen has not been called.", - null); - } - _logger.info("Connecting to: " + virtualHost.getName()); - - final AMQChannel channel = new AMQChannel(_connection, channelId, virtualHost.getMessageStore()); - - _connection.addChannel(channel); - - ChannelOpenOkBody response; - - - response = _connection.getMethodRegistry().createChannelOpenOkBody(); - - - _connection.writeFrame(response.generateFrame(channelId)); + @Override + public void onConnection(final ConnectionMethodProcessor connection) + { + connection.receiveChannelOpen(channelId); + } + }); return true; } @@ -326,7 +327,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher throw new UnexpectedMethodException(body); } - public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException + public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) { processChannelMethod(channelId, @@ -344,7 +345,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher } - public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException + public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) { processChannelMethod(channelId, @@ -362,7 +363,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher } - public boolean dispatchChannelFlow(final ChannelFlowBody body, int channelId) throws AMQException + public boolean dispatchChannelFlow(final ChannelFlowBody body, int channelId) { processChannelMethod(channelId, @@ -389,103 +390,52 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher } - public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException + public boolean dispatchConnectionOpen(final ConnectionOpenBody body, int channelId) { - - //ignore leading '/' - String virtualHostName; - if ((body.getVirtualHost() != null) && body.getVirtualHost().charAt(0) == '/') - { - virtualHostName = - new StringBuilder(body.getVirtualHost().subSequence(1, body.getVirtualHost().length())).toString(); - } - else - { - virtualHostName = body.getVirtualHost() == null ? null : String.valueOf(body.getVirtualHost()); - } - - VirtualHostImpl virtualHost = ((AmqpPort) _connection.getPort()).getVirtualHost(virtualHostName); - - if (virtualHost == null) - { - closeConnection(AMQConstant.NOT_FOUND, - "Unknown virtual host: '" + virtualHostName + "'"); - - } - else + processConnectionMethod(new ConnectionAction() { - // Check virtualhost access - if (virtualHost.getState() != State.ACTIVE) + @Override + public void onConnection(final ConnectionMethodProcessor connection) { - closeConnection(AMQConstant.CONNECTION_FORCED, - "Virtual host '" + virtualHost.getName() + "' is not active" - ); - + connection.receiveConnectionOpen(body.getVirtualHost(), body.getCapabilities(), body.getInsist()); } - else - { - _connection.setVirtualHost(virtualHost); - try - { - virtualHost.getSecurityManager().authoriseCreateConnection(_connection); - if (_connection.getContextKey() == null) - { - _connection.setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis()))); - } + }); - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost()); - - _connection.writeFrame(responseBody.generateFrame(channelId)); - } - catch (AccessControlException e) - { - closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage()); - } - } - } return true; } - public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException + public boolean dispatchConnectionClose(final ConnectionCloseBody body, int channelId) { - if (_logger.isInfoEnabled()) - { - _logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" + - body.getReplyText() + " for " + _connection); - } - try - { - _connection.closeSession(); - } - catch (Exception e) - { - _logger.error("Error closing protocol session: " + e, e); - } - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody(); - _connection.writeFrame(responseBody.generateFrame(channelId)); - _connection.closeProtocolSession(); + processConnectionMethod(new ConnectionAction() + { + @Override + public void onConnection(final ConnectionMethodProcessor connection) + { + connection.receiveConnectionClose(body.getReplyCode(), + body.getReplyText(), + body.getClassId(), + body.getMethodId()); + } + }); return true; } - public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException + public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) { - _logger.info("Received Connection-close-ok"); - try + processConnectionMethod(new ConnectionAction() { - _connection.closeSession(); - } - catch (Exception e) - { - _logger.error("Error closing protocol session: " + e, e); - } + @Override + public void onConnection(final ConnectionMethodProcessor connection) + { + connection.receiveConnectionCloseOk(); + } + }); + return true; } @@ -566,62 +516,18 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher } - public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException + public boolean dispatchConnectionSecureOk(final ConnectionSecureOkBody body, int channelId) { - Broker<?> broker = _connection.getBroker(); - SubjectCreator subjectCreator = _connection.getSubjectCreator(); - - SaslServer ss = _connection.getSaslServer(); - if (ss == null) - { - throw new AMQException("No SASL context set up in session"); - } - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse()); - switch (authResult.getStatus()) + processConnectionMethod(new ConnectionAction() { - case ERROR: - Exception cause = authResult.getCause(); - - _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); - - ConnectionCloseBody connectionCloseBody = - methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), - AMQConstant.NOT_ALLOWED.getName(), - body.getClazz(), - body.getMethod()); - - _connection.writeFrame(connectionCloseBody.generateFrame(0)); - disposeSaslServer(_connection); - break; - case SUCCESS: - if (_logger.isInfoEnabled()) - { - _logger.info("Connected as: " + authResult.getSubject()); - } - - int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); - - if (frameMax <= 0) - { - frameMax = Integer.MAX_VALUE; - } + @Override + public void onConnection(final ConnectionMethodProcessor connection) + { + connection.receiveConnectionSecureOk(body.getResponse()); + } + }); - ConnectionTuneBody tuneBody = - methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), - frameMax, - broker.getConnection_heartBeatDelay()); - _connection.writeFrame(tuneBody.generateFrame(0)); - _connection.setAuthorizedSubject(authResult.getSubject()); - disposeSaslServer(_connection); - break; - case CONTINUE: - - ConnectionSecureBody - secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); - _connection.writeFrame(secureBody.generateFrame(0)); - } return true; } @@ -642,129 +548,40 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher } } - public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException + public boolean dispatchConnectionStartOk(final ConnectionStartOkBody body, int channelId) { - Broker<?> broker = _connection.getBroker(); - - _logger.info("SASL Mechanism selected: " + body.getMechanism()); - _logger.info("Locale selected: " + body.getLocale()); - SubjectCreator subjectCreator = _connection.getSubjectCreator(); - SaslServer ss = null; - try + processConnectionMethod(new ConnectionAction() { - ss = subjectCreator.createSaslServer(String.valueOf(body.getMechanism()), - _connection.getLocalFQDN(), - _connection.getPeerPrincipal()); - - if (ss == null) + @Override + public void onConnection(final ConnectionMethodProcessor connection) { - closeConnection(AMQConstant.RESOURCE_ERROR, - "Unable to create SASL Server:" + body.getMechanism() - ); - + connection.receiveConnectionStartOk(body.getClientProperties(), + body.getMechanism(), + body.getResponse(), + body.getLocale()); } - else - { - - _connection.setSaslServer(ss); + }); - final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse()); - //save clientProperties - _connection.setClientProperties(body.getClientProperties()); - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - - switch (authResult.getStatus()) - { - case ERROR: - Exception cause = authResult.getCause(); - - _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); - - ConnectionCloseBody closeBody = - methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), - // replyCode - AMQConstant.NOT_ALLOWED.getName(), - body.getClazz(), - body.getMethod()); - - _connection.writeFrame(closeBody.generateFrame(0)); - disposeSaslServer(_connection); - break; - - case SUCCESS: - if (_logger.isInfoEnabled()) - { - _logger.info("Connected as: " + authResult.getSubject()); - } - _connection.setAuthorizedSubject(authResult.getSubject()); - - int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); - - if (frameMax <= 0) - { - frameMax = Integer.MAX_VALUE; - } - - ConnectionTuneBody - tuneBody = - methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), - frameMax, - broker.getConnection_heartBeatDelay()); - _connection.writeFrame(tuneBody.generateFrame(0)); - break; - case CONTINUE: - ConnectionSecureBody - secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); - _connection.writeFrame(secureBody.generateFrame(0)); - } - } - } - catch (SaslException e) - { - disposeSaslServer(_connection); - throw new AMQException("SASL error: " + e, e); - } return true; } - public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException + public boolean dispatchConnectionTuneOk(final ConnectionTuneOkBody body, int channelId) { - final AMQProtocolEngine connection = getConnection(); - connection.initHeartbeats(body.getHeartbeat()); - - int brokerFrameMax = connection.getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); - if (brokerFrameMax <= 0) + processConnectionMethod(new ConnectionAction() { - brokerFrameMax = Integer.MAX_VALUE; - } + @Override + public void onConnection(final ConnectionMethodProcessor connection) + { + connection.receiveConnectionTuneOk(body.getChannelMax(), + body.getFrameMax(), + body.getHeartbeat()); + } + }); + final AMQProtocolEngine connection = getConnection(); - if (body.getFrameMax() > (long) brokerFrameMax) - { - throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR, - "Attempt to set max frame size to " + body.getFrameMax() - + " greater than the broker will allow: " - + brokerFrameMax, - body.getClazz(), body.getMethod(), - connection.getMethodRegistry(), null); - } - else if (body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode()) - { - throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR, - "Attempt to set max frame size to " + body.getFrameMax() - + " which is smaller than the specification definined minimum: " - + AMQConstant.FRAME_MIN_SIZE.getCode(), - body.getClazz(), body.getMethod(), - connection.getMethodRegistry(), null); - } - int frameMax = body.getFrameMax() == 0 ? brokerFrameMax : (int) body.getFrameMax(); - connection.setMaxFrameSize(frameMax); - long maxChannelNumber = body.getChannelMax(); - //0 means no implied limit, except that forced by protocol limitations (0xFFFF) - connection.setMaximumNumberOfChannels(maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber); return true; } @@ -825,11 +642,6 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher return true; } - private boolean isDefaultExchange(final AMQShortString exchangeName) - { - return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING); - } - public boolean dispatchQueueBind(final QueueBindBody body, int channelId) { processChannelMethod(channelId, @@ -891,7 +703,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher return true; } - public boolean dispatchQueuePurge(final QueuePurgeBody body, int channelId) throws AMQException + public boolean dispatchQueuePurge(final QueuePurgeBody body, int channelId) { processChannelMethod(channelId, @@ -910,7 +722,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher } - public boolean dispatchTxCommit(TxCommitBody body, final int channelId) throws AMQException + public boolean dispatchTxCommit(TxCommitBody body, final int channelId) { processChannelMethod(channelId, @@ -927,7 +739,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher return true; } - public boolean dispatchTxRollback(TxRollbackBody body, final int channelId) throws AMQException + public boolean dispatchTxRollback(TxRollbackBody body, final int channelId) { processChannelMethod(channelId, @@ -943,7 +755,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher return true; } - public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException + public boolean dispatchTxSelect(TxSelectBody body, int channelId) { processChannelMethod(channelId, new ChannelAction() @@ -958,7 +770,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher return true; } - public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody body, int channelId) throws AMQException + public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody body, int channelId) { processChannelMethod(channelId, new ChannelAction() @@ -991,7 +803,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher throw new UnexpectedMethodException(body); } - public boolean dispatchQueueUnbind(final QueueUnbindBody body, int channelId) throws AMQException + public boolean dispatchQueueUnbind(final QueueUnbindBody body, int channelId) { processChannelMethod(channelId, |