summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2009-10-15 01:06:23 +0000
committerAidan Skinner <aidan@apache.org>2009-10-15 01:06:23 +0000
commit85ad542c1d5724cb5a1294f12e826a83972ef433 (patch)
treeaabcb3336d6b4b7d3ae9bb0a234bff0c9b4015aa /java/broker
parentf4809a431d4b374e866f5988a4099ac927f42dc4 (diff)
downloadqpid-python-85ad542c1d5724cb5a1294f12e826a83972ef433.tar.gz
Merge java-network-refactor branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@825362 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/build.xml1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Main.java199
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java56
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java290
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java52
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (renamed from java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java)342
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java29
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java42
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java23
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java8
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java54
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java24
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java328
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java49
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java39
23 files changed, 516 insertions, 1066 deletions
diff --git a/java/broker/build.xml b/java/broker/build.xml
index 3c63c459be..ae133d1a96 100644
--- a/java/broker/build.xml
+++ b/java/broker/build.xml
@@ -21,6 +21,7 @@
<project name="AMQ Broker" default="build">
<property name="module.depends" value="management/common common"/>
+ <property name="module.test.depends" value="common/test" />
<property name="module.main" value="org.apache.qpid.server.Main"/>
<import file="../module.xml"/>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index ea48bd7cc3..644a33db01 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -41,7 +41,6 @@ import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java
index f8deb95628..c45e794145 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -20,6 +20,13 @@
*/
package org.apache.qpid.server;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Properties;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
@@ -30,16 +37,9 @@ import org.apache.commons.cli.PosixParser;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.apache.log4j.xml.QpidLog4JConfigurator;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.FixedSizeByteBufferAllocator;
-import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.mina.util.NewThreadExecutor;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
import org.apache.qpid.server.information.management.ServerInformationMBean;
@@ -48,19 +48,13 @@ import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.management.LoggingManagementMBean;
import org.apache.qpid.server.logging.messages.BrokerMessages;
-import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
-import org.apache.qpid.server.protocol.AMQPProtocolProvider;
+import org.apache.qpid.server.protocol.AMQProtocolEngineFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.transport.QpidAcceptor;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.BindException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.Properties;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
/**
* Main entry point for AMQPD.
@@ -300,20 +294,6 @@ public class Main
_brokerLogger.info("Starting Qpid Broker " + QpidProperties.getReleaseVersion()
+ " build: " + QpidProperties.getBuildVersion());
- ByteBuffer.setUseDirectBuffers(serverConfig.getEnableDirectBuffers());
-
- // the MINA default is currently to use the pooled allocator although this may change in future
- // once more testing of the performance of the simple allocator has been done
- if (!serverConfig.getEnablePooledAllocator())
- {
- ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator());
- }
-
- if (serverConfig.getUseBiasedWrites())
- {
- System.setProperty("org.apache.qpid.use_write_biased_pool", "true");
- }
-
int port = serverConfig.getPort();
String portStr = commandLine.getOptionValue("p");
@@ -329,7 +309,54 @@ public class Main
}
}
- bind(port, serverConfig);
+ String bindAddr = commandLine.getOptionValue("b");
+ if (bindAddr == null)
+ {
+ bindAddr = serverConfig.getBind();
+ }
+ InetAddress bindAddress = null;
+ if (bindAddr.equals("wildcard"))
+ {
+ bindAddress = new InetSocketAddress(port).getAddress();
+ }
+ else
+ {
+ bindAddress = InetAddress.getByAddress(parseIP(bindAddr));
+ }
+
+ String keystorePath = serverConfig.getKeystorePath();
+ String keystorePassword = serverConfig.getKeystorePassword();
+ String certType = serverConfig.getCertType();
+ SSLContextFactory sslFactory = null;
+ boolean isSsl = false;
+
+ if (!serverConfig.getSSLOnly())
+ {
+ NetworkDriver driver = new MINANetworkDriver();
+ driver.bind(port, new InetAddress[]{bindAddress}, new AMQProtocolEngineFactory(),
+ serverConfig.getNetworkConfiguration(), null);
+ ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
+ new QpidAcceptor(driver,"TCP"));
+ CurrentActor.get().message(BrokerMessages.BRK_1002("TCP", port));
+ }
+
+ if (serverConfig.getEnableSSL())
+ {
+ sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
+ NetworkDriver driver = new MINANetworkDriver();
+ driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress},
+ new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory);
+ ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
+ new QpidAcceptor(driver,"TCP"));
+ CurrentActor.get().message(BrokerMessages.BRK_1002("TCP/SSL", serverConfig.getSSLPort()));
+ }
+
+ //fixme qpid.AMQP should be using qpidproperties to get value
+ _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion()
+ + " build: " + QpidProperties.getBuildVersion());
+
+ CurrentActor.get().message(BrokerMessages.BRK_1004());
+
}
finally
{
@@ -358,114 +385,6 @@ public class Main
}
}
- protected void bind(int port, ServerConfiguration config) throws BindException
- {
- String bindAddr = commandLine.getOptionValue("b");
- if (bindAddr == null)
- {
- bindAddr = config.getBind();
- }
-
- try
- {
- IoAcceptor acceptor;
-
- if (ApplicationRegistry.getInstance().getConfiguration().getQpidNIO())
- {
- _logger.warn("Using Qpid Multithreaded IO Processing");
- acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(config.getProcessors(), new NewThreadExecutor());
- }
- else
- {
- _logger.warn("Using Mina IO Processing");
- acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(config.getProcessors(), new NewThreadExecutor());
- }
-
- SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig();
- SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
-
- sc.setReceiveBufferSize(config.getReceiveBufferSize());
- sc.setSendBufferSize(config.getWriteBufferSize());
- sc.setTcpNoDelay(config.getTcpNoDelay());
-
- // if we do not use the executor pool threading model we get the default leader follower
- // implementation provided by MINA
- if (config.getEnableExecutorPool())
- {
- sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
- }
-
- if (!config.getEnableSSL() || !config.getSSLOnly())
- {
- AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
- InetSocketAddress bindAddress;
- if (bindAddr.equals("wildcard"))
- {
- bindAddress = new InetSocketAddress(port);
- }
- else
- {
- bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port);
- }
-
- bind(new QpidAcceptor(acceptor,"TCP"), bindAddress, handler, sconfig);
-
- //fixme qpid.AMQP should be using qpidproperties to get value
- _brokerLogger.info("Qpid.AMQP listening on non-SSL address " + bindAddress);
- }
-
- if (config.getEnableSSL())
- {
- AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
- try
- {
-
- bind(new QpidAcceptor(acceptor, "TCP/SSL"), new InetSocketAddress(config.getSSLPort()), handler, sconfig);
-
- //fixme qpid.AMQP should be using qpidproperties to get value
- _brokerLogger.info("Qpid.AMQP listening on SSL port " + config.getSSLPort());
-
- }
- catch (IOException e)
- {
- _brokerLogger.error("Unable to listen on SSL port: " + e, e);
- }
- }
-
- //fixme qpid.AMQP should be using qpidproperties to get value
- _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion()
- + " build: " + QpidProperties.getBuildVersion());
-
- CurrentActor.get().message(BrokerMessages.BRK_1004());
-
- }
- catch (Exception e)
- {
- _logger.error("Unable to bind service to registry: " + e, e);
- //fixme this need tidying up
- throw new BindException(e.getMessage());
- }
- }
-
- /**
- * Ensure that any bound Acceptors are recorded in the registry so they can be closed later.
- *
- * @param acceptor
- * @param bindAddress
- * @param handler
- * @param sconfig
- *
- * @throws IOException from the acceptor.bind command
- */
- private void bind(QpidAcceptor acceptor, InetSocketAddress bindAddress, AMQPFastProtocolHandler handler, SocketAcceptorConfig sconfig) throws IOException
- {
- acceptor.getIoAcceptor().bind(bindAddress, handler, sconfig);
-
- CurrentActor.get().message(BrokerMessages.BRK_1002(acceptor.toString(), bindAddress.getPort()));
-
- ApplicationRegistry.getInstance().addAcceptor(bindAddress, acceptor);
- }
-
public static void main(String[] args)
{
//if the -Dlog4j.configuration property has not been set, enable the init override
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index 01befbbfe8..641b44bb18 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.configuration.management.ConfigurationManagementMB
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.NetworkDriverConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -570,7 +571,7 @@ public class ServerConfiguration implements SignalHandler
public boolean getSSLOnly()
{
- return getConfig().getBoolean("connector.ssl.sslOnly", true);
+ return getConfig().getBoolean("connector.ssl.sslOnly", false);
}
public int getSSLPort()
@@ -619,4 +620,57 @@ public class ServerConfiguration implements SignalHandler
getConfig().getLong("housekeeping.expiredMessageCheckPeriod",
DEFAULT_HOUSEKEEPING_PERIOD));
}
+
+ public NetworkDriverConfiguration getNetworkConfiguration()
+ {
+ return new NetworkDriverConfiguration()
+ {
+
+ public Integer getTrafficClass()
+ {
+ return null;
+ }
+
+ public Boolean getTcpNoDelay()
+ {
+ // Can't call parent getTcpNoDelay since it just calls this one
+ return getConfig().getBoolean("connector.tcpNoDelay", true);
+ }
+
+ public Integer getSoTimeout()
+ {
+ return null;
+ }
+
+ public Integer getSoLinger()
+ {
+ return null;
+ }
+
+ public Integer getSendBufferSize()
+ {
+ return getBufferWriteLimit();
+ }
+
+ public Boolean getReuseAddress()
+ {
+ return null;
+ }
+
+ public Integer getReceiveBufferSize()
+ {
+ return getBufferReadLimit();
+ }
+
+ public Boolean getOOBInline()
+ {
+ return null;
+ }
+
+ public Boolean getKeepAlive()
+ {
+ return null;
+ }
+ };
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index d287595e2d..7b50a2e3ad 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.connection;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQConnectionException;
@@ -45,6 +44,14 @@ public class ConnectionRegistry implements IConnectionRegistry
{
}
+
+ public void expireClosedChannels()
+ {
+ for (AMQProtocolSession connection : _registry)
+ {
+ connection.closeIfLingeringClosedChannels();
+ }
+ }
/** Close all of the currently open connections. */
public void close() throws AMQException
diff --git a/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
index d64fde1c20..002269bbaa 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.connection;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
deleted file mode 100644
index 16b85e67b3..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.ReadThrottleFilterBuilder;
-import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
-import org.apache.mina.filter.codec.QpidProtocolCodecFilter;
-import org.apache.mina.filter.executor.ExecutorFilter;
-import org.apache.mina.util.SessionUtil;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQProtocolHeaderException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.ssl.SSLContextFactory;
-
-/**
- * The protocol handler handles "protocol events" for all connections. The state
- * associated with an individual connection is accessed through the protocol session.
- *
- * We delegate all frame (message) processing to the AMQProtocolSession which wraps
- * the state for the connection.
- */
-public class AMQPFastProtocolHandler extends IoHandlerAdapter
-{
- private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class);
-
- private final IApplicationRegistry _applicationRegistry;
-
- private final int BUFFER_READ_LIMIT_SIZE;
- private final int BUFFER_WRITE_LIMIT_SIZE;
-
- public AMQPFastProtocolHandler(Integer applicationRegistryInstance)
- {
- this(ApplicationRegistry.getInstance(applicationRegistryInstance));
- }
-
- public AMQPFastProtocolHandler(IApplicationRegistry applicationRegistry)
- {
- _applicationRegistry = applicationRegistry;
-
- // Read the configuration from the application registry
- BUFFER_READ_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferReadLimit();
- BUFFER_WRITE_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferWriteLimit();
-
- _logger.debug("AMQPFastProtocolHandler created");
- }
-
- protected AMQPFastProtocolHandler(AMQPFastProtocolHandler handler)
- {
- this(handler._applicationRegistry);
- }
-
- public void sessionCreated(IoSession protocolSession) throws Exception
- {
- SessionUtil.initialize(protocolSession);
- final AMQCodecFactory codecFactory = new AMQCodecFactory(true);
-
- createSession(protocolSession, _applicationRegistry, codecFactory);
- _logger.info("Protocol session created for:" + protocolSession.getRemoteAddress());
-
- final QpidProtocolCodecFilter pcf = new QpidProtocolCodecFilter(codecFactory);
- final ServerConfiguration config = _applicationRegistry.getConfiguration();
-
- String keystorePath = config.getKeystorePath();
- String keystorePassword = config.getKeystorePassword();
- String certType = config.getCertType();
- SSLContextFactory sslContextFactory = null;
- boolean isSsl = false;
- if (config.getEnableSSL() && isSSLClient(config, protocolSession))
- {
- sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
- isSsl = true;
- }
- if (config.getEnableExecutorPool())
- {
- if (isSsl)
- {
- protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
- new SSLFilter(sslContextFactory.buildServerContext()));
- }
- protocolSession.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf);
- }
- else
- {
- protocolSession.getFilterChain().addLast("protocolFilter", pcf);
- if (isSsl)
- {
- protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
- new SSLFilter(sslContextFactory.buildServerContext()));
- }
- }
-
- if (ApplicationRegistry.getInstance().getConfiguration().getProtectIOEnabled())
- {
- try
- {
-// //Add IO Protection Filters
- IoFilterChain chain = protocolSession.getFilterChain();
-
-
- protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
-
- ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(BUFFER_READ_LIMIT_SIZE);
- readfilter.attach(chain);
-
- WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
- writefilter.setMaximumConnectionBufferSize(BUFFER_WRITE_LIMIT_SIZE);
- writefilter.attach(chain);
-
- protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
- _logger.info("Using IO Read/Write Filter Protection");
- }
- catch (Exception e)
- {
- _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
- }
- }
- }
-
- /** Separated into its own, protected, method to allow easier reuse */
- protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec) throws AMQException
- {
- new AMQMinaProtocolSession(session, applicationRegistry.getVirtualHostRegistry(), codec);
- }
-
- public void sessionOpened(IoSession protocolSession) throws Exception
- {
- _logger.info("Session opened for:" + protocolSession.getRemoteAddress());
- }
-
- public void sessionClosed(IoSession protocolSession) throws Exception
- {
- _logger.info("Protocol Session closed for:" + protocolSession.getRemoteAddress());
- final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
- //fixme -- this can be null
- if (amqProtocolSession != null)
- {
- try
- {
- CurrentActor.set(amqProtocolSession.getLogActor());
- amqProtocolSession.closeSession();
- }
- catch (AMQException e)
- {
- _logger.error("Caught AMQException whilst closingSession:" + e);
- }
- finally
- {
- CurrentActor.remove();
- }
- }
- }
-
- public void sessionIdle(IoSession session, IdleStatus status) throws Exception
- {
- _logger.debug("Protocol Session [" + this + "] idle: " + status + " :for:" + session.getRemoteAddress());
- if (IdleStatus.WRITER_IDLE.equals(status))
- {
- //write heartbeat frame:
- session.write(HeartbeatBody.FRAME);
- }
- else if (IdleStatus.READER_IDLE.equals(status))
- {
- //failover:
- throw new IOException("Timed out while waiting for heartbeat from peer.");
- }
-
- }
-
- public void exceptionCaught(IoSession protocolSession, Throwable throwable) throws Exception
- {
- AMQProtocolSession session = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
- if (throwable instanceof AMQProtocolHeaderException)
- {
-
- protocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
-
- protocolSession.close();
-
- _logger.error("Error in protocol initiation " + session + ":" + protocolSession.getRemoteAddress() + " :" + throwable.getMessage(), throwable);
- }
- else if (throwable instanceof IOException)
- {
- _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable);
- }
- else
- {
- _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
-
-
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(session.getProtocolVersion());
- ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0);
-
- protocolSession.write(closeBody.generateFrame(0));
-
- protocolSession.close();
- }
- }
-
- /**
- * Invoked when a message is received on a particular protocol session. Note that a
- * protocol session is directly tied to a particular physical connection.
- *
- * @param protocolSession the protocol session that received the message
- * @param message the message itself (i.e. a decoded frame)
- *
- * @throws Exception if the message cannot be processed
- */
- public void messageReceived(IoSession protocolSession, Object message) throws Exception
- {
- final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
-
- if (message instanceof AMQDataBlock)
- {
- amqProtocolSession.dataBlockReceived((AMQDataBlock) message);
-
- }
- else if (message instanceof ByteBuffer)
- {
- throw new IllegalStateException("Handed undecoded ByteBuffer buf = " + message);
- }
- else
- {
- throw new IllegalStateException("Handed unhandled message. message.class = " + message.getClass() + " message = " + message);
- }
- }
-
- /**
- * Called after a message has been sent out on a particular protocol session
- *
- * @param protocolSession the protocol session (i.e. connection) on which this
- * message was sent
- * @param object the message (frame) that was encoded and sent
- *
- * @throws Exception if we want to indicate an error
- */
- public void messageSent(IoSession protocolSession, Object object) throws Exception
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Message sent: " + object);
- }
- }
-
- protected boolean isSSLClient(ServerConfiguration connectionConfig,
- IoSession protocolSession)
- {
- InetSocketAddress addr = (InetSocketAddress) protocolSession.getLocalAddress();
- return addr.getPort() == connectionConfig.getSSLPort();
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java
deleted file mode 100644
index 07c153bfe8..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-
-/**
- * The protocol provide's role is to encapsulate the initialisation of the protocol handler.
- *
- * The protocol handler (see AMQPFastProtocolHandler class) handles protocol events
- * such as connection closing or a frame being received. It can either do this directly
- * or pass off to the protocol session in the cases where state information is required to
- * deal with the event.
- *
- */
-public class AMQPProtocolProvider
-{
- /**
- * Handler for protocol events
- */
- private AMQPFastProtocolHandler _handler;
-
- public AMQPProtocolProvider()
- {
- IApplicationRegistry registry = ApplicationRegistry.getInstance();
- _handler = new AMQPFastProtocolHandler(registry);
- }
-
- public AMQPFastProtocolHandler getHandler()
- {
- return _handler;
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 7bc4365152..3bcd102858 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -20,11 +20,25 @@
*/
package org.apache.qpid.server.protocol;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.JMException;
+import javax.security.sasl.SaslServer;
+
import org.apache.log4j.Logger;
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.common.IoSession;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
@@ -36,8 +50,10 @@ import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQProtocolHeaderException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
@@ -46,18 +62,20 @@ import org.apache.qpid.framing.MethodDispatcher;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.pool.Job;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.handler.ServerMethodDispatcherImpl;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -67,24 +85,12 @@ import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.Sender;
-import javax.management.JMException;
-import javax.security.sasl.SaslServer;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.security.Principal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
+public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession
{
- private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
+ private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
@@ -94,8 +100,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
- private final IoSession _minaProtocolSession;
-
private AMQShortString _contextKey;
private AMQShortString _clientVersion = null;
@@ -130,52 +134,48 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private FieldTable _clientProperties;
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
- private List<Integer> _closingChannelsList = new CopyOnWriteArrayList<Integer>();
+ private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
private ProtocolOutputConverter _protocolOutputConverter;
private Principal _authorizedID;
private MethodDispatcher _dispatcher;
private ProtocolSessionIdentifier _sessionIdentifier;
-
- private static final long LAST_WRITE_FUTURE_JOIN_TIMEOUT = 60000L;
- private org.apache.mina.common.WriteFuture _lastWriteFuture;
-
+
// Create a simple ID that increments for ever new Session
private final long _sessionID = idGenerator.getAndIncrement();
private AMQPConnectionActor _actor;
private LogSubject _logSubject;
+ private NetworkDriver _networkDriver;
+
+ private long _lastIoTime;
+
+ private long _writtenBytes;
+ private long _readBytes;
+
+ private Job _readJob;
+ private Job _writeJob;
+
+ private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
+
public ManagedObject getManagedObject()
{
return _managedObject;
}
- public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory)
- throws AMQException
+ public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkDriver driver)
{
_stateManager = new AMQStateManager(virtualHostRegistry, this);
- _minaProtocolSession = session;
- session.setAttachment(this);
-
- _codecFactory = codecFactory;
+ _networkDriver = driver;
+
+ _codecFactory = new AMQCodecFactory(true, this);
+ _poolReference.acquireExecutorService();
+ _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
+ _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
_actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
-
_actor.message(ConnectionMessages.CON_1001(null, null, false, false));
- try
- {
- IoServiceConfig config = session.getServiceConfig();
- ReadWriteThreadModel threadModel = (ReadWriteThreadModel) config.getThreadModel();
- threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
- threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
- }
- catch (RuntimeException e)
- {
- e.printStackTrace();
- throw e;
-
- }
}
private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -191,16 +191,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
}
- public IoSession getIOSession()
- {
- return _minaProtocolSession;
- }
-
- public static AMQProtocolSession getAMQProtocolSession(IoSession minaProtocolSession)
- {
- return (AMQProtocolSession) minaProtocolSession.getAttachment();
- }
-
public long getSessionID()
{
return _sessionID;
@@ -211,6 +201,42 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
return _actor;
}
+ @Override
+ public void received(final ByteBuffer msg)
+ {
+ _lastIoTime = System.currentTimeMillis();
+ try
+ {
+ final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
+ Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ // Decode buffer
+
+ for (AMQDataBlock dataBlock : dataBlocks)
+ {
+ try
+ {
+ dataBlockReceived(dataBlock);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unexpected exception when processing datablock", e);
+ closeProtocolSession();
+ }
+ }
+ }
+ });
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unexpected exception when processing datablock", e);
+ closeProtocolSession();
+ }
+ }
+
public void dataBlockReceived(AMQDataBlock message) throws Exception
{
_lastReceived = message;
@@ -265,12 +291,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
else
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame);
- }
-
- closeProtocolSession();
+ // The channel has been told to close, we don't process any more frames until
+ // it's closed.
return;
}
}
@@ -314,21 +336,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
null,
mechanisms.getBytes(),
locales.getBytes());
- _minaProtocolSession.write(responseBody.generateFrame(0));
+ _networkDriver.send(responseBody.generateFrame(0).toNioByteBuffer());
}
catch (AMQException e)
{
_logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
- _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
-
- // TODO: Close connection (but how to wait until message is sent?)
- // ritchiem 2006-12-04 will this not do?
- // WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()]));
- // future.join();
- // close connection
-
+ _networkDriver.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer());
}
}
@@ -437,8 +452,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public void writeFrame(AMQDataBlock frame)
{
_lastSent = frame;
-
- _lastWriteFuture = _minaProtocolSession.write(frame);
+ final ByteBuffer buf = frame.toNioByteBuffer();
+ _lastIoTime = System.currentTimeMillis();
+ _writtenBytes += buf.remaining();
+ Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ _networkDriver.send(buf);
+ }
+ });
}
public AMQShortString getContextKey()
@@ -483,7 +507,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public boolean channelAwaitingClosure(int channelId)
{
- return !_closingChannelsList.isEmpty() && _closingChannelsList.contains(channelId);
+ return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId);
}
public void addChannel(AMQChannel channel) throws AMQException
@@ -495,7 +519,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
final int channelId = channel.getChannelId();
- if (_closingChannelsList.contains(channelId))
+ if (_closingChannelsList.containsKey(channelId))
{
throw new AMQException("Session is marked awaiting channel close");
}
@@ -539,7 +563,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
_maxNoOfChannels = value;
}
-
+
public void commitTransactions(AMQChannel channel) throws AMQException
{
if ((channel != null) && channel.isTransactional())
@@ -555,7 +579,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
channel.rollback();
}
}
-
+
/**
* Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
* subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
@@ -602,7 +626,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private void markChannelAwaitingCloseOk(int channelId)
{
- _closingChannelsList.add(channelId);
+ _closingChannelsList.put(channelId, System.currentTimeMillis());
}
/**
@@ -628,8 +652,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
if (delay > 0)
{
- _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
- _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, (int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
+ _networkDriver.setMaxWriteIdle(delay);
+ _networkDriver.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
}
}
@@ -655,6 +679,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
/** This must be called when the session is _closed in order to free up any resources managed by the session. */
public void closeSession() throws AMQException
{
+ if (CurrentActor.get() == null)
+ {
+ CurrentActor.set(_actor);
+ }
if (!_closed)
{
if (_virtualHost != null)
@@ -672,9 +700,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
task.doTask(this);
}
-
+
_closed = true;
-
+ _poolReference.releaseExecutorService();
CurrentActor.get().message(_logSubject, ConnectionMessages.CON_1002());
}
}
@@ -699,21 +727,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public void closeProtocolSession()
{
- closeProtocolSession(true);
- }
-
- public void closeProtocolSession(boolean waitLast)
- {
- if (waitLast && (_lastWriteFuture != null))
- {
- _logger.debug("Waiting for last write to join.");
- _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
- }
-
- _logger.debug("REALLY Closing protocol session:" + _minaProtocolSession);
- final CloseFuture future = _minaProtocolSession.close();
- future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
-
+ _networkDriver.close();
try
{
_stateManager.changeState(AMQState.CONNECTION_CLOSED);
@@ -726,7 +740,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public String toString()
{
- return _minaProtocolSession.getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")");
+ return getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")");
}
public String dump()
@@ -737,7 +751,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
/** @return an object that can be used to identity */
public Object getKey()
{
- return _minaProtocolSession.getRemoteAddress();
+ return getRemoteAddress();
}
/**
@@ -748,7 +762,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
*/
public String getLocalFQDN()
{
- SocketAddress address = _minaProtocolSession.getLocalAddress();
+ SocketAddress address = _networkDriver.getLocalAddress();
// we use the vmpipe address in some tests hence the need for this rather ugly test. The host
// information is used by SASL primary.
if (address instanceof InetSocketAddress)
@@ -764,7 +778,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
throw new IllegalArgumentException("Unsupported socket address class: " + address);
}
}
-
+
public SaslServer getSaslServer()
{
return _saslServer;
@@ -837,7 +851,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public Object getClientIdentifier()
{
- return (_minaProtocolSession != null) ? _minaProtocolSession.getRemoteAddress() : null;
+ return (_networkDriver != null) ? _networkDriver.getRemoteAddress() : null;
}
public VirtualHost getVirtualHost()
@@ -867,7 +881,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
_taskList.remove(task);
}
-
+
public ProtocolOutputConverter getProtocolOutputConverter()
{
return _protocolOutputConverter;
@@ -888,7 +902,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public SocketAddress getRemoteAddress()
{
- return _minaProtocolSession.getRemoteAddress();
+ return _networkDriver.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _networkDriver.getLocalAddress();
}
public MethodRegistry getMethodRegistry()
@@ -901,23 +920,116 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
return _dispatcher;
}
- public ProtocolSessionIdentifier getSessionIdentifier()
+ @Override
+ public void closed()
{
- return _sessionIdentifier;
+ try
+ {
+ closeSession();
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Could not close protocol engine", e);
+ }
}
- public String getClientVersion()
+ @Override
+ public void readerIdle()
{
- return (_clientVersion == null) ? null : _clientVersion.toString();
+ // Nothing
+ }
+
+ @Override
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
}
- public void setSender(Sender<java.nio.ByteBuffer> sender)
+ @Override
+ public void writerIdle()
{
- // No-op, interface munging between this and AMQProtocolSession
+ _networkDriver.send(HeartbeatBody.FRAME.toNioByteBuffer());
+ }
+
+ @Override
+ public void exception(Throwable throwable)
+ {
+ if (throwable instanceof AMQProtocolHeaderException)
+ {
+
+ writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
+ _networkDriver.close();
+
+ _logger.error("Error in protocol initiation " + this + ":" + getRemoteAddress() + " :" + throwable.getMessage(), throwable);
+ }
+ else if (throwable instanceof IOException)
+ {
+ _logger.error("IOException caught in" + this + ", session closed implictly: " + throwable);
+ }
+ else
+ {
+ _logger.error("Exception caught in" + this + ", closing session explictly: " + throwable, throwable);
+
+
+ MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion());
+ ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0);
+
+ writeFrame(closeBody.generateFrame(0));
+
+ _networkDriver.close();
+ }
}
+ @Override
public void init()
{
- // No-op, interface munging between this and AMQProtocolSession
+ // Do nothing
+ }
+
+ @Override
+ public void setSender(Sender<ByteBuffer> sender)
+ {
+ // Do nothing
+ }
+
+ @Override
+ public long getReadBytes()
+ {
+ return _readBytes;
+ }
+
+ public long getWrittenBytes()
+ {
+ return _writtenBytes;
+ }
+
+ public long getLastIoTime()
+ {
+ return _lastIoTime;
+ }
+
+ public ProtocolSessionIdentifier getSessionIdentifier()
+ {
+ return _sessionIdentifier;
+ }
+
+ public String getClientVersion()
+ {
+ return (_clientVersion == null) ? null : _clientVersion.toString();
+ }
+
+ @Override
+ public void closeIfLingeringClosedChannels()
+ {
+ for (Entry<Integer, Long>id : _closingChannelsList.entrySet())
+ {
+ if (id.getValue() + 30000 > System.currentTimeMillis())
+ {
+ // We have a channel that we closed 30 seconds ago. Client's dead, kill the connection
+ _logger.error("Closing connection as channel was closed more than 30 seconds ago and no ChannelCloseOk has been processed");
+ closeProtocolSession();
+ }
+ }
}
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java
new file mode 100644
index 0000000000..ff0c007a60
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java
@@ -0,0 +1,29 @@
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.NetworkDriver;
+
+public class AMQProtocolEngineFactory implements ProtocolEngineFactory
+{
+ private VirtualHostRegistry _vhosts;
+
+ public AMQProtocolEngineFactory()
+ {
+ this(1);
+ }
+
+ public AMQProtocolEngineFactory(Integer port)
+ {
+ _vhosts = ApplicationRegistry.getInstance(port).getVirtualHostRegistry();
+ }
+
+
+ public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver)
+ {
+ return new AMQProtocolEngine(_vhosts, networkDriver);
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index fff406bb3d..b16ed01c79 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.security.Principal;
+import java.util.List;
public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
@@ -210,5 +211,21 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
public MethodDispatcher getMethodDispatcher();
public ProtocolSessionIdentifier getSessionIdentifier();
+
+ String getClientVersion();
+
+ long getLastIoTime();
+
+ long getWrittenBytes();
+
+ Long getMaximumNumberOfChannels();
+
+ void setMaximumNumberOfChannels(Long value);
+
+ void commitTransactions(AMQChannel channel) throws AMQException;
+
+ List<AMQChannel> getChannels();
+
+ void closeIfLingeringClosedChannels();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
index 81dbeeded2..8497c95e26 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
@@ -79,7 +79,7 @@ import org.apache.qpid.server.management.ManagedObject;
@MBeanDescription("Management Bean for an AMQ Broker Connection")
public class AMQProtocolSessionMBean extends AMQManagedObject implements ManagedConnection
{
- private AMQMinaProtocolSession _session = null;
+ private AMQProtocolSession _protocolSession = null;
private String _name = null;
// openmbean data types for representing the channel attributes
@@ -92,10 +92,10 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
new AMQShortString("Broker Management Console has closed the connection.");
@MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
- public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws NotCompliantMBeanException, OpenDataException
+ public AMQProtocolSessionMBean(AMQProtocolSession amqProtocolSession) throws NotCompliantMBeanException, OpenDataException
{
super(ManagedConnection.class, ManagedConnection.TYPE, ManagedConnection.VERSION);
- _session = session;
+ _protocolSession = amqProtocolSession;
String remote = getRemoteAddress();
remote = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
_name = jmxEncode(new StringBuffer(remote), 0).toString();
@@ -128,52 +128,52 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
public String getClientId()
{
- return (_session.getContextKey() == null) ? null : _session.getContextKey().toString();
+ return (_protocolSession.getContextKey() == null) ? null : _protocolSession.getContextKey().toString();
}
public String getAuthorizedId()
{
- return (_session.getAuthorizedID() != null ) ? _session.getAuthorizedID().getName() : null;
+ return (_protocolSession.getAuthorizedID() != null ) ? _protocolSession.getAuthorizedID().getName() : null;
}
public String getVersion()
{
- return (_session.getClientVersion() == null) ? null : _session.getClientVersion().toString();
+ return (_protocolSession.getClientVersion() == null) ? null : _protocolSession.getClientVersion().toString();
}
public Date getLastIoTime()
{
- return new Date(_session.getIOSession().getLastIoTime());
+ return new Date(_protocolSession.getLastIoTime());
}
public String getRemoteAddress()
{
- return _session.getIOSession().getRemoteAddress().toString();
+ return _protocolSession.getRemoteAddress().toString();
}
public ManagedObject getParentObject()
{
- return _session.getVirtualHost().getManagedObject();
+ return _protocolSession.getVirtualHost().getManagedObject();
}
public Long getWrittenBytes()
{
- return _session.getIOSession().getWrittenBytes();
+ return _protocolSession.getWrittenBytes();
}
public Long getReadBytes()
{
- return _session.getIOSession().getReadBytes();
+ return _protocolSession.getWrittenBytes();
}
public Long getMaximumNumberOfChannels()
{
- return _session.getMaximumNumberOfChannels();
+ return _protocolSession.getMaximumNumberOfChannels();
}
public void setMaximumNumberOfChannels(Long value)
{
- _session.setMaximumNumberOfChannels(value);
+ _protocolSession.setMaximumNumberOfChannels(value);
}
public String getObjectInstanceName()
@@ -192,13 +192,13 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger()));
try
{
- AMQChannel channel = _session.getChannel(channelId);
+ AMQChannel channel = _protocolSession.getChannel(channelId);
if (channel == null)
{
throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
}
- _session.commitTransactions(channel);
+ _protocolSession.commitTransactions(channel);
}
catch (AMQException ex)
{
@@ -221,13 +221,13 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger()));
try
{
- AMQChannel channel = _session.getChannel(channelId);
+ AMQChannel channel = _protocolSession.getChannel(channelId);
if (channel == null)
{
throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
}
- _session.rollbackTransactions(channel);
+ _protocolSession.commitTransactions(channel);
}
catch (AMQException ex)
{
@@ -248,7 +248,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
public TabularData channels() throws OpenDataException
{
TabularDataSupport channelsList = new TabularDataSupport(_channelsType);
- List<AMQChannel> list = _session.getChannels();
+ List<AMQChannel> list = _protocolSession.getChannels();
for (AMQChannel channel : list)
{
@@ -274,7 +274,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
public void closeConnection() throws JMException
{
- MethodRegistry methodRegistry = _session.getMethodRegistry();
+ MethodRegistry methodRegistry = _protocolSession.getMethodRegistry();
ConnectionCloseBody responseBody =
methodRegistry.createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
// replyCode
@@ -301,12 +301,12 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
try
{
- _session.writeFrame(responseBody.generateFrame(0));
+ _protocolSession.writeFrame(responseBody.generateFrame(0));
try
{
- _session.closeSession();
+ _protocolSession.closeSession();
}
catch (AMQException ex)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index 7511b5c818..1affdd6590 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -258,7 +258,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
for (InetSocketAddress bindAddress : _acceptors.keySet())
{
QpidAcceptor acceptor = _acceptors.get(bindAddress);
- acceptor.getIoAcceptor().unbind(bindAddress);
+ acceptor.getNetworkDriver().close();
CurrentActor.get().message(BrokerMessages.BRK_1003(acceptor.toString(), bindAddress.getPort()));
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
index 5b3c3659c4..7450322130 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.security.access.plugins.network;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
@@ -32,7 +31,6 @@ import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.security.access.ACLPluginFactory;
@@ -180,13 +178,13 @@ public class FirewallPlugin extends AbstractACLPlugin
@Override
public AuthzResult authoriseConnect(AMQProtocolSession session, VirtualHost virtualHost)
{
- if (!(session instanceof AMQMinaProtocolSession))
+ SocketAddress sockAddr = session.getRemoteAddress();
+ if (!(sockAddr instanceof InetSocketAddress))
{
- return AuthzResult.ABSTAIN; // We only deal with tcp sessions, which
- // mean MINA right now
+ return AuthzResult.ABSTAIN; // We only deal with tcp sessions
}
- InetAddress addr = getInetAdressFromMinaSession((AMQMinaProtocolSession) session);
+ InetAddress addr = ((InetSocketAddress) sockAddr).getAddress();
if (addr == null)
{
@@ -213,19 +211,6 @@ public class FirewallPlugin extends AbstractACLPlugin
}
- private InetAddress getInetAdressFromMinaSession(AMQMinaProtocolSession session)
- {
- SocketAddress remote = session.getIOSession().getRemoteAddress();
- if (remote instanceof InetSocketAddress)
- {
- return ((InetSocketAddress) remote).getAddress();
- }
- else
- {
- return null;
- }
- }
-
public void setConfiguration(Configuration config) throws ConfigurationException
{
// Get default action
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java b/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
index 61cc7cdeb6..3ca22b60c8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
@@ -20,21 +20,21 @@
*/
package org.apache.qpid.server.transport;
-import org.apache.mina.common.IoAcceptor;
+import org.apache.qpid.transport.NetworkDriver;
public class QpidAcceptor
{
- IoAcceptor _acceptor;
+ NetworkDriver _driver;
String _protocol;
- public QpidAcceptor(IoAcceptor acceptor, String protocol)
+ public QpidAcceptor(NetworkDriver driver, String protocol)
{
- _acceptor = acceptor;
+ _driver = driver;
_protocol = protocol;
}
- public IoAcceptor getIoAcceptor()
+ public NetworkDriver getNetworkDriver()
{
- return _acceptor;
+ return _driver;
}
public String toString()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 9ab3592628..3b776a62b4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -279,6 +279,14 @@ public class VirtualHost implements Accessable
_houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
period / 2,
period);
+
+ class ForceChannelClosuresTask extends TimerTask
+ {
+ public void run()
+ {
+ _connectionRegistry.expireClosedChannels();
+ }
+ }
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
index ebfa80d139..08b95f9d77 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
@@ -20,26 +20,26 @@
*/
package org.apache.qpid.server.configuration;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+
import junit.framework.TestCase;
+
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolEngine;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.TestIoSession;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
+import org.apache.qpid.transport.TestNetworkDriver;
public class ServerConfigurationTest extends TestCase
{
@@ -589,12 +589,12 @@ public class ServerConfigurationTest extends TestCase
{
// Check default
ServerConfiguration serverConfig = new ServerConfiguration(_config);
- assertEquals(true, serverConfig.getSSLOnly());
+ assertEquals(false, serverConfig.getSSLOnly());
// Check value we set
- _config.setProperty("connector.ssl.sslOnly", false);
+ _config.setProperty("connector.ssl.sslOnly", true);
serverConfig = new ServerConfiguration(_config);
- assertEquals(false, serverConfig.getSSLOnly());
+ assertEquals(true, serverConfig.getSSLOnly());
}
public void testGetSSLPort() throws ConfigurationException
@@ -791,16 +791,15 @@ public class ServerConfigurationTest extends TestCase
// Test config
VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry();
VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
- AMQCodecFactory codecFactory = new AMQCodecFactory(true);
- TestIoSession iosession = new TestIoSession();
- iosession.setAddress("127.0.0.1");
+ TestNetworkDriver testDriver = new TestNetworkDriver();
+ testDriver.setRemoteAddress("127.0.0.1");
- AMQProtocolSession session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
+ AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
- iosession.setAddress("127.1.2.3");
- session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
+ testDriver.setRemoteAddress("127.1.2.3");
+ session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
assertTrue(reg.getAccessManager().authoriseConnect(session, virtualHost));
}
@@ -866,12 +865,12 @@ public class ServerConfigurationTest extends TestCase
// Test config
VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry();
VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
- AMQCodecFactory codecFactory = new AMQCodecFactory(true);
- TestIoSession iosession = new TestIoSession();
- iosession.setAddress("127.0.0.1");
+ TestNetworkDriver testDriver = new TestNetworkDriver();
+ testDriver.setRemoteAddress("127.0.0.1");
- AMQProtocolSession session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
+ AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
+ session.setNetworkDriver(testDriver);
assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
}
@@ -935,12 +934,11 @@ public class ServerConfigurationTest extends TestCase
ApplicationRegistry.initialise(reg, 1);
// Test config
- TestIoSession iosession = new TestIoSession();
- iosession.setAddress("127.0.0.1");
+ TestNetworkDriver testDriver = new TestNetworkDriver();
+ testDriver.setRemoteAddress("127.0.0.1");
VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry();
VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
- AMQCodecFactory codecFactory = new AMQCodecFactory(true);
- AMQProtocolSession session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
+ AMQProtocolSession session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
RandomAccessFile fileBRandom = new RandomAccessFile(fileB, "rw");
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
index e199255f50..bc36c61382 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
@@ -44,7 +44,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase
private static final Logger log = Logger.getLogger(AMQProtocolSessionMBeanTest.class);
private MessageStore _messageStore = new SkeletonMessageStore();
- private AMQMinaProtocolSession _protocolSession;
+ private AMQProtocolEngine _protocolSession;
private AMQChannel _channel;
private AMQProtocolSessionMBean _mbean;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
index 37dfead2e5..ec7bf1cb72 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -20,23 +20,23 @@
*/
package org.apache.qpid.server.protocol;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
+import java.security.Principal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-import java.security.Principal;
-public class InternalTestProtocolSession extends AMQMinaProtocolSession implements ProtocolOutputConverter
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.TestNetworkDriver;
+
+public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
{
// ChannelID(LIST) -> LinkedList<Pair>
final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers;
@@ -44,9 +44,7 @@ public class InternalTestProtocolSession extends AMQMinaProtocolSession implemen
public InternalTestProtocolSession(VirtualHost virtualHost) throws AMQException
{
- super(new TestIoSession(),
- ApplicationRegistry.getInstance().getVirtualHostRegistry(),
- new AMQCodecFactory(true));
+ super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), new TestNetworkDriver());
_channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
index 8597fc5863..e37492bcb0 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
@@ -37,7 +37,7 @@ import java.security.Principal;
/** Test class to test MBean operations for AMQMinaProtocolSession. */
public class MaxChannelsTest extends TestCase
{
- private AMQMinaProtocolSession _session;
+ private AMQProtocolEngine _session;
public void testChannels() throws Exception
{
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java
deleted file mode 100644
index 211f491867..0000000000
--- a/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoService;
-import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoSessionConfig;
-import org.apache.mina.common.ThreadModel;
-import org.apache.mina.common.TrafficMask;
-import org.apache.mina.common.TransportType;
-import org.apache.mina.common.WriteFuture;
-import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
-import org.apache.qpid.pool.ReadWriteThreadModel;
-
-/**
- * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented,
- * so if this class is being used and some methods are to be used, then please update those.
- */
-public class TestIoSession implements IoSession
-{
- private final ConcurrentMap attributes = new ConcurrentHashMap();
- private String _address = "127.0.0.1";
- private int _port = 1;
-
- public TestIoSession()
- {
- }
-
- public IoService getService()
- {
- return null;
- }
-
- public IoServiceConfig getServiceConfig()
- {
- return new TestIoConfig();
- }
-
- public IoHandler getHandler()
- {
- return null;
- }
-
- public IoSessionConfig getConfig()
- {
- return null;
- }
-
- public IoFilterChain getFilterChain()
- {
- return null;
- }
-
- public WriteFuture write(Object message)
- {
- return null;
- }
-
- public CloseFuture close()
- {
- return null;
- }
-
- public Object getAttachment()
- {
- return getAttribute("");
- }
-
- public Object setAttachment(Object attachment)
- {
- return setAttribute("",attachment);
- }
-
- public Object getAttribute(String key)
- {
- return attributes.get(key);
- }
-
- public Object setAttribute(String key, Object value)
- {
- return attributes.put(key,value);
- }
-
- public Object setAttribute(String key)
- {
- return attributes.put(key, Boolean.TRUE);
- }
-
- public Object removeAttribute(String key)
- {
- return attributes.remove(key);
- }
-
- public boolean containsAttribute(String key)
- {
- return attributes.containsKey(key);
- }
-
- public Set getAttributeKeys()
- {
- return attributes.keySet();
- }
-
- public TransportType getTransportType()
- {
- return null;
- }
-
- public boolean isConnected()
- {
- return false;
- }
-
- public boolean isClosing()
- {
- return false;
- }
-
- public CloseFuture getCloseFuture()
- {
- return null;
- }
-
- public SocketAddress getRemoteAddress()
- {
- return new InetSocketAddress(getAddress(), getPort());
- }
-
- public SocketAddress getLocalAddress()
- {
- return null;
- }
-
- public SocketAddress getServiceAddress()
- {
- return null;
- }
-
- public int getIdleTime(IdleStatus status)
- {
- return 0;
- }
-
- public long getIdleTimeInMillis(IdleStatus status)
- {
- return 0;
- }
-
- public void setIdleTime(IdleStatus status, int idleTime)
- {
-
- }
-
- public int getWriteTimeout()
- {
- return 0;
- }
-
- public long getWriteTimeoutInMillis()
- {
- return 0;
- }
-
- public void setWriteTimeout(int writeTimeout)
- {
-
- }
-
- public TrafficMask getTrafficMask()
- {
- return null;
- }
-
- public void setTrafficMask(TrafficMask trafficMask)
- {
-
- }
-
- public void suspendRead()
- {
-
- }
-
- public void suspendWrite()
- {
-
- }
-
- public void resumeRead()
- {
-
- }
-
- public void resumeWrite()
- {
-
- }
-
- public long getReadBytes()
- {
- return 0;
- }
-
- public long getWrittenBytes()
- {
- return 0;
- }
-
- public long getReadMessages()
- {
- return 0;
- }
-
- public long getWrittenMessages()
- {
- return 0;
- }
-
- public long getWrittenWriteRequests()
- {
- return 0;
- }
-
- public int getScheduledWriteRequests()
- {
- return 0;
- }
-
- public int getScheduledWriteBytes()
- {
- return 0;
- }
-
- public long getCreationTime()
- {
- return 0;
- }
-
- public long getLastIoTime()
- {
- return 0;
- }
-
- public long getLastReadTime()
- {
- return 0;
- }
-
- public long getLastWriteTime()
- {
- return 0;
- }
-
- public boolean isIdle(IdleStatus status)
- {
- return false;
- }
-
- public int getIdleCount(IdleStatus status)
- {
- return 0;
- }
-
- public long getLastIdleTime(IdleStatus status)
- {
- return 0;
- }
-
- public void setAddress(String string)
- {
- this._address = string;
- }
-
- public String getAddress()
- {
- return _address;
- }
-
- public void setPort(int _port)
- {
- this._port = _port;
- }
-
- public int getPort()
- {
- return _port;
- }
-
- /**
- * Test implementation of IoServiceConfig
- */
- private class TestIoConfig extends SocketAcceptorConfig
- {
- public ThreadModel getThreadModel()
- {
- return ReadWriteThreadModel.getInstance();
- }
- }
-}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index 8c6260ca9e..19470e6226 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -20,37 +20,34 @@
*/
package org.apache.qpid.server.queue;
+import java.util.ArrayList;
+import java.util.LinkedList;
+
+import javax.management.Notification;
+
import junit.framework.TestCase;
+
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.protocol.AMQProtocolEngine;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
-import org.apache.qpid.server.protocol.InternalTestProtocolSession;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.mina.common.ByteBuffer;
-
-import javax.management.Notification;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.Collections;
-import java.util.Set;
-import java.security.Principal;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
public class AMQQueueAlertTest extends TestCase
@@ -62,7 +59,7 @@ public class AMQQueueAlertTest extends TestCase
private AMQQueue _queue;
private AMQQueueMBean _queueMBean;
private VirtualHost _virtualHost;
- private AMQMinaProtocolSession _protocolSession;
+ private AMQProtocolEngine _protocolSession;
private MessageStore _messageStore = new MemoryMessageStore();
private StoreContext _storeContext = new StoreContext();
private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java b/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
index a497365b06..5d3335c001 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
@@ -30,17 +30,14 @@ import java.net.InetSocketAddress;
import junit.framework.TestCase;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
-import org.apache.qpid.server.protocol.TestIoSession;
+import org.apache.qpid.server.protocol.AMQProtocolEngine;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.transport.TestNetworkDriver;
public class FirewallPluginTest extends TestCase
{
@@ -84,22 +81,22 @@ public class FirewallPluginTest extends TestCase
private TestableMemoryMessageStore _store;
private VirtualHost _virtualHost;
- private AMQMinaProtocolSession _session;
+ private AMQProtocolEngine _session;
+ private TestNetworkDriver _testDriver;
@Override
public void setUp() throws Exception
{
super.setUp();
_store = new TestableMemoryMessageStore();
- TestIoSession iosession = new TestIoSession();
- iosession.setAddress("127.0.0.1");
+ _testDriver = new TestNetworkDriver();
+ _testDriver.setRemoteAddress("127.0.0.1");
// Retreive VirtualHost from the Registry
VirtualHostRegistry virtualHostRegistry = ApplicationRegistry.getInstance().getVirtualHostRegistry();
_virtualHost = virtualHostRegistry.getVirtualHost("test");
- AMQCodecFactory codecFactory = new AMQCodecFactory(true);
- _session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
+ _session = new AMQProtocolEngine(virtualHostRegistry, _testDriver);
}
public void tearDown() throws Exception
@@ -170,7 +167,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -185,7 +182,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -198,7 +195,7 @@ public class FirewallPluginTest extends TestCase
FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule});
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("127.0.0.1");
+ _testDriver.setRemoteAddress("127.0.0.1");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -211,7 +208,7 @@ public class FirewallPluginTest extends TestCase
FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule});
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("127.0.0.1");
+ _testDriver.setRemoteAddress("127.0.0.1");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -234,7 +231,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -257,7 +254,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -271,7 +268,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -285,7 +282,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -295,11 +292,11 @@ public class FirewallPluginTest extends TestCase
firstRule.setAccess("allow");
firstRule.setHostname("foo, bar, "+new InetSocketAddress("127.0.0.1", 5672).getHostName());
FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{firstRule});
- ((TestIoSession) _session.getIOSession()).setAddress("10.0.0.1");
+ _testDriver.setRemoteAddress("10.0.0.1");
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("127.0.0.1");
+ _testDriver.setRemoteAddress("127.0.0.1");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}