summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-10-12 15:43:40 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-10-12 15:43:40 +0000
commitb71808f6e2d65056b3cded958012ad1d96cd7391 (patch)
treebe93be32300eccdf9fdccfc2fcdb55d7f0f9a39b
parentce16072eb7da6d02e7224380459a27a19770a5d5 (diff)
downloadqpid-python-b71808f6e2d65056b3cded958012ad1d96cd7391.tar.gz
Move connection methods into AMQProtocolEngine
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6125-ProtocolRefactoring@1631192 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java1
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java375
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConnectionMethodProcessor.java44
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java378
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java23
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java11
6 files changed, 502 insertions, 330 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 4b0fc6fd02..27fa654843 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -202,7 +202,6 @@ public class AMQChannel
public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore)
- throws AMQException
{
_connection = connection;
_channelId = channelId;
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 4f560b1e74..dce0c3128e 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
+import java.security.AccessControlException;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
@@ -43,6 +44,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.security.auth.Subject;
+import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
@@ -70,12 +72,15 @@ import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.SessionModelListener;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
+import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -88,7 +93,8 @@ import org.apache.qpid.util.BytesDataOutput;
public class AMQProtocolEngine implements ServerProtocolEngine,
AMQConnectionModel<AMQProtocolEngine, AMQChannel>,
- AMQVersionAwareProtocolSession
+ AMQVersionAwareProtocolSession,
+ ConnectionMethodProcessor
{
private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
@@ -836,38 +842,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId);
}
- public void addChannel(AMQChannel channel) throws AMQException
+ public void addChannel(AMQChannel channel)
{
- if (_closed)
- {
- throw new AMQException("Session is closed");
- }
-
final int channelId = channel.getChannelId();
- if (_closingChannelsList.containsKey(channelId))
- {
- throw new AMQException("Session is marked awaiting channel close");
- }
-
- if (_channelMap.size() == _maxNoOfChannels)
- {
- String errorMessage =
- toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels
- + "); can't create channel";
- _logger.error(errorMessage);
- throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage);
- }
- else
+ synchronized (_channelMap)
{
- synchronized (_channelMap)
+ _channelMap.put(channel.getChannelId(), channel);
+ sessionAdded(channel);
+ if(_blocking)
{
- _channelMap.put(channel.getChannelId(), channel);
- sessionAdded(channel);
- if(_blocking)
- {
- channel.block();
- }
+ channel.block();
}
}
@@ -893,7 +878,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
}
}
- public Long getMaximumNumberOfChannels()
+ public long getMaximumNumberOfChannels()
{
return _maxNoOfChannels;
}
@@ -1269,7 +1254,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
return _virtualHost;
}
- public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost) throws AMQException
+ public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost)
{
_virtualHost = virtualHost;
@@ -1676,6 +1661,336 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
_deferFlush = deferFlush;
}
+ @Override
+ public void receiveChannelOpen(final int channelId)
+ {
+ // Protect the broker against out of order frame request.
+ if (_virtualHost == null)
+ {
+ closeConnection(AMQConstant.COMMAND_INVALID,
+ "Virtualhost has not yet been set. ConnectionOpen has not been called.", channelId);
+ }
+ else if(getChannel(channelId) != null || channelAwaitingClosure(channelId))
+ {
+ closeConnection(AMQConstant.CHANNEL_ERROR, "Channel " + channelId + " already exists", channelId);
+ }
+ else if(channelId > getMaximumNumberOfChannels())
+ {
+ closeConnection(AMQConstant.CHANNEL_ERROR,
+ "Channel " + channelId + " cannot be created as the max allowed channel id is "
+ + getMaximumNumberOfChannels(),
+ channelId);
+ }
+ else
+ {
+ _logger.info("Connecting to: " + _virtualHost.getName());
+
+ final AMQChannel channel = new AMQChannel(this, channelId, _virtualHost.getMessageStore());
+
+ addChannel(channel);
+
+ ChannelOpenOkBody response;
+
+
+ response = getMethodRegistry().createChannelOpenOkBody();
+
+
+ writeFrame(response.generateFrame(channelId));
+ }
+ }
+
+ @Override
+ public void receiveConnectionOpen(AMQShortString virtualHostName,
+ AMQShortString capabilities,
+ boolean insist)
+ {
+ String virtualHostStr;
+ if ((virtualHostName != null) && virtualHostName.charAt(0) == '/')
+ {
+ virtualHostStr = virtualHostName.toString().substring(1);
+ }
+ else
+ {
+ virtualHostStr = virtualHostName == null ? null : virtualHostName.toString();
+ }
+
+ VirtualHostImpl virtualHost = ((AmqpPort)getPort()).getVirtualHost(virtualHostStr);
+
+ if (virtualHost == null)
+ {
+ closeConnection(AMQConstant.NOT_FOUND,
+ "Unknown virtual host: '" + virtualHostName + "'",0);
+
+ }
+ else
+ {
+ // Check virtualhost access
+ if (virtualHost.getState() != State.ACTIVE)
+ {
+ closeConnection(AMQConstant.CONNECTION_FORCED,
+ "Virtual host '" + virtualHost.getName() + "' is not active",0);
+
+ }
+ else
+ {
+ setVirtualHost(virtualHost);
+ try
+ {
+ virtualHost.getSecurityManager().authoriseCreateConnection(this);
+ if (getContextKey() == null)
+ {
+ setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis())));
+ }
+
+ MethodRegistry methodRegistry = getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(virtualHostName);
+
+ writeFrame(responseBody.generateFrame(0));
+ }
+ catch (AccessControlException e)
+ {
+ closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(),0);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void receiveConnectionClose(final int replyCode,
+ final AMQShortString replyText,
+ final int classId,
+ final int methodId)
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("ConnectionClose received with reply code/reply text " + replyCode + "/" +
+ replyText + " for " + this);
+ }
+ try
+ {
+ closeSession();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error closing protocol session: " + e, e);
+ }
+
+ MethodRegistry methodRegistry = getMethodRegistry();
+ ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
+ writeFrame(responseBody.generateFrame(0));
+
+ closeProtocolSession();
+
+ }
+
+ @Override
+ public void receiveConnectionCloseOk()
+ {
+
+ _logger.info("Received Connection-close-ok");
+
+ try
+ {
+ closeSession();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error closing protocol session: " + e, e);
+ }
+ }
+
+ @Override
+ public void receiveConnectionSecureOk(final byte[] response)
+ {
+
+ Broker<?> broker = getBroker();
+
+ SubjectCreator subjectCreator = getSubjectCreator();
+
+ SaslServer ss = getSaslServer();
+ if (ss == null)
+ {
+ closeConnection(AMQConstant.INTERNAL_ERROR, "No SASL context set up in session",0 );
+ }
+ MethodRegistry methodRegistry = getMethodRegistry();
+ SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, response);
+ switch (authResult.getStatus())
+ {
+ case ERROR:
+ Exception cause = authResult.getCause();
+
+ _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
+
+ closeConnection(AMQConstant.NOT_ALLOWED, "Authentication failed",0);
+
+ disposeSaslServer();
+ break;
+ case SUCCESS:
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Connected as: " + authResult.getSubject());
+ }
+
+ int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
+
+ if (frameMax <= 0)
+ {
+ frameMax = Integer.MAX_VALUE;
+ }
+
+ ConnectionTuneBody tuneBody =
+ methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
+ frameMax,
+ broker.getConnection_heartBeatDelay());
+ writeFrame(tuneBody.generateFrame(0));
+ setAuthorizedSubject(authResult.getSubject());
+ disposeSaslServer();
+ break;
+ case CONTINUE:
+
+ ConnectionSecureBody
+ secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
+ writeFrame(secureBody.generateFrame(0));
+ }
+ }
+
+
+ private void disposeSaslServer()
+ {
+ SaslServer ss = getSaslServer();
+ if (ss != null)
+ {
+ setSaslServer(null);
+ try
+ {
+ ss.dispose();
+ }
+ catch (SaslException e)
+ {
+ _logger.error("Error disposing of Sasl server: " + e);
+ }
+ }
+ }
+
+ @Override
+ public void receiveConnectionStartOk(final FieldTable clientProperties,
+ final AMQShortString mechanism,
+ final byte[] response,
+ final AMQShortString locale)
+ {
+ Broker<?> broker = getBroker();
+
+ _logger.info("SASL Mechanism selected: " + mechanism);
+ _logger.info("Locale selected: " + locale);
+
+ SubjectCreator subjectCreator = getSubjectCreator();
+ SaslServer ss = null;
+ try
+ {
+ ss = subjectCreator.createSaslServer(String.valueOf(mechanism),
+ getLocalFQDN(),
+ getPeerPrincipal());
+
+ if (ss == null)
+ {
+ closeConnection(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + mechanism, 0);
+
+ }
+ else
+ {
+ //save clientProperties
+ setClientProperties(clientProperties);
+
+ setSaslServer(ss);
+
+ final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, response);
+
+ MethodRegistry methodRegistry = getMethodRegistry();
+
+ switch (authResult.getStatus())
+ {
+ case ERROR:
+ Exception cause = authResult.getCause();
+
+ _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
+
+ closeConnection(AMQConstant.NOT_ALLOWED, "Authentication failed", 0);
+
+ disposeSaslServer();
+ break;
+
+ case SUCCESS:
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Connected as: " + authResult.getSubject());
+ }
+ setAuthorizedSubject(authResult.getSubject());
+
+ int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
+
+ if (frameMax <= 0)
+ {
+ frameMax = Integer.MAX_VALUE;
+ }
+
+ ConnectionTuneBody
+ tuneBody =
+ methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
+ frameMax,
+ broker.getConnection_heartBeatDelay());
+ writeFrame(tuneBody.generateFrame(0));
+ break;
+ case CONTINUE:
+ ConnectionSecureBody
+ secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
+ writeFrame(secureBody.generateFrame(0));
+ }
+ }
+ }
+ catch (SaslException e)
+ {
+ disposeSaslServer();
+ closeConnection(AMQConstant.INTERNAL_ERROR, "SASL error: " + e, 0);
+ }
+ }
+
+ @Override
+ public void receiveConnectionTuneOk(final int channelMax, final long frameMax, final int heartbeat)
+ {
+ initHeartbeats(heartbeat);
+
+ int brokerFrameMax = getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
+ if (brokerFrameMax <= 0)
+ {
+ brokerFrameMax = Integer.MAX_VALUE;
+ }
+
+ if (frameMax > (long) brokerFrameMax)
+ {
+ closeConnection(AMQConstant.SYNTAX_ERROR,
+ "Attempt to set max frame size to " + frameMax
+ + " greater than the broker will allow: "
+ + brokerFrameMax, 0);
+ }
+ else if (frameMax > 0 && frameMax < AMQConstant.FRAME_MIN_SIZE.getCode())
+ {
+ closeConnection(AMQConstant.SYNTAX_ERROR,
+ "Attempt to set max frame size to " + frameMax
+ + " which is smaller than the specification defined minimum: "
+ + AMQConstant.FRAME_MIN_SIZE.getCode(), 0);
+ }
+ else
+ {
+ int calculatedFrameMax = frameMax == 0 ? brokerFrameMax : (int) frameMax;
+ setMaxFrameSize(calculatedFrameMax);
+
+ //0 means no implied limit, except that forced by protocol limitations (0xFFFF)
+ setMaximumNumberOfChannels( ((channelMax == 0l) || (channelMax > 0xFFFFL))
+ ? 0xFFFFL
+ : channelMax);
+ }
+ }
+
public final class WriteDeliverMethod
implements ClientDeliveryMethod
{
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConnectionMethodProcessor.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConnectionMethodProcessor.java
new file mode 100644
index 0000000000..6e657c022e
--- /dev/null
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConnectionMethodProcessor.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+public interface ConnectionMethodProcessor
+{
+ void receiveChannelOpen(int channelId);
+
+ void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist);
+
+ void receiveConnectionClose(int replyCode, AMQShortString replyText, int classId, int methodId);
+
+ void receiveConnectionCloseOk();
+
+ void receiveConnectionSecureOk(byte[] response);
+
+ void receiveConnectionStartOk(FieldTable clientProperties,
+ AMQShortString mechanism,
+ byte[] response,
+ AMQShortString locale);
+
+ void receiveConnectionTuneOk(int channelMax, long frameMax, int heartbeat);
+}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java
index ac185d1aa9..b3ee5f9ff9 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import java.security.AccessControlException;
import java.security.PrivilegedAction;
import javax.security.auth.Subject;
@@ -29,16 +28,10 @@ import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.security.SubjectCreator;
-import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class ServerMethodDispatcherImpl implements MethodDispatcher
{
@@ -52,6 +45,13 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
void onChannel(ChannelMethodProcessor channel);
}
+
+ private static interface ConnectionAction
+ {
+ void onConnection(ConnectionMethodProcessor connection);
+ }
+
+
public static MethodDispatcher createMethodDispatcher(AMQProtocolEngine connection)
{
return new ServerMethodDispatcherImpl(connection);
@@ -91,6 +91,21 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
}
+ private void processConnectionMethod(final ConnectionAction action)
+ {
+ Subject.doAs(_connection.getSubject(), new PrivilegedAction<Void>()
+ {
+ @Override
+ public Void run()
+ {
+ action.onConnection(_connection);
+ return null;
+ }
+ });
+
+
+ }
+
public boolean dispatchAccessRequest(final AccessRequestBody body, int channelId)
{
processChannelMethod(channelId,
@@ -240,7 +255,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
return true;
}
- public boolean dispatchBasicReject(final BasicRejectBody body, int channelId) throws AMQException
+ public boolean dispatchBasicReject(final BasicRejectBody body, int channelId)
{
processChannelMethod(channelId,
@@ -257,30 +272,16 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
return true;
}
- public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException
+ public boolean dispatchChannelOpen(ChannelOpenBody body, final int channelId)
{
- VirtualHostImpl virtualHost = _connection.getVirtualHost();
-
- // Protect the broker against out of order frame request.
- if (virtualHost == null)
+ processConnectionMethod(new ConnectionAction()
{
- throw new AMQException(AMQConstant.COMMAND_INVALID,
- "Virtualhost has not yet been set. ConnectionOpen has not been called.",
- null);
- }
- _logger.info("Connecting to: " + virtualHost.getName());
-
- final AMQChannel channel = new AMQChannel(_connection, channelId, virtualHost.getMessageStore());
-
- _connection.addChannel(channel);
-
- ChannelOpenOkBody response;
-
-
- response = _connection.getMethodRegistry().createChannelOpenOkBody();
-
-
- _connection.writeFrame(response.generateFrame(channelId));
+ @Override
+ public void onConnection(final ConnectionMethodProcessor connection)
+ {
+ connection.receiveChannelOpen(channelId);
+ }
+ });
return true;
}
@@ -326,7 +327,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
throw new UnexpectedMethodException(body);
}
- public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
+ public boolean dispatchChannelClose(ChannelCloseBody body, int channelId)
{
processChannelMethod(channelId,
@@ -344,7 +345,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
}
- public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException
+ public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId)
{
processChannelMethod(channelId,
@@ -362,7 +363,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
}
- public boolean dispatchChannelFlow(final ChannelFlowBody body, int channelId) throws AMQException
+ public boolean dispatchChannelFlow(final ChannelFlowBody body, int channelId)
{
processChannelMethod(channelId,
@@ -389,103 +390,52 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
}
- public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException
+ public boolean dispatchConnectionOpen(final ConnectionOpenBody body, int channelId)
{
-
- //ignore leading '/'
- String virtualHostName;
- if ((body.getVirtualHost() != null) && body.getVirtualHost().charAt(0) == '/')
- {
- virtualHostName =
- new StringBuilder(body.getVirtualHost().subSequence(1, body.getVirtualHost().length())).toString();
- }
- else
- {
- virtualHostName = body.getVirtualHost() == null ? null : String.valueOf(body.getVirtualHost());
- }
-
- VirtualHostImpl virtualHost = ((AmqpPort) _connection.getPort()).getVirtualHost(virtualHostName);
-
- if (virtualHost == null)
- {
- closeConnection(AMQConstant.NOT_FOUND,
- "Unknown virtual host: '" + virtualHostName + "'");
-
- }
- else
+ processConnectionMethod(new ConnectionAction()
{
- // Check virtualhost access
- if (virtualHost.getState() != State.ACTIVE)
+ @Override
+ public void onConnection(final ConnectionMethodProcessor connection)
{
- closeConnection(AMQConstant.CONNECTION_FORCED,
- "Virtual host '" + virtualHost.getName() + "' is not active"
- );
-
+ connection.receiveConnectionOpen(body.getVirtualHost(), body.getCapabilities(), body.getInsist());
}
- else
- {
- _connection.setVirtualHost(virtualHost);
- try
- {
- virtualHost.getSecurityManager().authoriseCreateConnection(_connection);
- if (_connection.getContextKey() == null)
- {
- _connection.setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis())));
- }
+ });
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
-
- _connection.writeFrame(responseBody.generateFrame(channelId));
- }
- catch (AccessControlException e)
- {
- closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage());
- }
- }
- }
return true;
}
- public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException
+ public boolean dispatchConnectionClose(final ConnectionCloseBody body, int channelId)
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" +
- body.getReplyText() + " for " + _connection);
- }
- try
- {
- _connection.closeSession();
- }
- catch (Exception e)
- {
- _logger.error("Error closing protocol session: " + e, e);
- }
-
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
- ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
- _connection.writeFrame(responseBody.generateFrame(channelId));
- _connection.closeProtocolSession();
+ processConnectionMethod(new ConnectionAction()
+ {
+ @Override
+ public void onConnection(final ConnectionMethodProcessor connection)
+ {
+ connection.receiveConnectionClose(body.getReplyCode(),
+ body.getReplyText(),
+ body.getClassId(),
+ body.getMethodId());
+ }
+ });
return true;
}
- public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException
+ public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId)
{
- _logger.info("Received Connection-close-ok");
- try
+ processConnectionMethod(new ConnectionAction()
{
- _connection.closeSession();
- }
- catch (Exception e)
- {
- _logger.error("Error closing protocol session: " + e, e);
- }
+ @Override
+ public void onConnection(final ConnectionMethodProcessor connection)
+ {
+ connection.receiveConnectionCloseOk();
+ }
+ });
+
return true;
}
@@ -566,62 +516,18 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
}
- public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException
+ public boolean dispatchConnectionSecureOk(final ConnectionSecureOkBody body, int channelId)
{
- Broker<?> broker = _connection.getBroker();
- SubjectCreator subjectCreator = _connection.getSubjectCreator();
-
- SaslServer ss = _connection.getSaslServer();
- if (ss == null)
- {
- throw new AMQException("No SASL context set up in session");
- }
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
- SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse());
- switch (authResult.getStatus())
+ processConnectionMethod(new ConnectionAction()
{
- case ERROR:
- Exception cause = authResult.getCause();
-
- _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
-
- ConnectionCloseBody connectionCloseBody =
- methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),
- AMQConstant.NOT_ALLOWED.getName(),
- body.getClazz(),
- body.getMethod());
-
- _connection.writeFrame(connectionCloseBody.generateFrame(0));
- disposeSaslServer(_connection);
- break;
- case SUCCESS:
- if (_logger.isInfoEnabled())
- {
- _logger.info("Connected as: " + authResult.getSubject());
- }
-
- int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
-
- if (frameMax <= 0)
- {
- frameMax = Integer.MAX_VALUE;
- }
+ @Override
+ public void onConnection(final ConnectionMethodProcessor connection)
+ {
+ connection.receiveConnectionSecureOk(body.getResponse());
+ }
+ });
- ConnectionTuneBody tuneBody =
- methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
- frameMax,
- broker.getConnection_heartBeatDelay());
- _connection.writeFrame(tuneBody.generateFrame(0));
- _connection.setAuthorizedSubject(authResult.getSubject());
- disposeSaslServer(_connection);
- break;
- case CONTINUE:
-
- ConnectionSecureBody
- secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
- _connection.writeFrame(secureBody.generateFrame(0));
- }
return true;
}
@@ -642,129 +548,40 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
}
}
- public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException
+ public boolean dispatchConnectionStartOk(final ConnectionStartOkBody body, int channelId)
{
- Broker<?> broker = _connection.getBroker();
-
- _logger.info("SASL Mechanism selected: " + body.getMechanism());
- _logger.info("Locale selected: " + body.getLocale());
- SubjectCreator subjectCreator = _connection.getSubjectCreator();
- SaslServer ss = null;
- try
+ processConnectionMethod(new ConnectionAction()
{
- ss = subjectCreator.createSaslServer(String.valueOf(body.getMechanism()),
- _connection.getLocalFQDN(),
- _connection.getPeerPrincipal());
-
- if (ss == null)
+ @Override
+ public void onConnection(final ConnectionMethodProcessor connection)
{
- closeConnection(AMQConstant.RESOURCE_ERROR,
- "Unable to create SASL Server:" + body.getMechanism()
- );
-
+ connection.receiveConnectionStartOk(body.getClientProperties(),
+ body.getMechanism(),
+ body.getResponse(),
+ body.getLocale());
}
- else
- {
-
- _connection.setSaslServer(ss);
+ });
- final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse());
- //save clientProperties
- _connection.setClientProperties(body.getClientProperties());
-
- MethodRegistry methodRegistry = _connection.getMethodRegistry();
-
- switch (authResult.getStatus())
- {
- case ERROR:
- Exception cause = authResult.getCause();
-
- _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
-
- ConnectionCloseBody closeBody =
- methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),
- // replyCode
- AMQConstant.NOT_ALLOWED.getName(),
- body.getClazz(),
- body.getMethod());
-
- _connection.writeFrame(closeBody.generateFrame(0));
- disposeSaslServer(_connection);
- break;
-
- case SUCCESS:
- if (_logger.isInfoEnabled())
- {
- _logger.info("Connected as: " + authResult.getSubject());
- }
- _connection.setAuthorizedSubject(authResult.getSubject());
-
- int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
-
- if (frameMax <= 0)
- {
- frameMax = Integer.MAX_VALUE;
- }
-
- ConnectionTuneBody
- tuneBody =
- methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
- frameMax,
- broker.getConnection_heartBeatDelay());
- _connection.writeFrame(tuneBody.generateFrame(0));
- break;
- case CONTINUE:
- ConnectionSecureBody
- secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
- _connection.writeFrame(secureBody.generateFrame(0));
- }
- }
- }
- catch (SaslException e)
- {
- disposeSaslServer(_connection);
- throw new AMQException("SASL error: " + e, e);
- }
return true;
}
- public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException
+ public boolean dispatchConnectionTuneOk(final ConnectionTuneOkBody body, int channelId)
{
- final AMQProtocolEngine connection = getConnection();
- connection.initHeartbeats(body.getHeartbeat());
-
- int brokerFrameMax = connection.getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
- if (brokerFrameMax <= 0)
+ processConnectionMethod(new ConnectionAction()
{
- brokerFrameMax = Integer.MAX_VALUE;
- }
+ @Override
+ public void onConnection(final ConnectionMethodProcessor connection)
+ {
+ connection.receiveConnectionTuneOk(body.getChannelMax(),
+ body.getFrameMax(),
+ body.getHeartbeat());
+ }
+ });
+ final AMQProtocolEngine connection = getConnection();
- if (body.getFrameMax() > (long) brokerFrameMax)
- {
- throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR,
- "Attempt to set max frame size to " + body.getFrameMax()
- + " greater than the broker will allow: "
- + brokerFrameMax,
- body.getClazz(), body.getMethod(),
- connection.getMethodRegistry(), null);
- }
- else if (body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode())
- {
- throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR,
- "Attempt to set max frame size to " + body.getFrameMax()
- + " which is smaller than the specification definined minimum: "
- + AMQConstant.FRAME_MIN_SIZE.getCode(),
- body.getClazz(), body.getMethod(),
- connection.getMethodRegistry(), null);
- }
- int frameMax = body.getFrameMax() == 0 ? brokerFrameMax : (int) body.getFrameMax();
- connection.setMaxFrameSize(frameMax);
- long maxChannelNumber = body.getChannelMax();
- //0 means no implied limit, except that forced by protocol limitations (0xFFFF)
- connection.setMaximumNumberOfChannels(maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber);
return true;
}
@@ -825,11 +642,6 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
return true;
}
- private boolean isDefaultExchange(final AMQShortString exchangeName)
- {
- return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING);
- }
-
public boolean dispatchQueueBind(final QueueBindBody body, int channelId)
{
processChannelMethod(channelId,
@@ -891,7 +703,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
return true;
}
- public boolean dispatchQueuePurge(final QueuePurgeBody body, int channelId) throws AMQException
+ public boolean dispatchQueuePurge(final QueuePurgeBody body, int channelId)
{
processChannelMethod(channelId,
@@ -910,7 +722,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
}
- public boolean dispatchTxCommit(TxCommitBody body, final int channelId) throws AMQException
+ public boolean dispatchTxCommit(TxCommitBody body, final int channelId)
{
processChannelMethod(channelId,
@@ -927,7 +739,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
return true;
}
- public boolean dispatchTxRollback(TxRollbackBody body, final int channelId) throws AMQException
+ public boolean dispatchTxRollback(TxRollbackBody body, final int channelId)
{
processChannelMethod(channelId,
@@ -943,7 +755,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
return true;
}
- public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException
+ public boolean dispatchTxSelect(TxSelectBody body, int channelId)
{
processChannelMethod(channelId,
new ChannelAction()
@@ -958,7 +770,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
return true;
}
- public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody body, int channelId) throws AMQException
+ public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody body, int channelId)
{
processChannelMethod(channelId,
new ChannelAction()
@@ -991,7 +803,7 @@ public class ServerMethodDispatcherImpl implements MethodDispatcher
throw new UnexpectedMethodException(body);
}
- public boolean dispatchQueueUnbind(final QueueUnbindBody body, int channelId) throws AMQException
+ public boolean dispatchQueueUnbind(final QueueUnbindBody body, int channelId)
{
processChannelMethod(channelId,
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java
index 16c890eaea..107e64bee5 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -46,23 +44,16 @@ public class MaxChannelsTest extends QpidTestCase
long maxChannels = 10L;
_session.setMaximumNumberOfChannels(maxChannels);
- assertEquals("Number of channels not correctly set.", new Long(maxChannels), _session.getMaximumNumberOfChannels());
+ assertEquals("Number of channels not correctly set.", maxChannels, _session.getMaximumNumberOfChannels());
- for (long currentChannel = 0L; currentChannel < maxChannels; currentChannel++)
+ for (long currentChannel = 1L; currentChannel <= maxChannels; currentChannel++)
{
- _session.addChannel(new AMQChannel(_session, (int) currentChannel, null));
+ _session.receiveChannelOpen( (int) currentChannel);
}
-
- try
- {
- _session.addChannel(new AMQChannel(_session, (int) maxChannels, null));
- fail("Cannot create more channels then maximum");
- }
- catch (AMQException e)
- {
- assertEquals("Wrong exception received.", e.getErrorCode(), AMQConstant.NOT_ALLOWED);
- }
- assertEquals("Maximum number of channels not set.", new Long(maxChannels), new Long(_session.getChannels().size()));
+ assertFalse("Connection should not be closed after opening " + maxChannels + " channels",_session.isClosed());
+ assertEquals("Maximum number of channels not set.", maxChannels, _session.getChannels().size());
+ _session.receiveChannelOpen((int) maxChannels+1);
+ assertTrue("Connection should be closed after opening " + (maxChannels + 1) + " channels",_session.isClosed());
}
@Override
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
index 17735f5c9c..765d742789 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
@@ -389,6 +389,17 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
{
return new CharSubSequence(start + _sequenceOffset, end + _sequenceOffset);
}
+
+ @Override
+ public String toString()
+ {
+ char[] chars = new char[length()];
+ for(int i = 0; i < length(); i++)
+ {
+ chars[i] = charAt(i);
+ }
+ return new String(chars);
+ }
}
public char[] asChars()