diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol')
5 files changed, 491 insertions, 330 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 4b0fc6fd02..27fa654843 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -202,7 +202,6 @@ public class AMQChannel public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore) - throws AMQException { _connection = connection; _channelId = channelId; diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 4f560b1e74..dce0c3128e 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.security.AccessControlException; import java.security.AccessController; import java.security.Principal; import java.security.PrivilegedAction; @@ -43,6 +44,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import javax.security.auth.Subject; +import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; import org.apache.log4j.Logger; @@ -70,12 +72,15 @@ import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; +import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -88,7 +93,8 @@ import org.apache.qpid.util.BytesDataOutput; public class AMQProtocolEngine implements ServerProtocolEngine, AMQConnectionModel<AMQProtocolEngine, AMQChannel>, - AMQVersionAwareProtocolSession + AMQVersionAwareProtocolSession, + ConnectionMethodProcessor { private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class); @@ -836,38 +842,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId); } - public void addChannel(AMQChannel channel) throws AMQException + public void addChannel(AMQChannel channel) { - if (_closed) - { - throw new AMQException("Session is closed"); - } - final int channelId = channel.getChannelId(); - if (_closingChannelsList.containsKey(channelId)) - { - throw new AMQException("Session is marked awaiting channel close"); - } - - if (_channelMap.size() == _maxNoOfChannels) - { - String errorMessage = - toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels - + "); can't create channel"; - _logger.error(errorMessage); - throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage); - } - else + synchronized (_channelMap) { - synchronized (_channelMap) + _channelMap.put(channel.getChannelId(), channel); + sessionAdded(channel); + if(_blocking) { - _channelMap.put(channel.getChannelId(), channel); - sessionAdded(channel); - if(_blocking) - { - channel.block(); - } + channel.block(); } } @@ -893,7 +878,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } } - public Long getMaximumNumberOfChannels() + public long getMaximumNumberOfChannels() { return _maxNoOfChannels; } @@ -1269,7 +1254,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return _virtualHost; } - public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost) throws AMQException + public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost) { _virtualHost = virtualHost; @@ -1676,6 +1661,336 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _deferFlush = deferFlush; } + @Override + public void receiveChannelOpen(final int channelId) + { + // Protect the broker against out of order frame request. + if (_virtualHost == null) + { + closeConnection(AMQConstant.COMMAND_INVALID, + "Virtualhost has not yet been set. ConnectionOpen has not been called.", channelId); + } + else if(getChannel(channelId) != null || channelAwaitingClosure(channelId)) + { + closeConnection(AMQConstant.CHANNEL_ERROR, "Channel " + channelId + " already exists", channelId); + } + else if(channelId > getMaximumNumberOfChannels()) + { + closeConnection(AMQConstant.CHANNEL_ERROR, + "Channel " + channelId + " cannot be created as the max allowed channel id is " + + getMaximumNumberOfChannels(), + channelId); + } + else + { + _logger.info("Connecting to: " + _virtualHost.getName()); + + final AMQChannel channel = new AMQChannel(this, channelId, _virtualHost.getMessageStore()); + + addChannel(channel); + + ChannelOpenOkBody response; + + + response = getMethodRegistry().createChannelOpenOkBody(); + + + writeFrame(response.generateFrame(channelId)); + } + } + + @Override + public void receiveConnectionOpen(AMQShortString virtualHostName, + AMQShortString capabilities, + boolean insist) + { + String virtualHostStr; + if ((virtualHostName != null) && virtualHostName.charAt(0) == '/') + { + virtualHostStr = virtualHostName.toString().substring(1); + } + else + { + virtualHostStr = virtualHostName == null ? null : virtualHostName.toString(); + } + + VirtualHostImpl virtualHost = ((AmqpPort)getPort()).getVirtualHost(virtualHostStr); + + if (virtualHost == null) + { + closeConnection(AMQConstant.NOT_FOUND, + "Unknown virtual host: '" + virtualHostName + "'",0); + + } + else + { + // Check virtualhost access + if (virtualHost.getState() != State.ACTIVE) + { + closeConnection(AMQConstant.CONNECTION_FORCED, + "Virtual host '" + virtualHost.getName() + "' is not active",0); + + } + else + { + setVirtualHost(virtualHost); + try + { + virtualHost.getSecurityManager().authoriseCreateConnection(this); + if (getContextKey() == null) + { + setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis()))); + } + + MethodRegistry methodRegistry = getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(virtualHostName); + + writeFrame(responseBody.generateFrame(0)); + } + catch (AccessControlException e) + { + closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(),0); + } + } + } + } + + @Override + public void receiveConnectionClose(final int replyCode, + final AMQShortString replyText, + final int classId, + final int methodId) + { + if (_logger.isInfoEnabled()) + { + _logger.info("ConnectionClose received with reply code/reply text " + replyCode + "/" + + replyText + " for " + this); + } + try + { + closeSession(); + } + catch (Exception e) + { + _logger.error("Error closing protocol session: " + e, e); + } + + MethodRegistry methodRegistry = getMethodRegistry(); + ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody(); + writeFrame(responseBody.generateFrame(0)); + + closeProtocolSession(); + + } + + @Override + public void receiveConnectionCloseOk() + { + + _logger.info("Received Connection-close-ok"); + + try + { + closeSession(); + } + catch (Exception e) + { + _logger.error("Error closing protocol session: " + e, e); + } + } + + @Override + public void receiveConnectionSecureOk(final byte[] response) + { + + Broker<?> broker = getBroker(); + + SubjectCreator subjectCreator = getSubjectCreator(); + + SaslServer ss = getSaslServer(); + if (ss == null) + { + closeConnection(AMQConstant.INTERNAL_ERROR, "No SASL context set up in session",0 ); + } + MethodRegistry methodRegistry = getMethodRegistry(); + SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, response); + switch (authResult.getStatus()) + { + case ERROR: + Exception cause = authResult.getCause(); + + _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); + + closeConnection(AMQConstant.NOT_ALLOWED, "Authentication failed",0); + + disposeSaslServer(); + break; + case SUCCESS: + if (_logger.isInfoEnabled()) + { + _logger.info("Connected as: " + authResult.getSubject()); + } + + int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); + + if (frameMax <= 0) + { + frameMax = Integer.MAX_VALUE; + } + + ConnectionTuneBody tuneBody = + methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), + frameMax, + broker.getConnection_heartBeatDelay()); + writeFrame(tuneBody.generateFrame(0)); + setAuthorizedSubject(authResult.getSubject()); + disposeSaslServer(); + break; + case CONTINUE: + + ConnectionSecureBody + secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); + writeFrame(secureBody.generateFrame(0)); + } + } + + + private void disposeSaslServer() + { + SaslServer ss = getSaslServer(); + if (ss != null) + { + setSaslServer(null); + try + { + ss.dispose(); + } + catch (SaslException e) + { + _logger.error("Error disposing of Sasl server: " + e); + } + } + } + + @Override + public void receiveConnectionStartOk(final FieldTable clientProperties, + final AMQShortString mechanism, + final byte[] response, + final AMQShortString locale) + { + Broker<?> broker = getBroker(); + + _logger.info("SASL Mechanism selected: " + mechanism); + _logger.info("Locale selected: " + locale); + + SubjectCreator subjectCreator = getSubjectCreator(); + SaslServer ss = null; + try + { + ss = subjectCreator.createSaslServer(String.valueOf(mechanism), + getLocalFQDN(), + getPeerPrincipal()); + + if (ss == null) + { + closeConnection(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + mechanism, 0); + + } + else + { + //save clientProperties + setClientProperties(clientProperties); + + setSaslServer(ss); + + final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, response); + + MethodRegistry methodRegistry = getMethodRegistry(); + + switch (authResult.getStatus()) + { + case ERROR: + Exception cause = authResult.getCause(); + + _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); + + closeConnection(AMQConstant.NOT_ALLOWED, "Authentication failed", 0); + + disposeSaslServer(); + break; + + case SUCCESS: + if (_logger.isInfoEnabled()) + { + _logger.info("Connected as: " + authResult.getSubject()); + } + setAuthorizedSubject(authResult.getSubject()); + + int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); + + if (frameMax <= 0) + { + frameMax = Integer.MAX_VALUE; + } + + ConnectionTuneBody + tuneBody = + methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), + frameMax, + broker.getConnection_heartBeatDelay()); + writeFrame(tuneBody.generateFrame(0)); + break; + case CONTINUE: + ConnectionSecureBody + secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); + writeFrame(secureBody.generateFrame(0)); + } + } + } + catch (SaslException e) + { + disposeSaslServer(); + closeConnection(AMQConstant.INTERNAL_ERROR, "SASL error: " + e, 0); + } + } + + @Override + public void receiveConnectionTuneOk(final int channelMax, final long frameMax, final int heartbeat) + { + initHeartbeats(heartbeat); + + int brokerFrameMax = getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); + if (brokerFrameMax <= 0) + { + brokerFrameMax = Integer.MAX_VALUE; + } + + if (frameMax > (long) brokerFrameMax) + { + closeConnection(AMQConstant.SYNTAX_ERROR, + "Attempt to set max frame size to " + frameMax + + " greater than the broker will allow: " + + brokerFrameMax, 0); + } + else if (frameMax > 0 && frameMax < AMQConstant.FRAME_MIN_SIZE.getCode()) + { + closeConnection(AMQConstant.SYNTAX_ERROR, + "Attempt to set max frame size to " + frameMax + + " which is smaller than the specification defined minimum: " + + AMQConstant.FRAME_MIN_SIZE.getCode(), 0); + } + else + { + int calculatedFrameMax = frameMax == 0 ? brokerFrameMax : (int) frameMax; + setMaxFrameSize(calculatedFrameMax); + + //0 means no implied limit, except that forced by protocol limitations (0xFFFF) + setMaximumNumberOfChannels( ((channelMax == 0l) || (channelMax > 0xFFFFL)) + ? 0xFFFFL + : channelMax); + } + } + public final class WriteDeliverMethod implements ClientDeliveryMethod { diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConnectionMethodProcessor.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConnectionMethodProcessor.java new file mode 100644 index 0000000000..6e657c022e --- /dev/null +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConnectionMethodProcessor.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; + +public interface ConnectionMethodProcessor +{ + void receiveChannelOpen(int channelId); + + void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist); + + void receiveConnectionClose(int replyCode, AMQShortString replyText, int classId, int methodId); + + void receiveConnectionCloseOk(); + + void receiveConnectionSecureOk(byte[] response); + + void receiveConnectionStartOk(FieldTable clientProperties, + AMQShortString mechanism, + byte[] response, + AMQShortString locale); + + void receiveConnectionTuneOk(int channelMax, long frameMax, int heartbeat); +} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java index ac185d1aa9..b3ee5f9ff9 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.protocol.v0_8; -import java.security.AccessControlException; import java.security.PrivilegedAction; import javax.security.auth.Subject; @@ -29,16 +28,10 @@ import javax.security.sasl.SaslServer; import org.apache.log4j.Logger; -import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.port.AmqpPort; -import org.apache.qpid.server.security.SubjectCreator; -import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class ServerMethodDispatcherImpl implements MethodDispatcher { @@ -52,6 +45,13 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher void onChannel(ChannelMethodProcessor channel); } + + private static interface ConnectionAction + { + void onConnection(ConnectionMethodProcessor connection); + } + + public static MethodDispatcher createMethodDispatcher(AMQProtocolEngine connection) { return new ServerMethodDispatcherImpl(connection); @@ -91,6 +91,21 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher } + private void processConnectionMethod(final ConnectionAction action) + { + Subject.doAs(_connection.getSubject(), new PrivilegedAction<Void>() + { + @Override + public Void run() + { + action.onConnection(_connection); + return null; + } + }); + + + } + public boolean dispatchAccessRequest(final AccessRequestBody body, int channelId) { processChannelMethod(channelId, @@ -240,7 +255,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher return true; } - public boolean dispatchBasicReject(final BasicRejectBody body, int channelId) throws AMQException + public boolean dispatchBasicReject(final BasicRejectBody body, int channelId) { processChannelMethod(channelId, @@ -257,30 +272,16 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher return true; } - public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException + public boolean dispatchChannelOpen(ChannelOpenBody body, final int channelId) { - VirtualHostImpl virtualHost = _connection.getVirtualHost(); - - // Protect the broker against out of order frame request. - if (virtualHost == null) + processConnectionMethod(new ConnectionAction() { - throw new AMQException(AMQConstant.COMMAND_INVALID, - "Virtualhost has not yet been set. ConnectionOpen has not been called.", - null); - } - _logger.info("Connecting to: " + virtualHost.getName()); - - final AMQChannel channel = new AMQChannel(_connection, channelId, virtualHost.getMessageStore()); - - _connection.addChannel(channel); - - ChannelOpenOkBody response; - - - response = _connection.getMethodRegistry().createChannelOpenOkBody(); - - - _connection.writeFrame(response.generateFrame(channelId)); + @Override + public void onConnection(final ConnectionMethodProcessor connection) + { + connection.receiveChannelOpen(channelId); + } + }); return true; } @@ -326,7 +327,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher throw new UnexpectedMethodException(body); } - public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException + public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) { processChannelMethod(channelId, @@ -344,7 +345,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher } - public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException + public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) { processChannelMethod(channelId, @@ -362,7 +363,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher } - public boolean dispatchChannelFlow(final ChannelFlowBody body, int channelId) throws AMQException + public boolean dispatchChannelFlow(final ChannelFlowBody body, int channelId) { processChannelMethod(channelId, @@ -389,103 +390,52 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher } - public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException + public boolean dispatchConnectionOpen(final ConnectionOpenBody body, int channelId) { - - //ignore leading '/' - String virtualHostName; - if ((body.getVirtualHost() != null) && body.getVirtualHost().charAt(0) == '/') - { - virtualHostName = - new StringBuilder(body.getVirtualHost().subSequence(1, body.getVirtualHost().length())).toString(); - } - else - { - virtualHostName = body.getVirtualHost() == null ? null : String.valueOf(body.getVirtualHost()); - } - - VirtualHostImpl virtualHost = ((AmqpPort) _connection.getPort()).getVirtualHost(virtualHostName); - - if (virtualHost == null) - { - closeConnection(AMQConstant.NOT_FOUND, - "Unknown virtual host: '" + virtualHostName + "'"); - - } - else + processConnectionMethod(new ConnectionAction() { - // Check virtualhost access - if (virtualHost.getState() != State.ACTIVE) + @Override + public void onConnection(final ConnectionMethodProcessor connection) { - closeConnection(AMQConstant.CONNECTION_FORCED, - "Virtual host '" + virtualHost.getName() + "' is not active" - ); - + connection.receiveConnectionOpen(body.getVirtualHost(), body.getCapabilities(), body.getInsist()); } - else - { - _connection.setVirtualHost(virtualHost); - try - { - virtualHost.getSecurityManager().authoriseCreateConnection(_connection); - if (_connection.getContextKey() == null) - { - _connection.setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis()))); - } + }); - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost()); - - _connection.writeFrame(responseBody.generateFrame(channelId)); - } - catch (AccessControlException e) - { - closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage()); - } - } - } return true; } - public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException + public boolean dispatchConnectionClose(final ConnectionCloseBody body, int channelId) { - if (_logger.isInfoEnabled()) - { - _logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" + - body.getReplyText() + " for " + _connection); - } - try - { - _connection.closeSession(); - } - catch (Exception e) - { - _logger.error("Error closing protocol session: " + e, e); - } - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody(); - _connection.writeFrame(responseBody.generateFrame(channelId)); - _connection.closeProtocolSession(); + processConnectionMethod(new ConnectionAction() + { + @Override + public void onConnection(final ConnectionMethodProcessor connection) + { + connection.receiveConnectionClose(body.getReplyCode(), + body.getReplyText(), + body.getClassId(), + body.getMethodId()); + } + }); return true; } - public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException + public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) { - _logger.info("Received Connection-close-ok"); - try + processConnectionMethod(new ConnectionAction() { - _connection.closeSession(); - } - catch (Exception e) - { - _logger.error("Error closing protocol session: " + e, e); - } + @Override + public void onConnection(final ConnectionMethodProcessor connection) + { + connection.receiveConnectionCloseOk(); + } + }); + return true; } @@ -566,62 +516,18 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher } - public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException + public boolean dispatchConnectionSecureOk(final ConnectionSecureOkBody body, int channelId) { - Broker<?> broker = _connection.getBroker(); - SubjectCreator subjectCreator = _connection.getSubjectCreator(); - - SaslServer ss = _connection.getSaslServer(); - if (ss == null) - { - throw new AMQException("No SASL context set up in session"); - } - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse()); - switch (authResult.getStatus()) + processConnectionMethod(new ConnectionAction() { - case ERROR: - Exception cause = authResult.getCause(); - - _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); - - ConnectionCloseBody connectionCloseBody = - methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), - AMQConstant.NOT_ALLOWED.getName(), - body.getClazz(), - body.getMethod()); - - _connection.writeFrame(connectionCloseBody.generateFrame(0)); - disposeSaslServer(_connection); - break; - case SUCCESS: - if (_logger.isInfoEnabled()) - { - _logger.info("Connected as: " + authResult.getSubject()); - } - - int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); - - if (frameMax <= 0) - { - frameMax = Integer.MAX_VALUE; - } + @Override + public void onConnection(final ConnectionMethodProcessor connection) + { + connection.receiveConnectionSecureOk(body.getResponse()); + } + }); - ConnectionTuneBody tuneBody = - methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), - frameMax, - broker.getConnection_heartBeatDelay()); - _connection.writeFrame(tuneBody.generateFrame(0)); - _connection.setAuthorizedSubject(authResult.getSubject()); - disposeSaslServer(_connection); - break; - case CONTINUE: - - ConnectionSecureBody - secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); - _connection.writeFrame(secureBody.generateFrame(0)); - } return true; } @@ -642,129 +548,40 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher } } - public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException + public boolean dispatchConnectionStartOk(final ConnectionStartOkBody body, int channelId) { - Broker<?> broker = _connection.getBroker(); - - _logger.info("SASL Mechanism selected: " + body.getMechanism()); - _logger.info("Locale selected: " + body.getLocale()); - SubjectCreator subjectCreator = _connection.getSubjectCreator(); - SaslServer ss = null; - try + processConnectionMethod(new ConnectionAction() { - ss = subjectCreator.createSaslServer(String.valueOf(body.getMechanism()), - _connection.getLocalFQDN(), - _connection.getPeerPrincipal()); - - if (ss == null) + @Override + public void onConnection(final ConnectionMethodProcessor connection) { - closeConnection(AMQConstant.RESOURCE_ERROR, - "Unable to create SASL Server:" + body.getMechanism() - ); - + connection.receiveConnectionStartOk(body.getClientProperties(), + body.getMechanism(), + body.getResponse(), + body.getLocale()); } - else - { - - _connection.setSaslServer(ss); + }); - final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse()); - //save clientProperties - _connection.setClientProperties(body.getClientProperties()); - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - - switch (authResult.getStatus()) - { - case ERROR: - Exception cause = authResult.getCause(); - - _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); - - ConnectionCloseBody closeBody = - methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), - // replyCode - AMQConstant.NOT_ALLOWED.getName(), - body.getClazz(), - body.getMethod()); - - _connection.writeFrame(closeBody.generateFrame(0)); - disposeSaslServer(_connection); - break; - - case SUCCESS: - if (_logger.isInfoEnabled()) - { - _logger.info("Connected as: " + authResult.getSubject()); - } - _connection.setAuthorizedSubject(authResult.getSubject()); - - int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); - - if (frameMax <= 0) - { - frameMax = Integer.MAX_VALUE; - } - - ConnectionTuneBody - tuneBody = - methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), - frameMax, - broker.getConnection_heartBeatDelay()); - _connection.writeFrame(tuneBody.generateFrame(0)); - break; - case CONTINUE: - ConnectionSecureBody - secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); - _connection.writeFrame(secureBody.generateFrame(0)); - } - } - } - catch (SaslException e) - { - disposeSaslServer(_connection); - throw new AMQException("SASL error: " + e, e); - } return true; } - public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException + public boolean dispatchConnectionTuneOk(final ConnectionTuneOkBody body, int channelId) { - final AMQProtocolEngine connection = getConnection(); - connection.initHeartbeats(body.getHeartbeat()); - - int brokerFrameMax = connection.getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); - if (brokerFrameMax <= 0) + processConnectionMethod(new ConnectionAction() { - brokerFrameMax = Integer.MAX_VALUE; - } + @Override + public void onConnection(final ConnectionMethodProcessor connection) + { + connection.receiveConnectionTuneOk(body.getChannelMax(), + body.getFrameMax(), + body.getHeartbeat()); + } + }); + final AMQProtocolEngine connection = getConnection(); - if (body.getFrameMax() > (long) brokerFrameMax) - { - throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR, - "Attempt to set max frame size to " + body.getFrameMax() - + " greater than the broker will allow: " - + brokerFrameMax, - body.getClazz(), body.getMethod(), - connection.getMethodRegistry(), null); - } - else if (body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode()) - { - throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR, - "Attempt to set max frame size to " + body.getFrameMax() - + " which is smaller than the specification definined minimum: " - + AMQConstant.FRAME_MIN_SIZE.getCode(), - body.getClazz(), body.getMethod(), - connection.getMethodRegistry(), null); - } - int frameMax = body.getFrameMax() == 0 ? brokerFrameMax : (int) body.getFrameMax(); - connection.setMaxFrameSize(frameMax); - long maxChannelNumber = body.getChannelMax(); - //0 means no implied limit, except that forced by protocol limitations (0xFFFF) - connection.setMaximumNumberOfChannels(maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber); return true; } @@ -825,11 +642,6 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher return true; } - private boolean isDefaultExchange(final AMQShortString exchangeName) - { - return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING); - } - public boolean dispatchQueueBind(final QueueBindBody body, int channelId) { processChannelMethod(channelId, @@ -891,7 +703,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher return true; } - public boolean dispatchQueuePurge(final QueuePurgeBody body, int channelId) throws AMQException + public boolean dispatchQueuePurge(final QueuePurgeBody body, int channelId) { processChannelMethod(channelId, @@ -910,7 +722,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher } - public boolean dispatchTxCommit(TxCommitBody body, final int channelId) throws AMQException + public boolean dispatchTxCommit(TxCommitBody body, final int channelId) { processChannelMethod(channelId, @@ -927,7 +739,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher return true; } - public boolean dispatchTxRollback(TxRollbackBody body, final int channelId) throws AMQException + public boolean dispatchTxRollback(TxRollbackBody body, final int channelId) { processChannelMethod(channelId, @@ -943,7 +755,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher return true; } - public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException + public boolean dispatchTxSelect(TxSelectBody body, int channelId) { processChannelMethod(channelId, new ChannelAction() @@ -958,7 +770,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher return true; } - public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody body, int channelId) throws AMQException + public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody body, int channelId) { processChannelMethod(channelId, new ChannelAction() @@ -991,7 +803,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher throw new UnexpectedMethodException(body); } - public boolean dispatchQueueUnbind(final QueueUnbindBody body, int channelId) throws AMQException + public boolean dispatchQueueUnbind(final QueueUnbindBody body, int channelId) { processChannelMethod(channelId, diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java index 16c890eaea..107e64bee5 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.test.utils.QpidTestCase; @@ -46,23 +44,16 @@ public class MaxChannelsTest extends QpidTestCase long maxChannels = 10L; _session.setMaximumNumberOfChannels(maxChannels); - assertEquals("Number of channels not correctly set.", new Long(maxChannels), _session.getMaximumNumberOfChannels()); + assertEquals("Number of channels not correctly set.", maxChannels, _session.getMaximumNumberOfChannels()); - for (long currentChannel = 0L; currentChannel < maxChannels; currentChannel++) + for (long currentChannel = 1L; currentChannel <= maxChannels; currentChannel++) { - _session.addChannel(new AMQChannel(_session, (int) currentChannel, null)); + _session.receiveChannelOpen( (int) currentChannel); } - - try - { - _session.addChannel(new AMQChannel(_session, (int) maxChannels, null)); - fail("Cannot create more channels then maximum"); - } - catch (AMQException e) - { - assertEquals("Wrong exception received.", e.getErrorCode(), AMQConstant.NOT_ALLOWED); - } - assertEquals("Maximum number of channels not set.", new Long(maxChannels), new Long(_session.getChannels().size())); + assertFalse("Connection should not be closed after opening " + maxChannels + " channels",_session.isClosed()); + assertEquals("Maximum number of channels not set.", maxChannels, _session.getChannels().size()); + _session.receiveChannelOpen((int) maxChannels+1); + assertTrue("Connection should be closed after opening " + (maxChannels + 1) + " channels",_session.isClosed()); } @Override |