From a7be8fc7337b5cc093f593cc1becb9fe7b4dc0fb Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Tue, 1 Sep 2009 16:27:52 +0000 Subject: QPID-2025: Add a AMQProtocolEngine from the de-MINAfied AMQMinaProtocolSession. Remove various now-unused classes and update references. Add tests for AMQDecoder. Net -1500 lines, +25% performance on transient messaging. Nice. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@810110 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/server/Main.java | 199 ++-- .../server/configuration/ServerConfiguration.java | 56 +- .../qpid/server/connection/ConnectionRegistry.java | 1 - .../server/connection/IConnectionRegistry.java | 1 - .../server/protocol/AMQMinaProtocolSession.java | 923 ----------------- .../qpid/server/protocol/AMQProtocolEngine.java | 1054 ++++++++++++++++++++ .../server/protocol/AMQProtocolEngineFactory.java | 29 + .../qpid/server/protocol/AMQProtocolSession.java | 15 + .../server/protocol/AMQProtocolSessionMBean.java | 42 +- .../qpid/server/registry/ApplicationRegistry.java | 2 +- .../access/plugins/network/FirewallPlugin.java | 23 +- .../apache/qpid/server/transport/QpidAcceptor.java | 12 +- .../configuration/ServerConfigurationTest.java | 56 +- .../protocol/AMQProtocolSessionMBeanTest.java | 2 +- .../protocol/InternalTestProtocolSession.java | 23 +- .../qpid/server/protocol/MaxChannelsTest.java | 2 +- .../apache/qpid/server/protocol/TestIoSession.java | 328 ------ .../qpid/server/protocol/TestNetworkDriver.java | 126 +++ .../qpid/server/queue/AMQQueueAlertTest.java | 49 +- .../access/plugins/network/FirewallPluginTest.java | 39 +- .../qpid/client/protocol/AMQProtocolHandler.java | 2 +- .../qpid/client/transport/TransportConnection.java | 12 +- .../org/apache/qpid/codec/AMQCodecFactory.java | 7 +- .../java/org/apache/qpid/codec/AMQDecoder.java | 79 +- .../apache/qpid/framing/AMQDataBlockDecoder.java | 39 +- .../apache/qpid/framing/AMQDataBlockEncoder.java | 2 +- .../apache/qpid/framing/ProtocolInitiation.java | 20 +- .../src/main/java/org/apache/qpid/pool/Event.java | 58 +- .../src/main/java/org/apache/qpid/pool/Job.java | 34 +- .../java/org/apache/qpid/pool/PoolingFilter.java | 27 +- .../org/apache/qpid/pool/ReadWriteRunnable.java | 1 - .../qpid/protocol/ProtocolEngineFactory.java | 4 +- .../qpid/transport/NetworkDriverConfiguration.java | 16 +- .../transport/network/mina/MINANetworkDriver.java | 20 +- .../java/org/apache/qpid/codec/AMQDecoderTest.java | 130 +++ .../codec/MockAMQVersionAwareProtocolSession.java | 95 ++ .../network/mina/MINANetworkDriverTest.java | 5 +- 37 files changed, 1890 insertions(+), 1643 deletions(-) delete mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java delete mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java create mode 100644 qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java create mode 100644 qpid/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java index f8deb95628..c45e794145 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -20,6 +20,13 @@ */ package org.apache.qpid.server; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Properties; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; @@ -30,16 +37,9 @@ import org.apache.commons.cli.PosixParser; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; import org.apache.log4j.xml.QpidLog4JConfigurator; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.FixedSizeByteBufferAllocator; -import org.apache.mina.common.IoAcceptor; -import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.apache.mina.util.NewThreadExecutor; import org.apache.qpid.AMQException; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean; import org.apache.qpid.server.information.management.ServerInformationMBean; @@ -48,19 +48,13 @@ import org.apache.qpid.server.logging.actors.BrokerActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.management.LoggingManagementMBean; import org.apache.qpid.server.logging.messages.BrokerMessages; -import org.apache.qpid.server.protocol.AMQPFastProtocolHandler; -import org.apache.qpid.server.protocol.AMQPProtocolProvider; +import org.apache.qpid.server.protocol.AMQProtocolEngineFactory; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; import org.apache.qpid.server.transport.QpidAcceptor; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.net.BindException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.Properties; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.network.mina.MINANetworkDriver; /** * Main entry point for AMQPD. @@ -300,20 +294,6 @@ public class Main _brokerLogger.info("Starting Qpid Broker " + QpidProperties.getReleaseVersion() + " build: " + QpidProperties.getBuildVersion()); - ByteBuffer.setUseDirectBuffers(serverConfig.getEnableDirectBuffers()); - - // the MINA default is currently to use the pooled allocator although this may change in future - // once more testing of the performance of the simple allocator has been done - if (!serverConfig.getEnablePooledAllocator()) - { - ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator()); - } - - if (serverConfig.getUseBiasedWrites()) - { - System.setProperty("org.apache.qpid.use_write_biased_pool", "true"); - } - int port = serverConfig.getPort(); String portStr = commandLine.getOptionValue("p"); @@ -329,7 +309,54 @@ public class Main } } - bind(port, serverConfig); + String bindAddr = commandLine.getOptionValue("b"); + if (bindAddr == null) + { + bindAddr = serverConfig.getBind(); + } + InetAddress bindAddress = null; + if (bindAddr.equals("wildcard")) + { + bindAddress = new InetSocketAddress(port).getAddress(); + } + else + { + bindAddress = InetAddress.getByAddress(parseIP(bindAddr)); + } + + String keystorePath = serverConfig.getKeystorePath(); + String keystorePassword = serverConfig.getKeystorePassword(); + String certType = serverConfig.getCertType(); + SSLContextFactory sslFactory = null; + boolean isSsl = false; + + if (!serverConfig.getSSLOnly()) + { + NetworkDriver driver = new MINANetworkDriver(); + driver.bind(port, new InetAddress[]{bindAddress}, new AMQProtocolEngineFactory(), + serverConfig.getNetworkConfiguration(), null); + ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), + new QpidAcceptor(driver,"TCP")); + CurrentActor.get().message(BrokerMessages.BRK_1002("TCP", port)); + } + + if (serverConfig.getEnableSSL()) + { + sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); + NetworkDriver driver = new MINANetworkDriver(); + driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress}, + new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory); + ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), + new QpidAcceptor(driver,"TCP")); + CurrentActor.get().message(BrokerMessages.BRK_1002("TCP/SSL", serverConfig.getSSLPort())); + } + + //fixme qpid.AMQP should be using qpidproperties to get value + _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion() + + " build: " + QpidProperties.getBuildVersion()); + + CurrentActor.get().message(BrokerMessages.BRK_1004()); + } finally { @@ -358,114 +385,6 @@ public class Main } } - protected void bind(int port, ServerConfiguration config) throws BindException - { - String bindAddr = commandLine.getOptionValue("b"); - if (bindAddr == null) - { - bindAddr = config.getBind(); - } - - try - { - IoAcceptor acceptor; - - if (ApplicationRegistry.getInstance().getConfiguration().getQpidNIO()) - { - _logger.warn("Using Qpid Multithreaded IO Processing"); - acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(config.getProcessors(), new NewThreadExecutor()); - } - else - { - _logger.warn("Using Mina IO Processing"); - acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(config.getProcessors(), new NewThreadExecutor()); - } - - SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig(); - SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); - - sc.setReceiveBufferSize(config.getReceiveBufferSize()); - sc.setSendBufferSize(config.getWriteBufferSize()); - sc.setTcpNoDelay(config.getTcpNoDelay()); - - // if we do not use the executor pool threading model we get the default leader follower - // implementation provided by MINA - if (config.getEnableExecutorPool()) - { - sconfig.setThreadModel(ReadWriteThreadModel.getInstance()); - } - - if (!config.getEnableSSL() || !config.getSSLOnly()) - { - AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler(); - InetSocketAddress bindAddress; - if (bindAddr.equals("wildcard")) - { - bindAddress = new InetSocketAddress(port); - } - else - { - bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port); - } - - bind(new QpidAcceptor(acceptor,"TCP"), bindAddress, handler, sconfig); - - //fixme qpid.AMQP should be using qpidproperties to get value - _brokerLogger.info("Qpid.AMQP listening on non-SSL address " + bindAddress); - } - - if (config.getEnableSSL()) - { - AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler(); - try - { - - bind(new QpidAcceptor(acceptor, "TCP/SSL"), new InetSocketAddress(config.getSSLPort()), handler, sconfig); - - //fixme qpid.AMQP should be using qpidproperties to get value - _brokerLogger.info("Qpid.AMQP listening on SSL port " + config.getSSLPort()); - - } - catch (IOException e) - { - _brokerLogger.error("Unable to listen on SSL port: " + e, e); - } - } - - //fixme qpid.AMQP should be using qpidproperties to get value - _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion() - + " build: " + QpidProperties.getBuildVersion()); - - CurrentActor.get().message(BrokerMessages.BRK_1004()); - - } - catch (Exception e) - { - _logger.error("Unable to bind service to registry: " + e, e); - //fixme this need tidying up - throw new BindException(e.getMessage()); - } - } - - /** - * Ensure that any bound Acceptors are recorded in the registry so they can be closed later. - * - * @param acceptor - * @param bindAddress - * @param handler - * @param sconfig - * - * @throws IOException from the acceptor.bind command - */ - private void bind(QpidAcceptor acceptor, InetSocketAddress bindAddress, AMQPFastProtocolHandler handler, SocketAcceptorConfig sconfig) throws IOException - { - acceptor.getIoAcceptor().bind(bindAddress, handler, sconfig); - - CurrentActor.get().message(BrokerMessages.BRK_1002(acceptor.toString(), bindAddress.getPort())); - - ApplicationRegistry.getInstance().addAcceptor(bindAddress, acceptor); - } - public static void main(String[] args) { //if the -Dlog4j.configuration property has not been set, enable the init override diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index 7ea7738189..b3c8975c7c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.configuration.management.ConfigurationManagementMB import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.transport.NetworkDriverConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -564,7 +565,7 @@ public class ServerConfiguration implements SignalHandler public boolean getSSLOnly() { - return getConfig().getBoolean("connector.ssl.sslOnly", true); + return getConfig().getBoolean("connector.ssl.sslOnly", false); } public int getSSLPort() @@ -613,4 +614,57 @@ public class ServerConfiguration implements SignalHandler getConfig().getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD)); } + + public NetworkDriverConfiguration getNetworkConfiguration() + { + return new NetworkDriverConfiguration() + { + + public Integer getTrafficClass() + { + return null; + } + + public Boolean getTcpNoDelay() + { + // Can't call parent getTcpNoDelay since it just calls this one + return getConfig().getBoolean("connector.tcpNoDelay", true); + } + + public Integer getSoTimeout() + { + return null; + } + + public Integer getSoLinger() + { + return null; + } + + public Integer getSendBufferSize() + { + return getBufferWriteLimit(); + } + + public Boolean getReuseAddress() + { + return null; + } + + public Integer getReceiveBufferSize() + { + return getBufferReadLimit(); + } + + public Boolean getOOBInline() + { + return null; + } + + public Boolean getKeepAlive() + { + return null; + } + }; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index d287595e2d..71e07172ed 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.connection; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.AMQException; import org.apache.qpid.AMQConnectionException; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java index d64fde1c20..002269bbaa 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.connection; -import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.AMQException; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java deleted file mode 100644 index 7bc4365152..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ /dev/null @@ -1,923 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import org.apache.log4j.Logger; -import org.apache.mina.common.CloseFuture; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoServiceConfig; -import org.apache.mina.common.IoSession; -import org.apache.mina.transport.vmpipe.VmPipeAddress; -import org.apache.qpid.AMQChannelException; -import org.apache.qpid.AMQConnectionException; -import org.apache.qpid.AMQException; -import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.codec.AMQDecoder; -import org.apache.qpid.common.ClientProperties; -import org.apache.qpid.framing.AMQBody; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.HeartbeatBody; -import org.apache.qpid.framing.MethodDispatcher; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.pool.ReadWriteThreadModel; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.protocol.AMQMethodListener; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.handler.ServerMethodDispatcherImpl; -import org.apache.qpid.server.logging.actors.AMQPConnectionActor; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.messages.ConnectionMessages; -import org.apache.qpid.server.management.Managable; -import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.output.ProtocolOutputConverter; -import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.state.AMQState; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.transport.Sender; - -import javax.management.JMException; -import javax.security.sasl.SaslServer; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.security.Principal; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.atomic.AtomicLong; - -public class AMQMinaProtocolSession implements AMQProtocolSession, Managable -{ - private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); - - private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); - - private static final AtomicLong idGenerator = new AtomicLong(0); - - // to save boxing the channelId and looking up in a map... cache in an array the low numbered - // channels. This value must be of the form 2^x - 1. - private static final int CHANNEL_CACHE_SIZE = 0xff; - - private final IoSession _minaProtocolSession; - - private AMQShortString _contextKey; - - private AMQShortString _clientVersion = null; - - private VirtualHost _virtualHost; - - private final Map _channelMap = new HashMap(); - - private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; - - private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet(); - - private final AMQStateManager _stateManager; - - private AMQCodecFactory _codecFactory; - - private AMQProtocolSessionMBean _managedObject; - - private SaslServer _saslServer; - - private Object _lastReceived; - - private Object _lastSent; - - protected boolean _closed; - // maximum number of channels this session should have - private long _maxNoOfChannels = 1000; - - /* AMQP Version for this session */ - private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); - - private FieldTable _clientProperties; - private final List _taskList = new CopyOnWriteArrayList(); - - private List _closingChannelsList = new CopyOnWriteArrayList(); - private ProtocolOutputConverter _protocolOutputConverter; - private Principal _authorizedID; - private MethodDispatcher _dispatcher; - private ProtocolSessionIdentifier _sessionIdentifier; - - private static final long LAST_WRITE_FUTURE_JOIN_TIMEOUT = 60000L; - private org.apache.mina.common.WriteFuture _lastWriteFuture; - - // Create a simple ID that increments for ever new Session - private final long _sessionID = idGenerator.getAndIncrement(); - - private AMQPConnectionActor _actor; - private LogSubject _logSubject; - - public ManagedObject getManagedObject() - { - return _managedObject; - } - - public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory) - throws AMQException - { - _stateManager = new AMQStateManager(virtualHostRegistry, this); - _minaProtocolSession = session; - session.setAttachment(this); - - _codecFactory = codecFactory; - - _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); - - _actor.message(ConnectionMessages.CON_1001(null, null, false, false)); - - try - { - IoServiceConfig config = session.getServiceConfig(); - ReadWriteThreadModel threadModel = (ReadWriteThreadModel) config.getThreadModel(); - threadModel.getAsynchronousReadFilter().createNewJobForSession(session); - threadModel.getAsynchronousWriteFilter().createNewJobForSession(session); - } - catch (RuntimeException e) - { - e.printStackTrace(); - throw e; - - } - } - - private AMQProtocolSessionMBean createMBean() throws AMQException - { - try - { - return new AMQProtocolSessionMBean(this); - } - catch (JMException ex) - { - _logger.error("AMQProtocolSession MBean creation has failed ", ex); - throw new AMQException("AMQProtocolSession MBean creation has failed ", ex); - } - } - - public IoSession getIOSession() - { - return _minaProtocolSession; - } - - public static AMQProtocolSession getAMQProtocolSession(IoSession minaProtocolSession) - { - return (AMQProtocolSession) minaProtocolSession.getAttachment(); - } - - public long getSessionID() - { - return _sessionID; - } - - public LogActor getLogActor() - { - return _actor; - } - - public void dataBlockReceived(AMQDataBlock message) throws Exception - { - _lastReceived = message; - if (message instanceof ProtocolInitiation) - { - protocolInitiationReceived((ProtocolInitiation) message); - - } - else if (message instanceof AMQFrame) - { - AMQFrame frame = (AMQFrame) message; - frameReceived(frame); - - } - else - { - throw new UnknnownMessageTypeException(message); - } - } - - private void frameReceived(AMQFrame frame) throws AMQException - { - int channelId = frame.getChannel(); - AMQBody body = frame.getBodyFrame(); - - //Look up the Channel's Actor and set that as the current actor - // If that is not available then we can use the ConnectionActor - // that is associated with this AMQMPSession. - LogActor channelActor = null; - if (_channelMap.get(channelId) != null) - { - channelActor = _channelMap.get(channelId).getLogActor(); - } - CurrentActor.set(channelActor == null ? _actor : channelActor); - - try - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Frame Received: " + frame); - } - - // Check that this channel is not closing - if (channelAwaitingClosure(channelId)) - { - if ((frame.getBodyFrame() instanceof ChannelCloseOkBody)) - { - if (_logger.isInfoEnabled()) - { - _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok"); - } - } - else - { - if (_logger.isInfoEnabled()) - { - _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame); - } - - closeProtocolSession(); - return; - } - } - - try - { - body.handle(channelId, this); - } - catch (AMQException e) - { - closeChannel(channelId); - throw e; - } - } - finally - { - CurrentActor.remove(); - } - } - - private void protocolInitiationReceived(ProtocolInitiation pi) - { - // this ensures the codec never checks for a PI message again - ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false); - try - { - // Log incomming protocol negotiation request - _actor.message(ConnectionMessages.CON_1001(null, pi._protocolMajor + "-" + pi._protocolMinor, false, true)); - - ProtocolVersion pv = pi.checkVersion(); // Fails if not correct - - // This sets the protocol version (and hence framing classes) for this session. - setProtocolVersion(pv); - - String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms(); - - String locales = "en_US"; - - AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(), - (short) getProtocolMinorVersion(), - null, - mechanisms.getBytes(), - locales.getBytes()); - _minaProtocolSession.write(responseBody.generateFrame(0)); - - } - catch (AMQException e) - { - _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion()); - - _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); - - // TODO: Close connection (but how to wait until message is sent?) - // ritchiem 2006-12-04 will this not do? - // WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()])); - // future.join(); - // close connection - - } - } - - public void methodFrameReceived(int channelId, AMQMethodBody methodBody) - { - - final AMQMethodEvent evt = new AMQMethodEvent(channelId, methodBody); - - try - { - try - { - - boolean wasAnyoneInterested = _stateManager.methodReceived(evt); - - if (!_frameListeners.isEmpty()) - { - for (AMQMethodListener listener : _frameListeners) - { - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; - } - } - - if (!wasAnyoneInterested) - { - throw new AMQNoMethodHandlerException(evt); - } - } - catch (AMQChannelException e) - { - if (getChannel(channelId) != null) - { - if (_logger.isInfoEnabled()) - { - _logger.info("Closing channel due to: " + e.getMessage()); - } - - writeFrame(e.getCloseFrame(channelId)); - closeChannel(channelId); - } - else - { - if (_logger.isDebugEnabled()) - { - _logger.debug("ChannelException occured on non-existent channel:" + e.getMessage()); - } - - if (_logger.isInfoEnabled()) - { - _logger.info("Closing connection due to: " + e.getMessage()); - } - - AMQConnectionException ce = - evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, - AMQConstant.CHANNEL_ERROR.getName().toString()); - - closeConnection(channelId, ce, false); - } - } - catch (AMQConnectionException e) - { - closeConnection(channelId, e, false); - } - } - catch (Exception e) - { - - for (AMQMethodListener listener : _frameListeners) - { - listener.error(e); - } - - _logger.error("Unexpected exception while processing frame. Closing connection.", e); - - closeProtocolSession(); - } - } - - public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException - { - - AMQChannel channel = getAndAssertChannel(channelId); - - channel.publishContentHeader(body); - - } - - public void contentBodyReceived(int channelId, ContentBody body) throws AMQException - { - AMQChannel channel = getAndAssertChannel(channelId); - - channel.publishContentBody(body); - } - - public void heartbeatBodyReceived(int channelId, HeartbeatBody body) - { - // NO - OP - } - - /** - * Convenience method that writes a frame to the protocol session. Equivalent to calling - * getProtocolSession().write(). - * - * @param frame the frame to write - */ - public void writeFrame(AMQDataBlock frame) - { - _lastSent = frame; - - _lastWriteFuture = _minaProtocolSession.write(frame); - } - - public AMQShortString getContextKey() - { - return _contextKey; - } - - public void setContextKey(AMQShortString contextKey) - { - _contextKey = contextKey; - } - - public List getChannels() - { - return new ArrayList(_channelMap.values()); - } - - public AMQChannel getAndAssertChannel(int channelId) throws AMQException - { - AMQChannel channel = getChannel(channelId); - if (channel == null) - { - throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId); - } - - return channel; - } - - public AMQChannel getChannel(int channelId) throws AMQException - { - final AMQChannel channel = - ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId); - if ((channel == null) || channel.isClosing()) - { - return null; - } - else - { - return channel; - } - } - - public boolean channelAwaitingClosure(int channelId) - { - return !_closingChannelsList.isEmpty() && _closingChannelsList.contains(channelId); - } - - public void addChannel(AMQChannel channel) throws AMQException - { - if (_closed) - { - throw new AMQException("Session is closed"); - } - - final int channelId = channel.getChannelId(); - - if (_closingChannelsList.contains(channelId)) - { - throw new AMQException("Session is marked awaiting channel close"); - } - - if (_channelMap.size() == _maxNoOfChannels) - { - String errorMessage = - toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels - + "); can't create channel"; - _logger.error(errorMessage); - throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage); - } - else - { - _channelMap.put(channel.getChannelId(), channel); - } - - if (((channelId & CHANNEL_CACHE_SIZE) == channelId)) - { - _cachedChannels[channelId] = channel; - } - - checkForNotification(); - } - - private void checkForNotification() - { - int channelsCount = _channelMap.size(); - if (channelsCount >= _maxNoOfChannels) - { - _managedObject.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value"); - } - } - - public Long getMaximumNumberOfChannels() - { - return _maxNoOfChannels; - } - - public void setMaximumNumberOfChannels(Long value) - { - _maxNoOfChannels = value; - } - - public void commitTransactions(AMQChannel channel) throws AMQException - { - if ((channel != null) && channel.isTransactional()) - { - channel.commit(); - } - } - - public void rollbackTransactions(AMQChannel channel) throws AMQException - { - if ((channel != null) && channel.isTransactional()) - { - channel.rollback(); - } - } - - /** - * Close a specific channel. This will remove any resources used by the channel, including:
  • any queue - * subscriptions (this may in turn remove queues if they are auto delete
- * - * @param channelId id of the channel to close - * - * @throws AMQException if an error occurs closing the channel - * @throws IllegalArgumentException if the channel id is not valid - */ - public void closeChannel(int channelId) throws AMQException - { - final AMQChannel channel = getChannel(channelId); - if (channel == null) - { - throw new IllegalArgumentException("Unknown channel id"); - } - else - { - try - { - channel.close(); - markChannelAwaitingCloseOk(channelId); - } - finally - { - removeChannel(channelId); - } - } - } - - public void closeChannelOk(int channelId) - { - // todo QPID-847 - This is called from two lcoations ChannelCloseHandler and ChannelCloseOkHandler. - // When it is the CC_OK_Handler then it makes sence to remove the channel else we will leak memory. - // We do it from the Close Handler as we are sending the OK back to the client. - // While this is AMQP spec compliant. The Java client in the event of an IllegalArgumentException - // will send a close-ok.. Where we should call removeChannel. - // However, due to the poor exception handling on the client. The client-user will be notified of the - // InvalidArgument and if they then decide to close the session/connection then the there will be time - // for that to occur i.e. a new close method be sent before the exeption handling can mark the session closed. - //removeChannel(channelId); - _closingChannelsList.remove(new Integer(channelId)); - } - - private void markChannelAwaitingCloseOk(int channelId) - { - _closingChannelsList.add(channelId); - } - - /** - * In our current implementation this is used by the clustering code. - * - * @param channelId The channel to remove - */ - public void removeChannel(int channelId) - { - _channelMap.remove(channelId); - if ((channelId & CHANNEL_CACHE_SIZE) == channelId) - { - _cachedChannels[channelId] = null; - } - } - - /** - * Initialise heartbeats on the session. - * - * @param delay delay in seconds (not ms) - */ - public void initHeartbeats(int delay) - { - if (delay > 0) - { - _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay); - _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, (int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay)); - } - } - - /** - * Closes all channels that were opened by this protocol session. This frees up all resources used by the channel. - * - * @throws AMQException if an error occurs while closing any channel - */ - private void closeAllChannels() throws AMQException - { - for (AMQChannel channel : _channelMap.values()) - { - channel.close(); - } - - _channelMap.clear(); - for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++) - { - _cachedChannels[i] = null; - } - } - - /** This must be called when the session is _closed in order to free up any resources managed by the session. */ - public void closeSession() throws AMQException - { - if (!_closed) - { - if (_virtualHost != null) - { - _virtualHost.getConnectionRegistry().deregisterConnection(this); - } - - closeAllChannels(); - if (_managedObject != null) - { - _managedObject.unregister(); - } - - for (Task task : _taskList) - { - task.doTask(this); - } - - _closed = true; - - CurrentActor.get().message(_logSubject, ConnectionMessages.CON_1002()); - } - } - - public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException - { - if (_logger.isInfoEnabled()) - { - _logger.info("Closing connection due to: " + e.getMessage()); - } - - markChannelAwaitingCloseOk(channelId); - closeSession(); - _stateManager.changeState(AMQState.CONNECTION_CLOSING); - writeFrame(e.getCloseFrame(channelId)); - - if (closeProtocolSession) - { - closeProtocolSession(); - } - } - - public void closeProtocolSession() - { - closeProtocolSession(true); - } - - public void closeProtocolSession(boolean waitLast) - { - if (waitLast && (_lastWriteFuture != null)) - { - _logger.debug("Waiting for last write to join."); - _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); - } - - _logger.debug("REALLY Closing protocol session:" + _minaProtocolSession); - final CloseFuture future = _minaProtocolSession.close(); - future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); - - try - { - _stateManager.changeState(AMQState.CONNECTION_CLOSED); - } - catch (AMQException e) - { - _logger.info(e.getMessage()); - } - } - - public String toString() - { - return _minaProtocolSession.getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")"); - } - - public String dump() - { - return this + " last_sent=" + _lastSent + " last_received=" + _lastReceived; - } - - /** @return an object that can be used to identity */ - public Object getKey() - { - return _minaProtocolSession.getRemoteAddress(); - } - - /** - * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may - * be bound to multiple addresses this could vary depending on the acceptor this session was created from. - * - * @return a String FQDN - */ - public String getLocalFQDN() - { - SocketAddress address = _minaProtocolSession.getLocalAddress(); - // we use the vmpipe address in some tests hence the need for this rather ugly test. The host - // information is used by SASL primary. - if (address instanceof InetSocketAddress) - { - return ((InetSocketAddress) address).getHostName(); - } - else if (address instanceof VmPipeAddress) - { - return "vmpipe:" + ((VmPipeAddress) address).getPort(); - } - else - { - throw new IllegalArgumentException("Unsupported socket address class: " + address); - } - } - - public SaslServer getSaslServer() - { - return _saslServer; - } - - public void setSaslServer(SaslServer saslServer) - { - _saslServer = saslServer; - } - - public FieldTable getClientProperties() - { - return _clientProperties; - } - - public void setClientProperties(FieldTable clientProperties) - { - _clientProperties = clientProperties; - if (_clientProperties != null) - { - if (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null) - { - String clientID = _clientProperties.getString(CLIENT_PROPERTIES_INSTANCE); - setContextKey(new AMQShortString(clientID)); - - // Log the Opening of the connection for this client - _actor.message(ConnectionMessages.CON_1001(clientID, _protocolVersion.toString(), true, true)); - } - - if (_clientProperties.getString(ClientProperties.version.toString()) != null) - { - _clientVersion = new AMQShortString(_clientProperties.getString(ClientProperties.version.toString())); - } - } - _sessionIdentifier = new ProtocolSessionIdentifier(this); - } - - private void setProtocolVersion(ProtocolVersion pv) - { - _protocolVersion = pv; - - _protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this); - _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion); - } - - public byte getProtocolMajorVersion() - { - return _protocolVersion.getMajorVersion(); - } - - public ProtocolVersion getProtocolVersion() - { - return _protocolVersion; - } - - public byte getProtocolMinorVersion() - { - return _protocolVersion.getMinorVersion(); - } - - public boolean isProtocolVersion(byte major, byte minor) - { - return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor); - } - - public MethodRegistry getRegistry() - { - return getMethodRegistry(); - } - - public Object getClientIdentifier() - { - return (_minaProtocolSession != null) ? _minaProtocolSession.getRemoteAddress() : null; - } - - public VirtualHost getVirtualHost() - { - return _virtualHost; - } - - public void setVirtualHost(VirtualHost virtualHost) throws AMQException - { - _virtualHost = virtualHost; - - _actor.virtualHostSelected(this); - _logSubject = new ConnectionLogSubject(this); - - _virtualHost.getConnectionRegistry().registerConnection(this); - - _managedObject = createMBean(); - _managedObject.register(); - } - - public void addSessionCloseTask(Task task) - { - _taskList.add(task); - } - - public void removeSessionCloseTask(Task task) - { - _taskList.remove(task); - } - - public ProtocolOutputConverter getProtocolOutputConverter() - { - return _protocolOutputConverter; - } - - public void setAuthorizedID(Principal authorizedID) - { - _authorizedID = authorizedID; - - // Let the actor know that this connection is now Authorized - _actor.connectionAuthorized(this); - } - - public Principal getAuthorizedID() - { - return _authorizedID; - } - - public SocketAddress getRemoteAddress() - { - return _minaProtocolSession.getRemoteAddress(); - } - - public MethodRegistry getMethodRegistry() - { - return MethodRegistry.getMethodRegistry(getProtocolVersion()); - } - - public MethodDispatcher getMethodDispatcher() - { - return _dispatcher; - } - - public ProtocolSessionIdentifier getSessionIdentifier() - { - return _sessionIdentifier; - } - - public String getClientVersion() - { - return (_clientVersion == null) ? null : _clientVersion.toString(); - } - - public void setSender(Sender sender) - { - // No-op, interface munging between this and AMQProtocolSession - } - - public void init() - { - // No-op, interface munging between this and AMQProtocolSession - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java new file mode 100644 index 0000000000..49bdffb584 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -0,0 +1,1054 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.security.Principal; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicLong; + +import javax.management.JMException; +import javax.security.sasl.SaslServer; + +import org.apache.log4j.Logger; +import org.apache.mina.transport.vmpipe.VmPipeAddress; +import org.apache.qpid.AMQChannelException; +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.codec.AMQDecoder; +import org.apache.qpid.common.ClientProperties; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQProtocolHeaderException; +import org.apache.qpid.framing.AMQProtocolVersionException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.MethodDispatcher; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.pool.Event; +import org.apache.qpid.pool.Job; +import org.apache.qpid.pool.PoolingFilter; +import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.pool.ReferenceCountingExecutorService; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodListener; +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.handler.ServerMethodDispatcherImpl; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.AMQPConnectionActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ConnectionMessages; +import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; +import org.apache.qpid.server.management.Managable; +import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.state.AMQState; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.Sender; + +public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession +{ + private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); + + private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); + + private static final AtomicLong idGenerator = new AtomicLong(0); + + // to save boxing the channelId and looking up in a map... cache in an array the low numbered + // channels. This value must be of the form 2^x - 1. + private static final int CHANNEL_CACHE_SIZE = 0xff; + + private AMQShortString _contextKey; + + private AMQShortString _clientVersion = null; + + private VirtualHost _virtualHost; + + private final Map _channelMap = new HashMap(); + + private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; + + private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet(); + + private final AMQStateManager _stateManager; + + private AMQCodecFactory _codecFactory; + + private AMQProtocolSessionMBean _managedObject; + + private SaslServer _saslServer; + + private Object _lastReceived; + + private Object _lastSent; + + protected boolean _closed; + // maximum number of channels this session should have + private long _maxNoOfChannels = 1000; + + /* AMQP Version for this session */ + private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); + + private FieldTable _clientProperties; + private final List _taskList = new CopyOnWriteArrayList(); + + private List _closingChannelsList = new CopyOnWriteArrayList(); + private ProtocolOutputConverter _protocolOutputConverter; + private Principal _authorizedID; + private MethodDispatcher _dispatcher; + private ProtocolSessionIdentifier _sessionIdentifier; + + // Create a simple ID that increments for ever new Session + private final long _sessionID = idGenerator.getAndIncrement(); + + private AMQPConnectionActor _actor; + private LogSubject _logSubject; + + private NetworkDriver _networkDriver; + + private long _lastIoTime; + + private long _writtenBytes; + private long _readBytes; + + private Job _readJob; + private Job _writeJob; + + private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); + + public ManagedObject getManagedObject() + { + return _managedObject; + } + + public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkDriver driver) + { + _stateManager = new AMQStateManager(virtualHostRegistry, this); + _networkDriver = driver; + + _codecFactory = new AMQCodecFactory(true, this); + + ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance(); + _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true); + _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false); + + _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); + _actor.message(ConnectionMessages.CON_1001(null, null, false, false)); + } + + private AMQProtocolSessionMBean createMBean() throws AMQException + { + try + { + return new AMQProtocolSessionMBean(this); + } + catch (JMException ex) + { + _logger.error("AMQProtocolSession MBean creation has failed ", ex); + throw new AMQException("AMQProtocolSession MBean creation has failed ", ex); + } + } + + public long getSessionID() + { + return _sessionID; + } + + public LogActor getLogActor() + { + return _actor; + } + + @Override + public void received(final ByteBuffer msg) + { + _lastIoTime = System.currentTimeMillis(); + try + { + final ArrayList dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); + fireAsynchEvent(_readJob, new Event(new Runnable() + { + @Override + public void run() + { + // Decode buffer + + for (AMQDataBlock dataBlock : dataBlocks) + { + try + { + dataBlockReceived(dataBlock); + } + catch (Exception e) + { + e.printStackTrace(); + closeProtocolSession(); + } + } + } + })); + } + catch (Exception e) + { + e.printStackTrace(); + closeProtocolSession(); + } + } + + public void dataBlockReceived(AMQDataBlock message) throws Exception + { + _lastReceived = message; + if (message instanceof ProtocolInitiation) + { + protocolInitiationReceived((ProtocolInitiation) message); + + } + else if (message instanceof AMQFrame) + { + AMQFrame frame = (AMQFrame) message; + frameReceived(frame); + + } + else + { + throw new UnknnownMessageTypeException(message); + } + } + + private void frameReceived(AMQFrame frame) throws AMQException + { + int channelId = frame.getChannel(); + AMQBody body = frame.getBodyFrame(); + + //Look up the Channel's Actor and set that as the current actor + // If that is not available then we can use the ConnectionActor + // that is associated with this AMQMPSession. + LogActor channelActor = null; + if (_channelMap.get(channelId) != null) + { + channelActor = _channelMap.get(channelId).getLogActor(); + } + CurrentActor.set(channelActor == null ? _actor : channelActor); + + try + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Frame Received: " + frame); + } + + // Check that this channel is not closing + if (channelAwaitingClosure(channelId)) + { + if ((frame.getBodyFrame() instanceof ChannelCloseOkBody)) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok"); + } + } + else + { + if (_logger.isInfoEnabled()) + { + _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame); + } + + closeProtocolSession(); + return; + } + } + + try + { + body.handle(channelId, this); + } + catch (AMQException e) + { + closeChannel(channelId); + throw e; + } + } + finally + { + CurrentActor.remove(); + } + } + + private void protocolInitiationReceived(ProtocolInitiation pi) + { + // this ensures the codec never checks for a PI message again + ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false); + try + { + // Log incomming protocol negotiation request + _actor.message(ConnectionMessages.CON_1001(null, pi._protocolMajor + "-" + pi._protocolMinor, false, true)); + + ProtocolVersion pv = pi.checkVersion(); // Fails if not correct + + // This sets the protocol version (and hence framing classes) for this session. + setProtocolVersion(pv); + + String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms(); + + String locales = "en_US"; + + AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(), + (short) getProtocolMinorVersion(), + null, + mechanisms.getBytes(), + locales.getBytes()); + _networkDriver.send(responseBody.generateFrame(0).toNioByteBuffer()); + + } + catch (AMQException e) + { + _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion()); + + _networkDriver.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer()); + } + } + + public void methodFrameReceived(int channelId, AMQMethodBody methodBody) + { + + final AMQMethodEvent evt = new AMQMethodEvent(channelId, methodBody); + + try + { + try + { + + boolean wasAnyoneInterested = _stateManager.methodReceived(evt); + + if (!_frameListeners.isEmpty()) + { + for (AMQMethodListener listener : _frameListeners) + { + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + } + } + + if (!wasAnyoneInterested) + { + throw new AMQNoMethodHandlerException(evt); + } + } + catch (AMQChannelException e) + { + if (getChannel(channelId) != null) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Closing channel due to: " + e.getMessage()); + } + + writeFrame(e.getCloseFrame(channelId)); + closeChannel(channelId); + } + else + { + if (_logger.isDebugEnabled()) + { + _logger.debug("ChannelException occured on non-existent channel:" + e.getMessage()); + } + + if (_logger.isInfoEnabled()) + { + _logger.info("Closing connection due to: " + e.getMessage()); + } + + AMQConnectionException ce = + evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, + AMQConstant.CHANNEL_ERROR.getName().toString()); + + closeConnection(channelId, ce, false); + } + } + catch (AMQConnectionException e) + { + closeConnection(channelId, e, false); + } + } + catch (Exception e) + { + + for (AMQMethodListener listener : _frameListeners) + { + listener.error(e); + } + + _logger.error("Unexpected exception while processing frame. Closing connection.", e); + + closeProtocolSession(); + } + } + + public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException + { + + AMQChannel channel = getAndAssertChannel(channelId); + + channel.publishContentHeader(body); + + } + + public void contentBodyReceived(int channelId, ContentBody body) throws AMQException + { + AMQChannel channel = getAndAssertChannel(channelId); + + channel.publishContentBody(body); + } + + public void heartbeatBodyReceived(int channelId, HeartbeatBody body) + { + // NO - OP + } + + /** + * Convenience method that writes a frame to the protocol session. Equivalent to calling + * getProtocolSession().write(). + * + * @param frame the frame to write + */ + public void writeFrame(AMQDataBlock frame) + { + _lastSent = frame; + final ByteBuffer buf = frame.toNioByteBuffer(); + _lastIoTime = System.currentTimeMillis(); + _writtenBytes += buf.remaining(); + fireAsynchEvent(_writeJob, new Event(new Runnable() + { + @Override + public void run() + { + _networkDriver.send(buf); + } + })); + } + + public AMQShortString getContextKey() + { + return _contextKey; + } + + public void setContextKey(AMQShortString contextKey) + { + _contextKey = contextKey; + } + + public List getChannels() + { + return new ArrayList(_channelMap.values()); + } + + public AMQChannel getAndAssertChannel(int channelId) throws AMQException + { + AMQChannel channel = getChannel(channelId); + if (channel == null) + { + throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId); + } + + return channel; + } + + public AMQChannel getChannel(int channelId) throws AMQException + { + final AMQChannel channel = + ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId); + if ((channel == null) || channel.isClosing()) + { + return null; + } + else + { + return channel; + } + } + + public boolean channelAwaitingClosure(int channelId) + { + return !_closingChannelsList.isEmpty() && _closingChannelsList.contains(channelId); + } + + public void addChannel(AMQChannel channel) throws AMQException + { + if (_closed) + { + throw new AMQException("Session is closed"); + } + + final int channelId = channel.getChannelId(); + + if (_closingChannelsList.contains(channelId)) + { + throw new AMQException("Session is marked awaiting channel close"); + } + + if (_channelMap.size() == _maxNoOfChannels) + { + String errorMessage = + toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels + + "); can't create channel"; + _logger.error(errorMessage); + throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage); + } + else + { + _channelMap.put(channel.getChannelId(), channel); + } + + if (((channelId & CHANNEL_CACHE_SIZE) == channelId)) + { + _cachedChannels[channelId] = channel; + } + + checkForNotification(); + } + + private void checkForNotification() + { + int channelsCount = _channelMap.size(); + if (channelsCount >= _maxNoOfChannels) + { + _managedObject.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value"); + } + } + + public Long getMaximumNumberOfChannels() + { + return _maxNoOfChannels; + } + + public void setMaximumNumberOfChannels(Long value) + { + _maxNoOfChannels = value; + } + + public void commitTransactions(AMQChannel channel) throws AMQException + { + if ((channel != null) && channel.isTransactional()) + { + channel.commit(); + } + } + + public void rollbackTransactions(AMQChannel channel) throws AMQException + { + if ((channel != null) && channel.isTransactional()) + { + channel.rollback(); + } + } + + /** + * Close a specific channel. This will remove any resources used by the channel, including:
  • any queue + * subscriptions (this may in turn remove queues if they are auto delete
+ * + * @param channelId id of the channel to close + * + * @throws AMQException if an error occurs closing the channel + * @throws IllegalArgumentException if the channel id is not valid + */ + public void closeChannel(int channelId) throws AMQException + { + final AMQChannel channel = getChannel(channelId); + if (channel == null) + { + throw new IllegalArgumentException("Unknown channel id"); + } + else + { + try + { + channel.close(); + markChannelAwaitingCloseOk(channelId); + } + finally + { + removeChannel(channelId); + } + } + } + + public void closeChannelOk(int channelId) + { + // todo QPID-847 - This is called from two lcoations ChannelCloseHandler and ChannelCloseOkHandler. + // When it is the CC_OK_Handler then it makes sence to remove the channel else we will leak memory. + // We do it from the Close Handler as we are sending the OK back to the client. + // While this is AMQP spec compliant. The Java client in the event of an IllegalArgumentException + // will send a close-ok.. Where we should call removeChannel. + // However, due to the poor exception handling on the client. The client-user will be notified of the + // InvalidArgument and if they then decide to close the session/connection then the there will be time + // for that to occur i.e. a new close method be sent before the exeption handling can mark the session closed. + //removeChannel(channelId); + _closingChannelsList.remove(new Integer(channelId)); + } + + private void markChannelAwaitingCloseOk(int channelId) + { + _closingChannelsList.add(channelId); + } + + /** + * In our current implementation this is used by the clustering code. + * + * @param channelId The channel to remove + */ + public void removeChannel(int channelId) + { + _channelMap.remove(channelId); + if ((channelId & CHANNEL_CACHE_SIZE) == channelId) + { + _cachedChannels[channelId] = null; + } + } + + /** + * Initialise heartbeats on the session. + * + * @param delay delay in seconds (not ms) + */ + public void initHeartbeats(int delay) + { + if (delay > 0) + { + _networkDriver.setMaxWriteIdle(delay); + _networkDriver.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay)); + } + } + + /** + * Closes all channels that were opened by this protocol session. This frees up all resources used by the channel. + * + * @throws AMQException if an error occurs while closing any channel + */ + private void closeAllChannels() throws AMQException + { + for (AMQChannel channel : _channelMap.values()) + { + channel.close(); + } + + _channelMap.clear(); + for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++) + { + _cachedChannels[i] = null; + } + } + + /** This must be called when the session is _closed in order to free up any resources managed by the session. */ + public void closeSession() throws AMQException + { + if (!_closed) + { + if (_virtualHost != null) + { + _virtualHost.getConnectionRegistry().deregisterConnection(this); + } + + closeAllChannels(); + if (_managedObject != null) + { + _managedObject.unregister(); + } + + for (Task task : _taskList) + { + task.doTask(this); + } + + _closed = true; + + CurrentActor.get().message(_logSubject, ConnectionMessages.CON_1002()); + } + } + + public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException + { + if (_logger.isInfoEnabled()) + { + _logger.info("Closing connection due to: " + e.getMessage()); + } + + markChannelAwaitingCloseOk(channelId); + closeSession(); + _stateManager.changeState(AMQState.CONNECTION_CLOSING); + writeFrame(e.getCloseFrame(channelId)); + + if (closeProtocolSession) + { + closeProtocolSession(); + } + } + + public void closeProtocolSession() + { + _networkDriver.close(); + try + { + _stateManager.changeState(AMQState.CONNECTION_CLOSED); + } + catch (AMQException e) + { + _logger.info(e.getMessage()); + } + } + + public String toString() + { + return getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")"); + } + + public String dump() + { + return this + " last_sent=" + _lastSent + " last_received=" + _lastReceived; + } + + /** @return an object that can be used to identity */ + public Object getKey() + { + return getRemoteAddress(); + } + + /** + * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may + * be bound to multiple addresses this could vary depending on the acceptor this session was created from. + * + * @return a String FQDN + */ + public String getLocalFQDN() + { + SocketAddress address = _networkDriver.getLocalAddress(); + // we use the vmpipe address in some tests hence the need for this rather ugly test. The host + // information is used by SASL primary. + if (address instanceof InetSocketAddress) + { + return ((InetSocketAddress) address).getHostName(); + } + else if (address instanceof VmPipeAddress) + { + return "vmpipe:" + ((VmPipeAddress) address).getPort(); + } + else + { + throw new IllegalArgumentException("Unsupported socket address class: " + address); + } + } + + public SaslServer getSaslServer() + { + return _saslServer; + } + + public void setSaslServer(SaslServer saslServer) + { + _saslServer = saslServer; + } + + public FieldTable getClientProperties() + { + return _clientProperties; + } + + public void setClientProperties(FieldTable clientProperties) + { + _clientProperties = clientProperties; + if (_clientProperties != null) + { + if (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null) + { + String clientID = _clientProperties.getString(CLIENT_PROPERTIES_INSTANCE); + setContextKey(new AMQShortString(clientID)); + + // Log the Opening of the connection for this client + _actor.message(ConnectionMessages.CON_1001(clientID, _protocolVersion.toString(), true, true)); + } + + if (_clientProperties.getString(ClientProperties.version.toString()) != null) + { + _clientVersion = new AMQShortString(_clientProperties.getString(ClientProperties.version.toString())); + } + } + _sessionIdentifier = new ProtocolSessionIdentifier(this); + } + + private void setProtocolVersion(ProtocolVersion pv) + { + _protocolVersion = pv; + + _protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this); + _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion); + } + + public byte getProtocolMajorVersion() + { + return _protocolVersion.getMajorVersion(); + } + + public ProtocolVersion getProtocolVersion() + { + return _protocolVersion; + } + + public byte getProtocolMinorVersion() + { + return _protocolVersion.getMinorVersion(); + } + + public boolean isProtocolVersion(byte major, byte minor) + { + return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor); + } + + public MethodRegistry getRegistry() + { + return getMethodRegistry(); + } + + public Object getClientIdentifier() + { + return (_networkDriver != null) ? _networkDriver.getRemoteAddress() : null; + } + + public VirtualHost getVirtualHost() + { + return _virtualHost; + } + + public void setVirtualHost(VirtualHost virtualHost) throws AMQException + { + _virtualHost = virtualHost; + + _actor.virtualHostSelected(this); + _logSubject = new ConnectionLogSubject(this); + + _virtualHost.getConnectionRegistry().registerConnection(this); + + _managedObject = createMBean(); + _managedObject.register(); + } + + public void addSessionCloseTask(Task task) + { + _taskList.add(task); + } + + public void removeSessionCloseTask(Task task) + { + _taskList.remove(task); + } + + public ProtocolOutputConverter getProtocolOutputConverter() + { + return _protocolOutputConverter; + } + + public void setAuthorizedID(Principal authorizedID) + { + _authorizedID = authorizedID; + + // Let the actor know that this connection is now Authorized + _actor.connectionAuthorized(this); + } + + public Principal getAuthorizedID() + { + return _authorizedID; + } + + public SocketAddress getRemoteAddress() + { + return _networkDriver.getRemoteAddress(); + } + + public MethodRegistry getMethodRegistry() + { + return MethodRegistry.getMethodRegistry(getProtocolVersion()); + } + + public MethodDispatcher getMethodDispatcher() + { + return _dispatcher; + } + + @Override + public void closed() + { + try + { + closeSession(); + } + catch (AMQException e) + { + _logger.error("Could not close protocol engine", e); + } + } + + @Override + public void readerIdle() + { + // Nothing + } + + @Override + public void setNetworkDriver(NetworkDriver driver) + { + _networkDriver = driver; + } + + @Override + public void writerIdle() + { + _networkDriver.send(HeartbeatBody.FRAME.toNioByteBuffer()); + } + + @Override + public void exception(Throwable throwable) + { + if (throwable instanceof AMQProtocolHeaderException) + { + + writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); + _networkDriver.close(); + + _logger.error("Error in protocol initiation " + this + ":" + getRemoteAddress() + " :" + throwable.getMessage(), throwable); + } + else if (throwable instanceof IOException) + { + _logger.error("IOException caught in" + this + ", session closed implictly: " + throwable); + } + else + { + _logger.error("Exception caught in" + this + ", closing session explictly: " + throwable, throwable); + + + MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion()); + ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0); + + writeFrame(closeBody.generateFrame(0)); + + _networkDriver.close(); + } + } + + @Override + public void init() + { + // Do nothing + } + + @Override + public void setSender(Sender sender) + { + // Do nothing + } + + @Override + public long getReadBytes() + { + return _readBytes; + } + + public long getWrittenBytes() + { + return _writtenBytes; + } + + public long getLastIoTime() + { + return _lastIoTime; + } + + public ProtocolSessionIdentifier getSessionIdentifier() + { + return _sessionIdentifier; + } + + public String getClientVersion() + { + return (_clientVersion == null) ? null : _clientVersion.toString(); + } + + /** + * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running. + * + * @param job The job. + * @param event The event to hand off asynchronously. + */ + void fireAsynchEvent(Job job, Event event) + { + + job.add(event); + + final ExecutorService pool = _poolReference .getPool(); + + if(pool == null) + { + return; + } + + // rather than perform additional checks on pool to check that it hasn't shutdown. + // catch the RejectedExecutionException that will result from executing on a shutdown pool + if (job.activate()) + { + try + { + pool.execute(job); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + } + + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java new file mode 100644 index 0000000000..ff0c007a60 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java @@ -0,0 +1,29 @@ +package org.apache.qpid.server.protocol; + +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.transport.NetworkDriver; + +public class AMQProtocolEngineFactory implements ProtocolEngineFactory +{ + private VirtualHostRegistry _vhosts; + + public AMQProtocolEngineFactory() + { + this(1); + } + + public AMQProtocolEngineFactory(Integer port) + { + _vhosts = ApplicationRegistry.getInstance(port).getVirtualHostRegistry(); + } + + + public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver) + { + return new AMQProtocolEngine(_vhosts, networkDriver); + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index fff406bb3d..b0bef04354 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.virtualhost.VirtualHost; import java.security.Principal; +import java.util.List; public interface AMQProtocolSession extends AMQVersionAwareProtocolSession @@ -210,5 +211,19 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession public MethodDispatcher getMethodDispatcher(); public ProtocolSessionIdentifier getSessionIdentifier(); + + String getClientVersion(); + + long getLastIoTime(); + + long getWrittenBytes(); + + Long getMaximumNumberOfChannels(); + + void setMaximumNumberOfChannels(Long value); + + void commitTransactions(AMQChannel channel) throws AMQException; + + List getChannels(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java index 81dbeeded2..8497c95e26 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -79,7 +79,7 @@ import org.apache.qpid.server.management.ManagedObject; @MBeanDescription("Management Bean for an AMQ Broker Connection") public class AMQProtocolSessionMBean extends AMQManagedObject implements ManagedConnection { - private AMQMinaProtocolSession _session = null; + private AMQProtocolSession _protocolSession = null; private String _name = null; // openmbean data types for representing the channel attributes @@ -92,10 +92,10 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed new AMQShortString("Broker Management Console has closed the connection."); @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection") - public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws NotCompliantMBeanException, OpenDataException + public AMQProtocolSessionMBean(AMQProtocolSession amqProtocolSession) throws NotCompliantMBeanException, OpenDataException { super(ManagedConnection.class, ManagedConnection.TYPE, ManagedConnection.VERSION); - _session = session; + _protocolSession = amqProtocolSession; String remote = getRemoteAddress(); remote = "anonymous".equals(remote) ? (remote + hashCode()) : remote; _name = jmxEncode(new StringBuffer(remote), 0).toString(); @@ -128,52 +128,52 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed public String getClientId() { - return (_session.getContextKey() == null) ? null : _session.getContextKey().toString(); + return (_protocolSession.getContextKey() == null) ? null : _protocolSession.getContextKey().toString(); } public String getAuthorizedId() { - return (_session.getAuthorizedID() != null ) ? _session.getAuthorizedID().getName() : null; + return (_protocolSession.getAuthorizedID() != null ) ? _protocolSession.getAuthorizedID().getName() : null; } public String getVersion() { - return (_session.getClientVersion() == null) ? null : _session.getClientVersion().toString(); + return (_protocolSession.getClientVersion() == null) ? null : _protocolSession.getClientVersion().toString(); } public Date getLastIoTime() { - return new Date(_session.getIOSession().getLastIoTime()); + return new Date(_protocolSession.getLastIoTime()); } public String getRemoteAddress() { - return _session.getIOSession().getRemoteAddress().toString(); + return _protocolSession.getRemoteAddress().toString(); } public ManagedObject getParentObject() { - return _session.getVirtualHost().getManagedObject(); + return _protocolSession.getVirtualHost().getManagedObject(); } public Long getWrittenBytes() { - return _session.getIOSession().getWrittenBytes(); + return _protocolSession.getWrittenBytes(); } public Long getReadBytes() { - return _session.getIOSession().getReadBytes(); + return _protocolSession.getWrittenBytes(); } public Long getMaximumNumberOfChannels() { - return _session.getMaximumNumberOfChannels(); + return _protocolSession.getMaximumNumberOfChannels(); } public void setMaximumNumberOfChannels(Long value) { - _session.setMaximumNumberOfChannels(value); + _protocolSession.setMaximumNumberOfChannels(value); } public String getObjectInstanceName() @@ -192,13 +192,13 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger())); try { - AMQChannel channel = _session.getChannel(channelId); + AMQChannel channel = _protocolSession.getChannel(channelId); if (channel == null) { throw new JMException("The channel (channel Id = " + channelId + ") does not exist"); } - _session.commitTransactions(channel); + _protocolSession.commitTransactions(channel); } catch (AMQException ex) { @@ -221,13 +221,13 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger())); try { - AMQChannel channel = _session.getChannel(channelId); + AMQChannel channel = _protocolSession.getChannel(channelId); if (channel == null) { throw new JMException("The channel (channel Id = " + channelId + ") does not exist"); } - _session.rollbackTransactions(channel); + _protocolSession.commitTransactions(channel); } catch (AMQException ex) { @@ -248,7 +248,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed public TabularData channels() throws OpenDataException { TabularDataSupport channelsList = new TabularDataSupport(_channelsType); - List list = _session.getChannels(); + List list = _protocolSession.getChannels(); for (AMQChannel channel : list) { @@ -274,7 +274,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed public void closeConnection() throws JMException { - MethodRegistry methodRegistry = _session.getMethodRegistry(); + MethodRegistry methodRegistry = _protocolSession.getMethodRegistry(); ConnectionCloseBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode @@ -301,12 +301,12 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed try { - _session.writeFrame(responseBody.generateFrame(0)); + _protocolSession.writeFrame(responseBody.generateFrame(0)); try { - _session.closeSession(); + _protocolSession.closeSession(); } catch (AMQException ex) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index b6137e83de..9575bfa1ec 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -258,7 +258,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry for (InetSocketAddress bindAddress : _acceptors.keySet()) { QpidAcceptor acceptor = _acceptors.get(bindAddress); - acceptor.getIoAcceptor().unbind(bindAddress); + acceptor.getNetworkDriver().close(); CurrentActor.get().message(BrokerMessages.BRK_1003(acceptor.toString(), bindAddress.getPort())); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java index 810be8ae22..3a81932123 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.security.access.plugins.network; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; @@ -32,7 +31,6 @@ import org.apache.commons.configuration.CompositeConfiguration; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; -import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.security.access.ACLPlugin; import org.apache.qpid.server.security.access.ACLPluginFactory; @@ -180,13 +178,13 @@ public class FirewallPlugin extends AbstractACLPlugin @Override public AuthzResult authoriseConnect(AMQProtocolSession session, VirtualHost virtualHost) { - if (!(session instanceof AMQMinaProtocolSession)) + SocketAddress sockAddr = session.getRemoteAddress(); + if (!(sockAddr instanceof InetSocketAddress)) { - return AuthzResult.ABSTAIN; // We only deal with tcp sessions, which - // mean MINA right now + return AuthzResult.ABSTAIN; // We only deal with tcp sessions } - InetAddress addr = getInetAdressFromMinaSession((AMQMinaProtocolSession) session); + InetAddress addr = ((InetSocketAddress) sockAddr).getAddress(); if (addr == null) { @@ -213,19 +211,6 @@ public class FirewallPlugin extends AbstractACLPlugin } - private InetAddress getInetAdressFromMinaSession(AMQMinaProtocolSession session) - { - SocketAddress remote = session.getIOSession().getRemoteAddress(); - if (remote instanceof InetSocketAddress) - { - return ((InetSocketAddress) remote).getAddress(); - } - else - { - return null; - } - } - @Override public void setConfiguration(Configuration config) throws ConfigurationException { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java index 61cc7cdeb6..3ca22b60c8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java @@ -20,21 +20,21 @@ */ package org.apache.qpid.server.transport; -import org.apache.mina.common.IoAcceptor; +import org.apache.qpid.transport.NetworkDriver; public class QpidAcceptor { - IoAcceptor _acceptor; + NetworkDriver _driver; String _protocol; - public QpidAcceptor(IoAcceptor acceptor, String protocol) + public QpidAcceptor(NetworkDriver driver, String protocol) { - _acceptor = acceptor; + _driver = driver; _protocol = protocol; } - public IoAcceptor getIoAcceptor() + public NetworkDriver getNetworkDriver() { - return _acceptor; + return _driver; } public String toString() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java index 8cb0837b39..1162687741 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java @@ -20,27 +20,27 @@ */ package org.apache.qpid.server.configuration; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; + import junit.framework.TestCase; + import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; -import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.server.protocol.AMQMinaProtocolSession; +import org.apache.qpid.server.protocol.AMQProtocolEngine; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.TestIoSession; +import org.apache.qpid.server.protocol.TestNetworkDriver; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.util.Iterator; -import java.util.List; -import java.util.Locale; - public class ServerConfigurationTest extends TestCase { @@ -589,12 +589,12 @@ public class ServerConfigurationTest extends TestCase { // Check default ServerConfiguration serverConfig = new ServerConfiguration(_config); - assertEquals(true, serverConfig.getSSLOnly()); + assertEquals(false, serverConfig.getSSLOnly()); // Check value we set - _config.setProperty("connector.ssl.sslOnly", false); + _config.setProperty("connector.ssl.sslOnly", true); serverConfig = new ServerConfiguration(_config); - assertEquals(false, serverConfig.getSSLOnly()); + assertEquals(true, serverConfig.getSSLOnly()); } public void testGetSSLPort() throws ConfigurationException @@ -791,16 +791,15 @@ public class ServerConfigurationTest extends TestCase // Test config VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry(); VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test"); - AMQCodecFactory codecFactory = new AMQCodecFactory(true); - - TestIoSession iosession = new TestIoSession(); - iosession.setAddress("127.0.0.1"); - AMQProtocolSession session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory); + TestNetworkDriver testDriver = new TestNetworkDriver(); + testDriver.setAddress("127.0.0.1"); + + AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver); assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost)); - iosession.setAddress("127.1.2.3"); - session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory); + testDriver.setAddress("127.1.2.3"); + session = new AMQProtocolEngine(virtualHostRegistry, testDriver); assertTrue(reg.getAccessManager().authoriseConnect(session, virtualHost)); } @@ -866,12 +865,12 @@ public class ServerConfigurationTest extends TestCase // Test config VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry(); VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test"); - AMQCodecFactory codecFactory = new AMQCodecFactory(true); - TestIoSession iosession = new TestIoSession(); - iosession.setAddress("127.0.0.1"); + TestNetworkDriver testDriver = new TestNetworkDriver(); + testDriver.setAddress("127.0.0.1"); - AMQProtocolSession session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory); + AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver); + session.setNetworkDriver(testDriver); assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost)); } @@ -935,12 +934,11 @@ public class ServerConfigurationTest extends TestCase ApplicationRegistry.initialise(reg, 1); // Test config - TestIoSession iosession = new TestIoSession(); - iosession.setAddress("127.0.0.1"); + TestNetworkDriver testDriver = new TestNetworkDriver(); + testDriver.setAddress("127.0.0.1"); VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry(); VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test"); - AMQCodecFactory codecFactory = new AMQCodecFactory(true); - AMQProtocolSession session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory); + AMQProtocolSession session = new AMQProtocolEngine(virtualHostRegistry, testDriver); assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost)); RandomAccessFile fileBRandom = new RandomAccessFile(fileB, "rw"); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java index e199255f50..bc36c61382 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java @@ -44,7 +44,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase private static final Logger log = Logger.getLogger(AMQProtocolSessionMBeanTest.class); private MessageStore _messageStore = new SkeletonMessageStore(); - private AMQMinaProtocolSession _protocolSession; + private AMQProtocolEngine _protocolSession; private AMQChannel _channel; private AMQProtocolSessionMBean _mbean; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java index 37dfead2e5..c4362f2c60 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java @@ -20,23 +20,22 @@ */ package org.apache.qpid.server.protocol; -import org.apache.qpid.AMQException; -import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.output.ProtocolOutputConverter; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.virtualhost.VirtualHost; - +import java.security.Principal; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import java.security.Principal; -public class InternalTestProtocolSession extends AMQMinaProtocolSession implements ProtocolOutputConverter +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter { // ChannelID(LIST) -> LinkedList final Map>> _channelDelivers; @@ -44,9 +43,7 @@ public class InternalTestProtocolSession extends AMQMinaProtocolSession implemen public InternalTestProtocolSession(VirtualHost virtualHost) throws AMQException { - super(new TestIoSession(), - ApplicationRegistry.getInstance().getVirtualHostRegistry(), - new AMQCodecFactory(true)); + super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), new TestNetworkDriver()); _channelDelivers = new HashMap>>(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java index 8597fc5863..e37492bcb0 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java @@ -37,7 +37,7 @@ import java.security.Principal; /** Test class to test MBean operations for AMQMinaProtocolSession. */ public class MaxChannelsTest extends TestCase { - private AMQMinaProtocolSession _session; + private AMQProtocolEngine _session; public void testChannels() throws Exception { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java deleted file mode 100644 index 211f491867..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java +++ /dev/null @@ -1,328 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.mina.common.CloseFuture; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.IoHandler; -import org.apache.mina.common.IoService; -import org.apache.mina.common.IoServiceConfig; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.IoSessionConfig; -import org.apache.mina.common.ThreadModel; -import org.apache.mina.common.TrafficMask; -import org.apache.mina.common.TransportType; -import org.apache.mina.common.WriteFuture; -import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; -import org.apache.qpid.pool.ReadWriteThreadModel; - -/** - * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented, - * so if this class is being used and some methods are to be used, then please update those. - */ -public class TestIoSession implements IoSession -{ - private final ConcurrentMap attributes = new ConcurrentHashMap(); - private String _address = "127.0.0.1"; - private int _port = 1; - - public TestIoSession() - { - } - - public IoService getService() - { - return null; - } - - public IoServiceConfig getServiceConfig() - { - return new TestIoConfig(); - } - - public IoHandler getHandler() - { - return null; - } - - public IoSessionConfig getConfig() - { - return null; - } - - public IoFilterChain getFilterChain() - { - return null; - } - - public WriteFuture write(Object message) - { - return null; - } - - public CloseFuture close() - { - return null; - } - - public Object getAttachment() - { - return getAttribute(""); - } - - public Object setAttachment(Object attachment) - { - return setAttribute("",attachment); - } - - public Object getAttribute(String key) - { - return attributes.get(key); - } - - public Object setAttribute(String key, Object value) - { - return attributes.put(key,value); - } - - public Object setAttribute(String key) - { - return attributes.put(key, Boolean.TRUE); - } - - public Object removeAttribute(String key) - { - return attributes.remove(key); - } - - public boolean containsAttribute(String key) - { - return attributes.containsKey(key); - } - - public Set getAttributeKeys() - { - return attributes.keySet(); - } - - public TransportType getTransportType() - { - return null; - } - - public boolean isConnected() - { - return false; - } - - public boolean isClosing() - { - return false; - } - - public CloseFuture getCloseFuture() - { - return null; - } - - public SocketAddress getRemoteAddress() - { - return new InetSocketAddress(getAddress(), getPort()); - } - - public SocketAddress getLocalAddress() - { - return null; - } - - public SocketAddress getServiceAddress() - { - return null; - } - - public int getIdleTime(IdleStatus status) - { - return 0; - } - - public long getIdleTimeInMillis(IdleStatus status) - { - return 0; - } - - public void setIdleTime(IdleStatus status, int idleTime) - { - - } - - public int getWriteTimeout() - { - return 0; - } - - public long getWriteTimeoutInMillis() - { - return 0; - } - - public void setWriteTimeout(int writeTimeout) - { - - } - - public TrafficMask getTrafficMask() - { - return null; - } - - public void setTrafficMask(TrafficMask trafficMask) - { - - } - - public void suspendRead() - { - - } - - public void suspendWrite() - { - - } - - public void resumeRead() - { - - } - - public void resumeWrite() - { - - } - - public long getReadBytes() - { - return 0; - } - - public long getWrittenBytes() - { - return 0; - } - - public long getReadMessages() - { - return 0; - } - - public long getWrittenMessages() - { - return 0; - } - - public long getWrittenWriteRequests() - { - return 0; - } - - public int getScheduledWriteRequests() - { - return 0; - } - - public int getScheduledWriteBytes() - { - return 0; - } - - public long getCreationTime() - { - return 0; - } - - public long getLastIoTime() - { - return 0; - } - - public long getLastReadTime() - { - return 0; - } - - public long getLastWriteTime() - { - return 0; - } - - public boolean isIdle(IdleStatus status) - { - return false; - } - - public int getIdleCount(IdleStatus status) - { - return 0; - } - - public long getLastIdleTime(IdleStatus status) - { - return 0; - } - - public void setAddress(String string) - { - this._address = string; - } - - public String getAddress() - { - return _address; - } - - public void setPort(int _port) - { - this._port = _port; - } - - public int getPort() - { - return _port; - } - - /** - * Test implementation of IoServiceConfig - */ - private class TestIoConfig extends SocketAcceptorConfig - { - public ThreadModel getThreadModel() - { - return ReadWriteThreadModel.getInstance(); - } - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java new file mode 100644 index 0000000000..098843d315 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java @@ -0,0 +1,126 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import java.net.BindException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import javax.net.ssl.SSLEngine; + +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.NetworkDriverConfiguration; +import org.apache.qpid.transport.OpenException; + +/** + * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented, + * so if this class is being used and some methods are to be used, then please update those. + */ +public class TestNetworkDriver implements NetworkDriver +{ + private final ConcurrentMap attributes = new ConcurrentHashMap(); + private String _address = "127.0.0.1"; + private int _port = 1; + + public TestNetworkDriver() + { + } + + public void setAddress(String string) + { + this._address = string; + } + + public String getAddress() + { + return _address; + } + + public void setPort(int _port) + { + this._port = _port; + } + + public int getPort() + { + return _port; + } + + public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory, + NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException + { + + } + + public SocketAddress getLocalAddress() + { + return new InetSocketAddress(_address, _port); + } + + public SocketAddress getRemoteAddress() + { + return new InetSocketAddress(_address, _port); + } + + public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, + SSLEngine sslEngine) throws OpenException + { + + } + + public void setMaxReadIdle(int idleTime) + { + + } + + public void setMaxWriteIdle(int idleTime) + { + + } + + public void close() + { + + } + + public void flush() + { + + } + + public void send(ByteBuffer msg) + { + + } + + public void setIdleTimeout(long l) + { + + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index 8c6260ca9e..19470e6226 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -20,37 +20,34 @@ */ package org.apache.qpid.server.queue; +import java.util.ArrayList; +import java.util.LinkedList; + +import javax.management.Notification; + import junit.framework.TestCase; + +import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MemoryMessageStore; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.txn.TransactionalContext; -import org.apache.qpid.server.txn.NonTransactionalContext; -import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.protocol.AMQProtocolEngine; +import org.apache.qpid.server.protocol.InternalTestProtocolSession; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; -import org.apache.qpid.server.protocol.AMQMinaProtocolSession; -import org.apache.qpid.server.protocol.InternalTestProtocolSession; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.commons.configuration.CompositeConfiguration; -import org.apache.mina.common.ByteBuffer; - -import javax.management.Notification; - -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.Collections; -import java.util.Set; -import java.security.Principal; +import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.virtualhost.VirtualHost; /** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */ public class AMQQueueAlertTest extends TestCase @@ -62,7 +59,7 @@ public class AMQQueueAlertTest extends TestCase private AMQQueue _queue; private AMQQueueMBean _queueMBean; private VirtualHost _virtualHost; - private AMQMinaProtocolSession _protocolSession; + private AMQProtocolEngine _protocolSession; private MessageStore _messageStore = new MemoryMessageStore(); private StoreContext _storeContext = new StoreContext(); private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext, diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java index a497365b06..bda816f5ab 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java @@ -30,17 +30,14 @@ import java.net.InetSocketAddress; import junit.framework.TestCase; import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.configuration.XMLConfiguration; -import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.protocol.AMQMinaProtocolSession; -import org.apache.qpid.server.protocol.TestIoSession; +import org.apache.qpid.server.protocol.AMQProtocolEngine; +import org.apache.qpid.server.protocol.TestNetworkDriver; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.server.registry.ApplicationRegistry; public class FirewallPluginTest extends TestCase { @@ -84,22 +81,22 @@ public class FirewallPluginTest extends TestCase private TestableMemoryMessageStore _store; private VirtualHost _virtualHost; - private AMQMinaProtocolSession _session; + private AMQProtocolEngine _session; + private TestNetworkDriver _testDriver; @Override public void setUp() throws Exception { super.setUp(); _store = new TestableMemoryMessageStore(); - TestIoSession iosession = new TestIoSession(); - iosession.setAddress("127.0.0.1"); + _testDriver = new TestNetworkDriver(); + _testDriver.setAddress("127.0.0.1"); // Retreive VirtualHost from the Registry VirtualHostRegistry virtualHostRegistry = ApplicationRegistry.getInstance().getVirtualHostRegistry(); _virtualHost = virtualHostRegistry.getVirtualHost("test"); - AMQCodecFactory codecFactory = new AMQCodecFactory(true); - _session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory); + _session = new AMQProtocolEngine(virtualHostRegistry, _testDriver); } public void tearDown() throws Exception @@ -170,7 +167,7 @@ public class FirewallPluginTest extends TestCase assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost)); // Set session IP so that we're connected from the right address - ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23"); + _testDriver.setAddress("192.168.23.23"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -185,7 +182,7 @@ public class FirewallPluginTest extends TestCase assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost)); // Set session IP so that we're connected from the right address - ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23"); + _testDriver.setAddress("192.168.23.23"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -198,7 +195,7 @@ public class FirewallPluginTest extends TestCase FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule}); // Set session IP so that we're connected from the right address - ((TestIoSession) _session.getIOSession()).setAddress("127.0.0.1"); + _testDriver.setAddress("127.0.0.1"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -211,7 +208,7 @@ public class FirewallPluginTest extends TestCase FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule}); // Set session IP so that we're connected from the right address - ((TestIoSession) _session.getIOSession()).setAddress("127.0.0.1"); + _testDriver.setAddress("127.0.0.1"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -234,7 +231,7 @@ public class FirewallPluginTest extends TestCase assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost)); // Set session IP so that we're connected from the right address - ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23"); + _testDriver.setAddress("192.168.23.23"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -257,7 +254,7 @@ public class FirewallPluginTest extends TestCase assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost)); // Set session IP so that we're connected from the right address - ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23"); + _testDriver.setAddress("192.168.23.23"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -271,7 +268,7 @@ public class FirewallPluginTest extends TestCase assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost)); // Set session IP so that we're connected from the right address - ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23"); + _testDriver.setAddress("192.168.23.23"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -285,7 +282,7 @@ public class FirewallPluginTest extends TestCase assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost)); // Set session IP so that we're connected from the right address - ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23"); + _testDriver.setAddress("192.168.23.23"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -295,11 +292,11 @@ public class FirewallPluginTest extends TestCase firstRule.setAccess("allow"); firstRule.setHostname("foo, bar, "+new InetSocketAddress("127.0.0.1", 5672).getHostName()); FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{firstRule}); - ((TestIoSession) _session.getIOSession()).setAddress("10.0.0.1"); + _testDriver.setAddress("10.0.0.1"); assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost)); // Set session IP so that we're connected from the right address - ((TestIoSession) _session.getIOSession()).setAddress("127.0.0.1"); + _testDriver.setAddress("127.0.0.1"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 2389c9e2da..e3a1a82dc4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -191,7 +191,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter _logger.debug("Protocol session created for session " + System.identityHashCode(session)); _failoverHandler = new FailoverHandler(this, session); - final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false)); + final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false, _protocolSession)); if (Boolean.getBoolean("amqj.shared_read_write_pool")) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 32cc8c4cb5..4ff24c3607 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -31,7 +31,10 @@ import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.thread.QpidThreadExecutor; +import org.apache.qpid.transport.network.mina.MINANetworkDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,7 +65,7 @@ public class TransportConnection private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class); - private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler"; + private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQProtocolEngineFactory"; private static Map _openSocketRegister = new ConcurrentHashMap(); @@ -190,8 +193,6 @@ public class TransportConnection _acceptor = new VmPipeAcceptor(); IoServiceConfig config = _acceptor.getDefaultConfig(); - - config.setThreadModel(ReadWriteThreadModel.getInstance()); } synchronized (_inVmPipeAddress) { @@ -276,7 +277,10 @@ public class TransportConnection { Class[] cnstr = {Integer.class}; Object[] params = {port}; - provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); + + provider = new MINANetworkDriver(); + ProtocolEngineFactory engineFactory = (ProtocolEngineFactory) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); + ((MINANetworkDriver) provider).setProtocolEngineFactory(engineFactory, true); // Give the broker a second to create _logger.info("Created VMBroker Instance:" + port); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java index fa890d0ebb..591dbd085b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java @@ -23,6 +23,7 @@ package org.apache.qpid.codec; import org.apache.mina.filter.codec.ProtocolCodecFactory; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolEncoder; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; /** * AMQCodecFactory is a Mina codec factory. It supplies the encoders and decoders need to read and write the bytes to @@ -50,9 +51,9 @@ public class AMQCodecFactory implements ProtocolCodecFactory * @param expectProtocolInitiation true if the first frame received is going to be a protocol initiation * frame, false if it is going to be a standard AMQ data block. */ - public AMQCodecFactory(boolean expectProtocolInitiation) + public AMQCodecFactory(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session) { - _frameDecoder = new AMQDecoder(expectProtocolInitiation); + _frameDecoder = new AMQDecoder(expectProtocolInitiation, session); } /** @@ -70,7 +71,7 @@ public class AMQCodecFactory implements ProtocolCodecFactory * * @return The AMQP decoder. */ - public ProtocolDecoder getDecoder() + public AMQDecoder getDecoder() { return _frameDecoder; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java index 7eef73f337..281c0761d9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -20,14 +20,21 @@ */ package org.apache.qpid.codec; +import java.util.ArrayList; + import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoSession; import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; +import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQDataBlockDecoder; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQMethodBodyFactory; +import org.apache.qpid.framing.AMQProtocolVersionException; import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; /** * AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a @@ -62,14 +69,19 @@ public class AMQDecoder extends CumulativeProtocolDecoder private boolean _expectProtocolInitiation; private boolean firstDecode = true; + private AMQMethodBodyFactory _bodyFactory; + + private ByteBuffer _remainingBuf; + /** * Creates a new AMQP decoder. * * @param expectProtocolInitiation true if this decoder needs to handle protocol initiation. */ - public AMQDecoder(boolean expectProtocolInitiation) + public AMQDecoder(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session) { _expectProtocolInitiation = expectProtocolInitiation; + _bodyFactory = new AMQMethodBodyFactory(session); } /** @@ -120,7 +132,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { int pos = in.position(); - boolean enoughData = _dataBlockDecoder.decodable(session, in); + boolean enoughData = _dataBlockDecoder.decodable(in.buf()); in.position(pos); if (!enoughData) { @@ -149,7 +161,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder */ private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { - boolean enoughData = _piDecoder.decodable(session, in); + boolean enoughData = _piDecoder.decodable(in.buf()); if (!enoughData) { // returning false means it will leave the contents in the buffer and @@ -158,7 +170,8 @@ public class AMQDecoder extends CumulativeProtocolDecoder } else { - _piDecoder.decode(session, in, out); + ProtocolInitiation pi = new ProtocolInitiation(in.buf()); + out.write(pi); return true; } @@ -177,7 +190,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder } - /** + /** * Cumulates content of in into internal buffer and forwards * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}. * doDecode() is invoked repeatedly until it returns false @@ -268,4 +281,60 @@ public class AMQDecoder extends CumulativeProtocolDecoder session.setAttribute( BUFFER, remainingBuf ); } + public ArrayList decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException + { + + // get prior remaining data from accumulator + ArrayList dataBlocks = new ArrayList(); + ByteBuffer msg; + // if we have a session buffer, append data to that otherwise + // use the buffer read from the network directly + if( _remainingBuf != null ) + { + _remainingBuf.put(buf); + _remainingBuf.flip(); + msg = _remainingBuf; + } + else + { + msg = ByteBuffer.wrap(buf); + } + + if (_expectProtocolInitiation + || (firstDecode + && (msg.remaining() > 0) + && (msg.get(msg.position()) == (byte)'A'))) + { + if (_piDecoder.decodable(msg.buf())) + { + dataBlocks.add(new ProtocolInitiation(msg.buf())); + } + } + else + { + boolean enoughData = true; + while (enoughData) + { + int pos = msg.position(); + + enoughData = _dataBlockDecoder.decodable(msg); + msg.position(pos); + if (enoughData) + { + dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg)); + } + else + { + _remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false); + _remainingBuf.setAutoExpand(true); + _remainingBuf.put(msg); + } + } + } + if(firstDecode && dataBlocks.size() > 0) + { + firstDecode = false; + } + return dataBlocks; + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 82ffc60802..228867b2b0 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -47,7 +47,7 @@ public class AMQDataBlockDecoder public AMQDataBlockDecoder() { } - public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException + public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException { final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1); // type, channel, body length and end byte @@ -56,14 +56,15 @@ public class AMQDataBlockDecoder return false; } - in.skip(1 + 2); - final long bodySize = in.getUnsignedInt(); + in.position(in.position() + 1 + 2); + // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() + final long bodySize = in.getInt() & 0xffffffffL; return (remainingAfterAttributes >= bodySize); } - protected Object createAndPopulateFrame(IoSession session, ByteBuffer in) + public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in) throws AMQFrameDecodingException, AMQProtocolVersionException { final byte type = in.get(); @@ -71,15 +72,7 @@ public class AMQDataBlockDecoder BodyFactory bodyFactory; if (type == AMQMethodBody.TYPE) { - bodyFactory = (BodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY); - if (bodyFactory == null) - { - AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment(); - bodyFactory = new AMQMethodBodyFactory(protocolSession); - session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory); - - } - + bodyFactory = methodBodyFactory; } else { @@ -115,6 +108,24 @@ public class AMQDataBlockDecoder public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { - out.write(createAndPopulateFrame(session, in)); + AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY); + if (bodyFactory == null) + { + AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment(); + bodyFactory = new AMQMethodBodyFactory(protocolSession); + session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory); + } + + out.write(createAndPopulateFrame(bodyFactory, in)); + } + + public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException + { + return decodable(msg.buf()); + } + + public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException + { + return createAndPopulateFrame(factory, ByteBuffer.wrap(msg)); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java index 05fd2bb480..374644b4f2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java @@ -50,7 +50,7 @@ public final class AMQDataBlockEncoder implements MessageEncoder { _logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'"); } - + out.write(buffer); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java index 3ac17e9204..cf8a866e47 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -20,12 +20,10 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoSession; -import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.qpid.AMQException; import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock { @@ -53,13 +51,12 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData _protocolMajor = protocolMajor; _protocolMinor = protocolMinor; } - + public ProtocolInitiation(ProtocolVersion pv) { this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion()); } - public ProtocolInitiation(ByteBuffer in) { _protocolHeader = new byte[4]; @@ -71,6 +68,11 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData _protocolMinor = in.get(); } + public void writePayload(org.apache.mina.common.ByteBuffer buffer) + { + writePayload(buffer.buf()); + } + public long getSize() { return 4 + 1 + 1 + 1 + 1; @@ -127,16 +129,11 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData * @return true if we have enough data to decode the PI frame fully, false if more * data is required */ - public boolean decodable(IoSession session, ByteBuffer in) + public boolean decodable(ByteBuffer in) { return (in.remaining() >= 8); } - public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) - { - ProtocolInitiation pi = new ProtocolInitiation(in); - out.write(pi); - } } public ProtocolVersion checkVersion() throws AMQException @@ -192,4 +189,5 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData buffer.append(Integer.toHexString(_protocolMinor)); return buffer.toString(); } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java index 5996cbf89c..49bce9f2f9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java @@ -45,21 +45,31 @@ import org.apache.mina.common.IoSession; * a continuation. Job is also a continuation, as is the job completion handler. Or, as Event is totally abstract, * it is really an interface, so could just drop it and use the continuation interface instead. */ -public abstract class Event +public class Event { + private Runnable _runner; + + public Event() + { + + } + /** * Creates a continuation. */ - public Event() - { } + public Event(Runnable runner) + { + _runner = runner; + } /** - * Processes the continuation in the context of a Mina session. - * - * @param session The Mina session. + * Processes the continuation */ - public abstract void process(IoSession session); - + public void process() + { + _runner.run(); + } + /** * A continuation ({@link Event}) that takes a Mina messageReceived event, and passes it to a NextFilter. * @@ -68,22 +78,22 @@ public abstract class Event * Pass a Mina messageReceived event to a NextFilter. {@link IoFilter.NextFilter}, {@link IoSession} * */ - public static final class ReceivedEvent extends Event + public static final class MinaReceivedEvent extends Event { private final Object _data; - private final IoFilter.NextFilter _nextFilter; + private final IoSession _session; - public ReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data) + public MinaReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data, final IoSession session) { - super(); _nextFilter = nextFilter; _data = data; + _session = session; } - public void process(IoSession session) + public void process() { - _nextFilter.messageReceived(session, _data); + _nextFilter.messageReceived(_session, _data); } public IoFilter.NextFilter getNextFilter() @@ -101,21 +111,22 @@ public abstract class Event * {@link IoFilter.NextFilter}, {@link IoFilter.WriteRequest}, {@link IoSession} * */ - public static final class WriteEvent extends Event + public static final class MinaWriteEvent extends Event { private final IoFilter.WriteRequest _data; private final IoFilter.NextFilter _nextFilter; + private IoSession _session; - public WriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data) + public MinaWriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data, final IoSession session) { - super(); _nextFilter = nextFilter; _data = data; + _session = session; } - public void process(IoSession session) + public void process() { - _nextFilter.filterWrite(session, _data); + _nextFilter.filterWrite(_session, _data); } public IoFilter.NextFilter getNextFilter() @@ -135,16 +146,17 @@ public abstract class Event public static final class CloseEvent extends Event { private final IoFilter.NextFilter _nextFilter; + private final IoSession _session; - public CloseEvent(final IoFilter.NextFilter nextFilter) + public CloseEvent(final IoFilter.NextFilter nextFilter, final IoSession session) { - super(); _nextFilter = nextFilter; + _session = session; } - public void process(IoSession session) + public void process() { - _nextFilter.sessionClosed(session); + _nextFilter.sessionClosed(_session); } public IoFilter.NextFilter getNextFilter() diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java index 00da005515..4e4192dbe3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -55,9 +55,6 @@ public class Job implements ReadWriteRunnable /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */ private final int _maxEvents; - /** The Mina session. */ - private final IoSession _session; - /** Holds the queue of events that make up the job. */ private final java.util.Queue _eventQueue = new ConcurrentLinkedQueue(); @@ -79,7 +76,13 @@ public class Job implements ReadWriteRunnable */ Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob) { - _session = session; + _completionHandler = completionHandler; + _maxEvents = maxEvents; + _readJob = readJob; + } + + public Job(JobCompletionHandler completionHandler, int maxEvents, boolean readJob) + { _completionHandler = completionHandler; _maxEvents = maxEvents; _readJob = readJob; @@ -90,7 +93,7 @@ public class Job implements ReadWriteRunnable * * @param evt The continuation to enqueue. */ - void add(Event evt) + public void add(Event evt) { _eventQueue.add(evt); } @@ -111,7 +114,7 @@ public class Job implements ReadWriteRunnable } else { - e.process(_session); + e.process(); } } return false; @@ -153,30 +156,19 @@ public class Job implements ReadWriteRunnable if(processAll()) { deactivate(); - _completionHandler.completed(_session, this); + _completionHandler.completed(this); } else { - _completionHandler.notCompleted(_session, this); + _completionHandler.notCompleted(this); } } - public boolean isReadJob() - { - return _readJob; - } - public boolean isRead() { return _readJob; } - public boolean isWrite() - { - return !_readJob; - } - - /** * Another interface for a continuation. * @@ -185,8 +177,8 @@ public class Job implements ReadWriteRunnable */ static interface JobCompletionHandler { - public void completed(IoSession session, Job job); + public void completed(Job job); - public void notCompleted(final IoSession session, final Job job); + public void notCompleted(final Job job); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java index a080cc7e04..4863611c42 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -20,19 +20,17 @@ */ package org.apache.qpid.pool; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; + import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoFilterAdapter; import org.apache.mina.common.IoSession; import org.apache.qpid.pool.Event.CloseEvent; - +import org.apache.qpid.pool.Event.MinaReceivedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ExecutorService; - /** * PoolingFilter, is a no-op pass through filter that hands all events down the Mina filter chain by default. As it * adds no behaviour by default to the filter chain, it is abstract. @@ -74,7 +72,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo private final String _name; /** Defines the maximum number of events that will be batched into a single job. */ - static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); + public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); private final int _maxEvents; @@ -188,7 +186,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter); session.setAttribute(_name, job); } - + /** * Retrieves this filters Job, by this filters name, from the Mina session. * @@ -208,7 +206,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo * @param session The Mina session to work in. * @param job The job that completed. */ - public void completed(IoSession session, Job job) + public void completed(Job job) { @@ -239,7 +237,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo } } - public void notCompleted(IoSession session, Job job) + public void notCompleted(Job job) { final ExecutorService pool = _poolReference.getPool(); @@ -430,7 +428,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo public void messageReceived(NextFilter nextFilter, final IoSession session, Object message) { Job job = getJobForSession(session); - fireAsynchEvent(job, new Event.ReceivedEvent(nextFilter, message)); + fireAsynchEvent(job, new MinaReceivedEvent(nextFilter, message, session)); } /** @@ -442,7 +440,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo public void sessionClosed(final NextFilter nextFilter, final IoSession session) { Job job = getJobForSession(session); - fireAsynchEvent(job, new CloseEvent(nextFilter)); + fireAsynchEvent(job, new CloseEvent(nextFilter, session)); } } @@ -473,7 +471,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest) { Job job = getJobForSession(session); - fireAsynchEvent(job, new Event.WriteEvent(nextFilter, writeRequest)); + fireAsynchEvent(job, new Event.MinaWriteEvent(nextFilter, writeRequest, session)); } /** @@ -485,7 +483,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo public void sessionClosed(final NextFilter nextFilter, final IoSession session) { Job job = getJobForSession(session); - fireAsynchEvent(job, new CloseEvent(nextFilter)); + fireAsynchEvent(job, new CloseEvent(nextFilter, session)); } } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java index ad04a923e1..140c93ca8d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java @@ -23,5 +23,4 @@ package org.apache.qpid.pool; public interface ReadWriteRunnable extends Runnable { boolean isRead(); - boolean isWrite(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java index d8c0f2c916..9df84eef90 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java @@ -20,10 +20,12 @@ */ package org.apache.qpid.protocol; +import org.apache.qpid.transport.NetworkDriver; + public interface ProtocolEngineFactory { // Returns a new instance of a ProtocolEngine - ProtocolEngine newProtocolEngine(); + ProtocolEngine newProtocolEngine(NetworkDriver networkDriver); } \ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java index 18cae6bf85..c38afe5dd5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java @@ -28,17 +28,17 @@ package org.apache.qpid.transport; public interface NetworkDriverConfiguration { // Taken from Socket - boolean getKeepAlive(); - boolean getOOBInline(); - boolean getReuseAddress(); + Boolean getKeepAlive(); + Boolean getOOBInline(); + Boolean getReuseAddress(); Integer getSoLinger(); // null means off - int getSoTimeout(); - boolean getTcpNoDelay(); - int getTrafficClass(); + Integer getSoTimeout(); + Boolean getTcpNoDelay(); + Integer getTrafficClass(); // The amount of memory in bytes to allocate to the incoming buffer - int getReceiveBufferSize(); + Integer getReceiveBufferSize(); // The amount of memory in bytes to allocate to the outgoing buffer - int getSendBufferSize(); + Integer getSendBufferSize(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java index 7330a042df..477e2cd5af 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java @@ -181,6 +181,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver { return _ioSession.getLocalAddress(); } + public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, SSLEngine sslEngine) throws OpenException @@ -251,6 +252,10 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver public void close() { + if (_lastWriteFuture != null) + { + _lastWriteFuture.join(); + } if (_acceptor != null) { _acceptor.unbindAll(); @@ -359,9 +364,14 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); } - + + if (_ioSession == null) + { + _ioSession = protocolSession; + } + // Set up the protocol engine - ProtocolEngine protocolEngine = _factory.newProtocolEngine(); + ProtocolEngine protocolEngine = _factory.newProtocolEngine(this); MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession); protocolEngine.setNetworkDriver(newDriver); protocolSession.setAttachment(protocolEngine); @@ -385,4 +395,10 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver return _protocolEngine; } + public void setProtocolEngineFactory(ProtocolEngineFactory engineFactory, boolean acceptingConnections) + { + _factory = engineFactory; + _acceptingConnections = acceptingConnections; + } + } diff --git a/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java b/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java new file mode 100644 index 0000000000..46c812e265 --- /dev/null +++ b/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java @@ -0,0 +1,130 @@ +package org.apache.qpid.codec; + +import java.nio.ByteBuffer; +import java.util.ArrayList; + +import junit.framework.TestCase; + +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQProtocolVersionException; +import org.apache.qpid.framing.HeartbeatBody; + +public class AMQDecoderTest extends TestCase +{ + + private AMQCodecFactory _factory; + private AMQDecoder _decoder; + + + public void setUp() + { + _factory = new AMQCodecFactory(false, null); + _decoder = _factory.getDecoder(); + } + + + public void testSingleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + { + ByteBuffer msg = HeartbeatBody.FRAME.toNioByteBuffer(); + ArrayList frames = _decoder.decodeBuffer(msg); + if (frames.get(0) instanceof AMQFrame) + { + assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frames.get(0)).getBodyFrame().getFrameType()); + } + else + { + fail("decode was not a frame"); + } + } + + public void testPartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + { + ByteBuffer msg = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msgA = msg.slice(); + int msgbPos = msg.remaining() / 2; + int msgaLimit = msg.remaining() - msgbPos; + msgA.limit(msgaLimit); + msg.position(msgbPos); + ByteBuffer msgB = msg.slice(); + ArrayList frames = _decoder.decodeBuffer(msgA); + assertEquals(0, frames.size()); + frames = _decoder.decodeBuffer(msgB); + assertEquals(1, frames.size()); + if (frames.get(0) instanceof AMQFrame) + { + assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frames.get(0)).getBodyFrame().getFrameType()); + } + else + { + fail("decode was not a frame"); + } + } + + public void testMultipleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + { + ByteBuffer msgA = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msgB = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msg = ByteBuffer.allocate(msgA.remaining() + msgB.remaining()); + msg.put(msgA); + msg.put(msgB); + msg.flip(); + ArrayList frames = _decoder.decodeBuffer(msg); + assertEquals(2, frames.size()); + for (AMQDataBlock frame : frames) + { + if (frame instanceof AMQFrame) + { + assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frame).getBodyFrame().getFrameType()); + } + else + { + fail("decode was not a frame"); + } + } + } + + public void testMultiplePartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + { + ByteBuffer msgA = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msgB = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msgC = HeartbeatBody.FRAME.toNioByteBuffer(); + + ByteBuffer sliceA = ByteBuffer.allocate(msgA.remaining() + msgB.remaining() / 2); + sliceA.put(msgA); + int limit = msgB.limit(); + int pos = msgB.remaining() / 2; + msgB.limit(pos); + sliceA.put(msgB); + sliceA.flip(); + msgB.limit(limit); + msgB.position(pos); + + ByteBuffer sliceB = ByteBuffer.allocate(msgB.remaining() + pos); + sliceB.put(msgB); + msgC.limit(pos); + sliceB.put(msgC); + sliceB.flip(); + msgC.limit(limit); + + ArrayList frames = _decoder.decodeBuffer(sliceA); + assertEquals(1, frames.size()); + frames = _decoder.decodeBuffer(sliceB); + assertEquals(1, frames.size()); + frames = _decoder.decodeBuffer(msgC); + assertEquals(1, frames.size()); + for (AMQDataBlock frame : frames) + { + if (frame instanceof AMQFrame) + { + assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frame).getBodyFrame().getFrameType()); + } + else + { + fail("decode was not a frame"); + } + } + } + +} diff --git a/qpid/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java b/qpid/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java new file mode 100644 index 0000000000..bd7fb68d93 --- /dev/null +++ b/qpid/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java @@ -0,0 +1,95 @@ +package org.apache.qpid.codec; + +import java.nio.ByteBuffer; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.Sender; + +public class MockAMQVersionAwareProtocolSession implements AMQVersionAwareProtocolSession +{ + + @Override + public void contentBodyReceived(int channelId, ContentBody body) throws AMQException + { + // TODO Auto-generated method stub + + } + + @Override + public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException + { + // TODO Auto-generated method stub + + } + + @Override + public MethodRegistry getMethodRegistry() + { + return MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); + } + + @Override + public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException + { + // TODO Auto-generated method stub + + } + + @Override + public void init() + { + // TODO Auto-generated method stub + + } + + @Override + public void methodFrameReceived(int channelId, AMQMethodBody body) throws AMQException + { + // TODO Auto-generated method stub + + } + + @Override + public void setSender(Sender sender) + { + // TODO Auto-generated method stub + + } + + @Override + public void writeFrame(AMQDataBlock frame) + { + // TODO Auto-generated method stub + + } + + @Override + public byte getProtocolMajorVersion() + { + // TODO Auto-generated method stub + return 0; + } + + @Override + public byte getProtocolMinorVersion() + { + // TODO Auto-generated method stub + return 0; + } + + @Override + public ProtocolVersion getProtocolVersion() + { + // TODO Auto-generated method stub + return null; + } + +} diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java index 7901f6a99d..6024875cf5 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java @@ -299,7 +299,7 @@ public class MINANetworkDriverTest extends TestCase _countingEngine.setNewLatch(TEST_DATA.getBytes().length); _client.send(ByteBuffer.wrap(TEST_DATA.getBytes())); _countingEngine.getExceptionLatch().await(2, TimeUnit.SECONDS); - assertEquals("Exception should not been thrown", 0, + assertEquals("Exception should have been thrown", 0, _countingEngine.getExceptionLatch().getCount()); } @@ -321,11 +321,12 @@ public class MINANetworkDriverTest extends TestCase { EchoProtocolEngine _engine = null; - public ProtocolEngine newProtocolEngine() + public ProtocolEngine newProtocolEngine(NetworkDriver driver) { if (_engine == null) { _engine = new EchoProtocolEngine(); + _engine.setNetworkDriver(driver); } return getEngine(); } -- cgit v1.2.1